projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: sessiond: client/client_list lock inversion on disconnect
[lttng-tools.git]
/
src
/
bin
/
lttng-sessiond
/
notification-thread-events.c
diff --git
a/src/bin/lttng-sessiond/notification-thread-events.c
b/src/bin/lttng-sessiond/notification-thread-events.c
index 2a9423dcfafee2933b44a3b8947fd4dbc364ece6..ff810decec454228d7bc594410e811eef9ee211b 100644
(file)
--- a/
src/bin/lttng-sessiond/notification-thread-events.c
+++ b/
src/bin/lttng-sessiond/notification-thread-events.c
@@
-2523,10
+2523,8
@@
int handle_notification_thread_command(
client_id);
ret = 0;
} else {
client_id);
ret = 0;
} else {
- pthread_mutex_lock(&client->lock);
ret = client_handle_transmission_status(
client, client_status, state);
ret = client_handle_transmission_status(
client, client_status, state);
- pthread_mutex_unlock(&client->lock);
}
rcu_read_unlock();
break;
}
rcu_read_unlock();
break;
@@
-2579,14
+2577,11
@@
end:
return ret;
}
return ret;
}
-/* Client lock must be acquired by caller. */
static
int client_reset_inbound_state(struct notification_client *client)
{
int ret;
static
int client_reset_inbound_state(struct notification_client *client)
{
int ret;
- ASSERT_LOCKED(client->lock);
-
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, 0);
assert(!ret);
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, 0);
assert(!ret);
@@
-2617,6
+2612,7
@@
int handle_notification_thread_client_connect(
ret = -1;
goto error;
}
ret = -1;
goto error;
}
+
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
@@
-2624,9
+2620,7
@@
int handle_notification_thread_client_connect(
lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
client->communication.inbound.expect_creds = true;
lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
client->communication.inbound.expect_creds = true;
- pthread_mutex_lock(&client->lock);
ret = client_reset_inbound_state(client);
ret = client_reset_inbound_state(client);
- pthread_mutex_unlock(&client->lock);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
ret = 0;
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
ret = 0;
@@
-2676,13
+2670,16
@@
int handle_notification_thread_client_connect(
rcu_read_unlock();
return ret;
rcu_read_unlock();
return ret;
+
error:
notification_client_destroy(client, state);
return ret;
}
error:
notification_client_destroy(client, state);
return ret;
}
-/* RCU read-lock must be held by the caller. */
-/* Client lock must be held by the caller */
+/*
+ * RCU read-lock must be held by the caller.
+ * Client lock must _not_ be held by the caller.
+ */
static
int notification_thread_client_disconnect(
struct notification_client *client,
static
int notification_thread_client_disconnect(
struct notification_client *client,
@@
-2692,16
+2689,18
@@
int notification_thread_client_disconnect(
struct lttng_condition_list_element *condition_list_element, *tmp;
/* Acquire the client lock to disable its communication atomically. */
struct lttng_condition_list_element *condition_list_element, *tmp;
/* Acquire the client lock to disable its communication atomically. */
+ pthread_mutex_lock(&client->lock);
client->communication.active = false;
client->communication.active = false;
+ cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
+ cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
+ pthread_mutex_unlock(&client->lock);
+
ret = lttng_poll_del(&state->events, client->socket);
if (ret) {
ERR("[notification-thread] Failed to remove client socket %d from poll set",
client->socket);
}
ret = lttng_poll_del(&state->events, client->socket);
if (ret) {
ERR("[notification-thread] Failed to remove client socket %d from poll set",
client->socket);
}
- cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
- cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
-
/* Release all conditions to which the client was subscribed. */
cds_list_for_each_entry_safe(condition_list_element, tmp,
&client->condition_list, node) {
/* Release all conditions to which the client was subscribed. */
cds_list_for_each_entry_safe(condition_list_element, tmp,
&client->condition_list, node) {
@@
-2735,9
+2734,7
@@
int handle_notification_thread_client_disconnect(
goto end;
}
goto end;
}
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
end:
rcu_read_unlock();
return ret;
end:
rcu_read_unlock();
return ret;
@@
-2756,10
+2753,8
@@
int handle_notification_thread_client_disconnect_all(
client_socket_ht_node) {
int ret;
client_socket_ht_node) {
int ret;
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(
client, state);
ret = notification_thread_client_disconnect(
client, state);
- pthread_mutex_unlock(&client->lock);
if (ret) {
error_encoutered = true;
}
if (ret) {
error_encoutered = true;
}
@@
-2804,8
+2799,6
@@
int client_handle_transmission_status(
goto end;
}
goto end;
}
- client->communication.outbound.queued_command_reply = false;
- client->communication.outbound.dropped_notification = false;
break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
/*
break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
/*
@@
-2873,11
+2866,13
@@
enum client_transmission_status client_flush_outgoing_queue(
if (ret) {
goto error;
}
if (ret) {
goto error;
}
+
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
} else if (ret < 0) {
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
} else if (ret < 0) {
- /* Generic error, dis
connect the client
. */
+ /* Generic error, dis
able the client's communication
. */
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
+ client->communication.active = false;
status = CLIENT_TRANSMISSION_STATUS_FAIL;
} else {
/* No error and flushed the queue completely. */
status = CLIENT_TRANSMISSION_STATUS_FAIL;
} else {
/* No error and flushed the queue completely. */
@@
-2886,6
+2881,9
@@
enum client_transmission_status client_flush_outgoing_queue(
if (ret) {
goto error;
}
if (ret) {
goto error;
}
+
+ client->communication.outbound.queued_command_reply = false;
+ client->communication.outbound.dropped_notification = false;
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
}
end:
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
}
end:
@@
-2894,7
+2892,7
@@
error:
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-/* Client lock must
be acquired by
caller. */
+/* Client lock must
_not_ be held by the
caller. */
static
int client_send_command_reply(struct notification_client *client,
struct notification_thread_state *state,
static
int client_send_command_reply(struct notification_client *client,
struct notification_thread_state *state,
@@
-2911,38
+2909,40
@@
int client_send_command_reply(struct notification_client *client,
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
- ASSERT_LOCKED(client->lock);
+ memcpy(buffer, &msg, sizeof(msg));
+ memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
+ DBG("[notification-thread] Send command reply (%i)", (int) status);
+ pthread_mutex_lock(&client->lock);
if (client->communication.outbound.queued_command_reply) {
/* Protocol error. */
if (client->communication.outbound.queued_command_reply) {
/* Protocol error. */
- goto error;
+ goto error
_unlock
;
}
}
- memcpy(buffer, &msg, sizeof(msg));
- memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
- DBG("[notification-thread] Send command reply (%i)", (int) status);
-
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer,
buffer, sizeof(buffer));
if (ret) {
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer,
buffer, sizeof(buffer));
if (ret) {
- goto error;
+ goto error
_unlock
;
}
transmission_status = client_flush_outgoing_queue(client);
}
transmission_status = client_flush_outgoing_queue(client);
+ if (client->communication.outbound.buffer.size != 0) {
+ /* Queue could not be emptied. */
+ client->communication.outbound.queued_command_reply = true;
+ }
+
+ pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
client, transmission_status, state);
if (ret) {
goto error;
}
ret = client_handle_transmission_status(
client, transmission_status, state);
if (ret) {
goto error;
}
- if (client->communication.outbound.buffer.size != 0) {
- /* Queue could not be emptied. */
- client->communication.outbound.queued_command_reply = true;
- }
-
return 0;
return 0;
+error_unlock:
+ pthread_mutex_unlock(&client->lock);
error:
return -1;
}
error:
return -1;
}
@@
-2952,9
+2952,6
@@
int client_handle_message_unknown(struct notification_client *client,
struct notification_thread_state *state)
{
int ret;
struct notification_thread_state *state)
{
int ret;
-
- pthread_mutex_lock(&client->lock);
-
/*
* Receiving message header. The function will be called again
* once the rest of the message as been received and can be
/*
* Receiving message header. The function will be called again
* once the rest of the message as been received and can be
@@
-2991,7
+2988,6
@@
int client_handle_message_unknown(struct notification_client *client,
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, msg->size);
end:
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, msg->size);
end:
- pthread_mutex_unlock(&client->lock);
return ret;
}
return ret;
}
@@
-3012,9
+3008,6
@@
int client_handle_message_handshake(struct notification_client *client,
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
- enum client_transmission_status transmission_status;
-
- pthread_mutex_lock(&client->lock);
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
@@
-3045,39
+3038,38
@@
int client_handle_message_handshake(struct notification_client *client,
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
}
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
}
+ pthread_mutex_lock(&client->lock);
+ /* Outgoing queue will be flushed when the command reply is sent. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
- goto end;
+ goto end
_unlock
;
}
client->validated = true;
client->communication.active = true;
}
client->validated = true;
client->communication.active = true;
+ pthread_mutex_unlock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
- ret = client_handle_transmission_status(
- client, transmission_status, state);
+ /* Set reception state to receive the next message header. */
+ ret = client_reset_inbound_state(client);
if (ret) {
if (ret) {
+ ERR("[notification-thread] Failed to reset client communication's inbound state");
goto end;
}
goto end;
}
+ /* Flushes the outgoing queue. */
ret = client_send_command_reply(client, state, status);
if (ret) {
ERR("[notification-thread] Failed to send reply to notification channel client");
goto end;
}
ret = client_send_command_reply(client, state, status);
if (ret) {
ERR("[notification-thread] Failed to send reply to notification channel client");
goto end;
}
- /* Set reception state to receive the next message header. */
- ret = client_reset_inbound_state(client);
- if (ret) {
- ERR("[notification-thread] Failed to reset client communication's inbound state");
- goto end;
- }
-
-end:
+ goto end;
+end_unlock:
pthread_mutex_unlock(&client->lock);
pthread_mutex_unlock(&client->lock);
+end:
return ret;
}
return ret;
}
@@
-3097,10
+3089,12
@@
int client_handle_message_subscription(
0, -1);
size_t expected_condition_size;
0, -1);
size_t expected_condition_size;
- pthread_mutex_lock(&client->lock);
+ /*
+ * No need to lock client to sample the inbound state as the only
+ * other thread accessing clients (action executor) only uses the
+ * outbound state.
+ */
expected_condition_size = client->communication.inbound.buffer.size;
expected_condition_size = client->communication.inbound.buffer.size;
- pthread_mutex_unlock(&client->lock);
-
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
@@
-3114,26
+3108,24
@@
int client_handle_message_subscription(
ret = notification_thread_client_unsubscribe(
client, condition, state, &status);
}
ret = notification_thread_client_unsubscribe(
client, condition, state, &status);
}
- if (ret) {
- goto end;
- }
- pthread_mutex_lock(&client->lock);
- ret = client_send_command_reply(client, state, status);
if (ret) {
if (ret) {
- ERR("[notification-thread] Failed to send reply to notification channel client");
- goto end_unlock;
+ goto end;
}
/* Set reception state to receive the next message header. */
ret = client_reset_inbound_state(client);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
}
/* Set reception state to receive the next message header. */
ret = client_reset_inbound_state(client);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
- goto end_unlock;
+ goto end;
+ }
+
+ ret = client_send_command_reply(client, state, status);
+ if (ret) {
+ ERR("[notification-thread] Failed to send reply to notification channel client");
+ goto end;
}
}
-end_unlock:
- pthread_mutex_unlock(&client->lock);
end:
return ret;
}
end:
return ret;
}
@@
-3197,7
+3189,6
@@
int handle_notification_thread_client_in(
goto end;
}
goto end;
}
- pthread_mutex_lock(&client->lock);
offset = client->communication.inbound.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
offset = client->communication.inbound.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
@@
-3219,7
+3210,7
@@
int handle_notification_thread_client_in(
message_is_complete = client->communication.inbound
.bytes_to_receive == 0;
}
message_is_complete = client->communication.inbound
.bytes_to_receive == 0;
}
- pthread_mutex_unlock(&client->lock);
+
if (recv_ret < 0) {
goto error_disconnect_client;
}
if (recv_ret < 0) {
goto error_disconnect_client;
}
@@
-3238,9
+3229,7
@@
end:
rcu_read_unlock();
return ret;
error_disconnect_client:
rcu_read_unlock();
return ret;
error_disconnect_client:
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
goto end;
}
goto end;
}
@@
-3262,9
+3251,10
@@
int handle_notification_thread_client_out(
pthread_mutex_lock(&client->lock);
transmission_status = client_flush_outgoing_queue(client);
pthread_mutex_lock(&client->lock);
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
+
ret = client_handle_transmission_status(
client, transmission_status, state);
ret = client_handle_transmission_status(
client, transmission_status, state);
- pthread_mutex_unlock(&client->lock);
if (ret) {
goto end;
}
if (ret) {
goto end;
}
@@
-3575,6
+3565,15
@@
int notification_client_list_send_evaluation(
ret = 0;
pthread_mutex_lock(&client->lock);
ret = 0;
pthread_mutex_lock(&client->lock);
+ if (!client->communication.active) {
+ /*
+ * Skip inactive client (protocol error or
+ * disconnecting).
+ */
+ DBG("Skipping client at it is marked as inactive");
+ goto skip_client;
+ }
+
if (source_object_creds) {
if (client->uid != source_object_creds->uid &&
client->gid != source_object_creds->gid &&
if (source_object_creds) {
if (client->uid != source_object_creds->uid &&
client->gid != source_object_creds->gid &&
@@
-3584,13
+3583,13
@@
int notification_client_list_send_evaluation(
* object.
*/
DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
* object.
*/
DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
- goto
unlock
_client;
+ goto
skip
_client;
}
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
}
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
- goto
unlock
_client;
+ goto
skip
_client;
}
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
}
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
@@
-3605,7
+3604,8
@@
int notification_client_list_send_evaluation(
*/
ret = client_notification_overflow(client);
if (ret) {
*/
ret = client_notification_overflow(client);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto skip_client;
}
}
}
}
@@
-3613,17
+3613,24
@@
int notification_client_list_send_evaluation(
&client->communication.outbound.buffer,
&msg_payload.buffer);
if (ret) {
&client->communication.outbound.buffer,
&msg_payload.buffer);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto skip_client;
}
transmission_status = client_flush_outgoing_queue(client);
}
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
ret = client_report(client, transmission_status, user_data);
if (ret) {
ret = client_report(client, transmission_status, user_data);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto end_unlock_list;
}
}
-unlock_client:
+
+ continue;
+
+skip_client:
pthread_mutex_unlock(&client->lock);
if (ret) {
pthread_mutex_unlock(&client->lock);
if (ret) {
+ /* Fatal error. */
goto end_unlock_list;
}
}
goto end_unlock_list;
}
}
This page took
0.031271 seconds
and
4
git commands to generate.