X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.c;h=95101178f2f08c7d40bd0ae242ddf79193e45095;hp=2a9423dcfafee2933b44a3b8947fd4dbc364ece6;hb=6f1105342bcca0c5ba8177ae134c197c19ba215f;hpb=98b5ff34df5d330fe139907ed77b4327ce9afa48 diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index 2a9423dcf..95101178f 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -1184,8 +1184,8 @@ void notification_client_destroy(struct notification_client *client, client->socket = -1; } client->communication.active = false; - lttng_dynamic_buffer_reset(&client->communication.inbound.buffer); - lttng_dynamic_buffer_reset(&client->communication.outbound.buffer); + lttng_payload_reset(&client->communication.inbound.payload); + lttng_payload_reset(&client->communication.outbound.payload); pthread_mutex_destroy(&client->lock); call_rcu(&client->rcu_node, free_notification_client_rcu); } @@ -2523,10 +2523,8 @@ int handle_notification_thread_command( client_id); ret = 0; } else { - pthread_mutex_lock(&client->lock); ret = client_handle_transmission_status( client, client_status, state); - pthread_mutex_unlock(&client->lock); } rcu_read_unlock(); break; @@ -2579,17 +2577,13 @@ end: return ret; } -/* Client lock must be acquired by caller. */ static int client_reset_inbound_state(struct notification_client *client) { int ret; - ASSERT_LOCKED(client->lock); - ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, 0); - assert(!ret); + lttng_payload_clear(&client->communication.inbound.payload); client->communication.inbound.bytes_to_receive = sizeof(struct lttng_notification_channel_message); @@ -2598,8 +2592,9 @@ int client_reset_inbound_state(struct notification_client *client) LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1); LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1); ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, + &client->communication.inbound.payload.buffer, client->communication.inbound.bytes_to_receive); + return ret; } @@ -2617,16 +2612,15 @@ int handle_notification_thread_client_connect( ret = -1; goto error; } + pthread_mutex_init(&client->lock, NULL); client->id = state->next_notification_client_id++; CDS_INIT_LIST_HEAD(&client->condition_list); - lttng_dynamic_buffer_init(&client->communication.inbound.buffer); - lttng_dynamic_buffer_init(&client->communication.outbound.buffer); + lttng_payload_init(&client->communication.inbound.payload); + lttng_payload_init(&client->communication.outbound.payload); client->communication.inbound.expect_creds = true; - pthread_mutex_lock(&client->lock); ret = client_reset_inbound_state(client); - pthread_mutex_unlock(&client->lock); if (ret) { ERR("[notification-thread] Failed to reset client communication's inbound state"); ret = 0; @@ -2676,13 +2670,16 @@ int handle_notification_thread_client_connect( rcu_read_unlock(); return ret; + error: notification_client_destroy(client, state); return ret; } -/* RCU read-lock must be held by the caller. */ -/* Client lock must be held by the caller */ +/* + * RCU read-lock must be held by the caller. + * Client lock must _not_ be held by the caller. + */ static int notification_thread_client_disconnect( struct notification_client *client, @@ -2692,16 +2689,18 @@ int notification_thread_client_disconnect( struct lttng_condition_list_element *condition_list_element, *tmp; /* Acquire the client lock to disable its communication atomically. */ + pthread_mutex_lock(&client->lock); client->communication.active = false; + cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node); + cds_lfht_del(state->client_id_ht, &client->client_id_ht_node); + pthread_mutex_unlock(&client->lock); + ret = lttng_poll_del(&state->events, client->socket); if (ret) { ERR("[notification-thread] Failed to remove client socket %d from poll set", client->socket); } - cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node); - cds_lfht_del(state->client_id_ht, &client->client_id_ht_node); - /* Release all conditions to which the client was subscribed. */ cds_list_for_each_entry_safe(condition_list_element, tmp, &client->condition_list, node) { @@ -2735,9 +2734,7 @@ int handle_notification_thread_client_disconnect( goto end; } - pthread_mutex_lock(&client->lock); ret = notification_thread_client_disconnect(client, state); - pthread_mutex_unlock(&client->lock); end: rcu_read_unlock(); return ret; @@ -2756,10 +2753,8 @@ int handle_notification_thread_client_disconnect_all( client_socket_ht_node) { int ret; - pthread_mutex_lock(&client->lock); ret = notification_thread_client_disconnect( client, state); - pthread_mutex_unlock(&client->lock); if (ret) { error_encoutered = true; } @@ -2804,8 +2799,6 @@ int client_handle_transmission_status( goto end; } - client->communication.outbound.queued_command_reply = false; - client->communication.outbound.dropped_notification = false; break; case CLIENT_TRANSMISSION_STATUS_QUEUED: /* @@ -2842,6 +2835,10 @@ enum client_transmission_status client_flush_outgoing_queue( ssize_t ret; size_t to_send_count; enum client_transmission_status status; + struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const int fds_to_send_count = + lttng_payload_view_get_fd_handle_count(&pv); ASSERT_LOCKED(client->lock); @@ -2850,51 +2847,122 @@ enum client_transmission_status client_flush_outgoing_queue( goto end; } - assert(client->communication.outbound.buffer.size != 0); - to_send_count = client->communication.outbound.buffer.size; + if (pv.buffer.size == 0) { + /* + * If both data and fds are equal to zero, we are in an invalid + * state. + */ + assert(fds_to_send_count != 0); + goto send_fds; + } + + /* Send data. */ + to_send_count = pv.buffer.size; DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue", client->socket); ret = lttcomm_send_unix_sock_non_block(client->socket, - client->communication.outbound.buffer.data, + pv.buffer.data, to_send_count); if ((ret >= 0 && ret < to_send_count)) { DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed", client->socket); to_send_count -= max(ret, 0); - memcpy(client->communication.outbound.buffer.data, - client->communication.outbound.buffer.data + - client->communication.outbound.buffer.size - to_send_count, + memmove(client->communication.outbound.payload.buffer.data, + pv.buffer.data + + pv.buffer.size - to_send_count, to_send_count); ret = lttng_dynamic_buffer_set_size( - &client->communication.outbound.buffer, + &client->communication.outbound.payload.buffer, to_send_count); if (ret) { goto error; } + status = CLIENT_TRANSMISSION_STATUS_QUEUED; + goto end; } else if (ret < 0) { - /* Generic error, disconnect the client. */ + /* Generic error, disable the client's communication. */ ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)", client->socket); + client->communication.active = false; status = CLIENT_TRANSMISSION_STATUS_FAIL; + goto end; } else { - /* No error and flushed the queue completely. */ + /* + * No error and flushed the queue completely. + * + * The payload buffer size is used later to + * check if there is notifications queued. So albeit that the + * direct caller knows that the transmission is complete, we + * need to set the buffer size to zero. + */ ret = lttng_dynamic_buffer_set_size( - &client->communication.outbound.buffer, 0); + &client->communication.outbound.payload.buffer, 0); if (ret) { goto error; } + } + +send_fds: + /* No fds to send, transmission is complete. */ + if (fds_to_send_count == 0) { + status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + goto end; + } + + ret = lttcomm_send_payload_view_fds_unix_sock_non_block( + client->socket, &pv); + if (ret < 0) { + /* Generic error, disable the client's communication. */ + ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)", + client->socket); + client->communication.active = false; + status = CLIENT_TRANSMISSION_STATUS_FAIL; + goto end; + } else if (ret == 0) { + /* Nothing could be sent. */ + status = CLIENT_TRANSMISSION_STATUS_QUEUED; + } else { + /* Fd passing is an all or nothing kind of thing. */ status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + /* + * The payload _fd_array count is used later to + * check if there is notifications queued. So although the + * direct caller knows that the transmission is complete, we + * need to clear the _fd_array for the queuing check. + */ + lttng_dynamic_pointer_array_clear( + &client->communication.outbound.payload + ._fd_handles); } + end: + if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) { + client->communication.outbound.queued_command_reply = false; + client->communication.outbound.dropped_notification = false; + lttng_payload_clear(&client->communication.outbound.payload); + } + return status; error: return CLIENT_TRANSMISSION_STATUS_ERROR; } -/* Client lock must be acquired by caller. */ +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + +/* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, struct notification_thread_state *state, @@ -2911,38 +2979,41 @@ int client_send_command_reply(struct notification_client *client, char buffer[sizeof(msg) + sizeof(reply)]; enum client_transmission_status transmission_status; - ASSERT_LOCKED(client->lock); + memcpy(buffer, &msg, sizeof(msg)); + memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); + DBG("[notification-thread] Send command reply (%i)", (int) status); + pthread_mutex_lock(&client->lock); if (client->communication.outbound.queued_command_reply) { /* Protocol error. */ - goto error; + goto error_unlock; } - memcpy(buffer, &msg, sizeof(msg)); - memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); - DBG("[notification-thread] Send command reply (%i)", (int) status); - /* Enqueue buffer to outgoing queue and flush it. */ ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, + &client->communication.outbound.payload.buffer, buffer, sizeof(buffer)); if (ret) { - goto error; + goto error_unlock; } transmission_status = client_flush_outgoing_queue(client); + + if (client_has_outbound_data_left(client)) { + /* Queue could not be emptied. */ + client->communication.outbound.queued_command_reply = true; + } + + pthread_mutex_unlock(&client->lock); ret = client_handle_transmission_status( client, transmission_status, state); if (ret) { goto error; } - if (client->communication.outbound.buffer.size != 0) { - /* Queue could not be emptied. */ - client->communication.outbound.queued_command_reply = true; - } - return 0; +error_unlock: + pthread_mutex_unlock(&client->lock); error: return -1; } @@ -2952,9 +3023,6 @@ int client_handle_message_unknown(struct notification_client *client, struct notification_thread_state *state) { int ret; - - pthread_mutex_lock(&client->lock); - /* * Receiving message header. The function will be called again * once the rest of the message as been received and can be @@ -2962,9 +3030,9 @@ int client_handle_message_unknown(struct notification_client *client, */ const struct lttng_notification_channel_message *msg; - assert(sizeof(*msg) == client->communication.inbound.buffer.size); + assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size); msg = (const struct lttng_notification_channel_message *) - client->communication.inbound.buffer.data; + client->communication.inbound.payload.buffer.data; if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) { @@ -2986,12 +3054,15 @@ int client_handle_message_unknown(struct notification_client *client, } client->communication.inbound.bytes_to_receive = msg->size; + client->communication.inbound.fds_to_receive = msg->fds; client->communication.inbound.msg_type = (enum lttng_notification_channel_message_type) msg->type; ret = lttng_dynamic_buffer_set_size( - &client->communication.inbound.buffer, msg->size); + &client->communication.inbound.payload.buffer, msg->size); + + /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */ + msg = NULL; end: - pthread_mutex_unlock(&client->lock); return ret; } @@ -3012,9 +3083,6 @@ int client_handle_message_handshake(struct notification_client *client, enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)]; - enum client_transmission_status transmission_status; - - pthread_mutex_lock(&client->lock); memcpy(send_buffer, &msg_header, sizeof(msg_header)); memcpy(send_buffer + sizeof(msg_header), &handshake_reply, @@ -3022,7 +3090,7 @@ int client_handle_message_handshake(struct notification_client *client, handshake_client = (struct lttng_notification_channel_command_handshake *) - client->communication.inbound.buffer + client->communication.inbound.payload.buffer .data; client->major = handshake_client->major; client->minor = handshake_client->minor; @@ -3045,39 +3113,38 @@ int client_handle_message_handshake(struct notification_client *client, status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION; } + pthread_mutex_lock(&client->lock); + /* Outgoing queue will be flushed when the command reply is sent. */ ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, send_buffer, + &client->communication.outbound.payload.buffer, send_buffer, sizeof(send_buffer)); if (ret) { ERR("[notification-thread] Failed to send protocol version to notification channel client"); - goto end; + goto end_unlock; } client->validated = true; client->communication.active = true; + pthread_mutex_unlock(&client->lock); - transmission_status = client_flush_outgoing_queue(client); - ret = client_handle_transmission_status( - client, transmission_status, state); + /* Set reception state to receive the next message header. */ + ret = client_reset_inbound_state(client); if (ret) { + ERR("[notification-thread] Failed to reset client communication's inbound state"); goto end; } + /* Flushes the outgoing queue. */ ret = client_send_command_reply(client, state, status); if (ret) { ERR("[notification-thread] Failed to send reply to notification channel client"); goto end; } - /* Set reception state to receive the next message header. */ - ret = client_reset_inbound_state(client); - if (ret) { - ERR("[notification-thread] Failed to reset client communication's inbound state"); - goto end; - } - -end: + goto end; +end_unlock: pthread_mutex_unlock(&client->lock); +end: return ret; } @@ -3092,15 +3159,17 @@ int client_handle_message_subscription( enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; struct lttng_payload_view condition_view = - lttng_payload_view_from_dynamic_buffer( - &client->communication.inbound.buffer, + lttng_payload_view_from_payload( + &client->communication.inbound.payload, 0, -1); size_t expected_condition_size; - pthread_mutex_lock(&client->lock); - expected_condition_size = client->communication.inbound.buffer.size; - pthread_mutex_unlock(&client->lock); - + /* + * No need to lock client to sample the inbound state as the only + * other thread accessing clients (action executor) only uses the + * outbound state. + */ + expected_condition_size = client->communication.inbound.payload.buffer.size; ret = lttng_condition_create_from_payload(&condition_view, &condition); if (ret != expected_condition_size) { ERR("[notification-thread] Malformed condition received from client"); @@ -3114,26 +3183,24 @@ int client_handle_message_subscription( ret = notification_thread_client_unsubscribe( client, condition, state, &status); } - if (ret) { - goto end; - } - pthread_mutex_lock(&client->lock); - ret = client_send_command_reply(client, state, status); if (ret) { - ERR("[notification-thread] Failed to send reply to notification channel client"); - goto end_unlock; + goto end; } /* Set reception state to receive the next message header. */ ret = client_reset_inbound_state(client); if (ret) { ERR("[notification-thread] Failed to reset client communication's inbound state"); - goto end_unlock; + goto end; + } + + ret = client_send_command_reply(client, state, status); + if (ret) { + ERR("[notification-thread] Failed to send reply to notification channel client"); + goto end; } -end_unlock: - pthread_mutex_unlock(&client->lock); end: return ret; } @@ -3187,7 +3254,6 @@ int handle_notification_thread_client_in( struct notification_client *client; ssize_t recv_ret; size_t offset; - bool message_is_complete = false; rcu_read_lock(); client = get_client_from_socket(socket, state); @@ -3197,12 +3263,11 @@ int handle_notification_thread_client_in( goto end; } - pthread_mutex_lock(&client->lock); - offset = client->communication.inbound.buffer.size - + offset = client->communication.inbound.payload.buffer.size - client->communication.inbound.bytes_to_receive; if (client->communication.inbound.expect_creds) { recv_ret = lttcomm_recv_creds_unix_sock(socket, - client->communication.inbound.buffer.data + offset, + client->communication.inbound.payload.buffer.data + offset, client->communication.inbound.bytes_to_receive, &client->communication.inbound.creds); if (recv_ret > 0) { @@ -3211,36 +3276,68 @@ int handle_notification_thread_client_in( } } else { recv_ret = lttcomm_recv_unix_sock_non_block(socket, - client->communication.inbound.buffer.data + offset, + client->communication.inbound.payload.buffer.data + offset, client->communication.inbound.bytes_to_receive); } if (recv_ret >= 0) { client->communication.inbound.bytes_to_receive -= recv_ret; - message_is_complete = client->communication.inbound - .bytes_to_receive == 0; - } - pthread_mutex_unlock(&client->lock); - if (recv_ret < 0) { + } else { goto error_disconnect_client; } - if (message_is_complete) { - ret = client_dispatch_message(client, state); - if (ret) { + if (client->communication.inbound.bytes_to_receive != 0) { + /* Message incomplete wait for more data. */ + ret = 0; + goto end; + } + + assert(client->communication.inbound.bytes_to_receive == 0); + + /* Receive fds. */ + if (client->communication.inbound.fds_to_receive != 0) { + ret = lttcomm_recv_payload_fds_unix_sock_non_block( + client->socket, + client->communication.inbound.fds_to_receive, + &client->communication.inbound.payload); + if (ret > 0) { /* - * Only returns an error if this client must be - * disconnected. + * Fds received. non blocking fds passing is all + * or nothing. */ + ssize_t expected_size; + + expected_size = sizeof(int) * + client->communication.inbound + .fds_to_receive; + assert(ret == expected_size); + client->communication.inbound.fds_to_receive = 0; + } else if (ret == 0) { + /* Received nothing. */ + ret = 0; + goto end; + } else { goto error_disconnect_client; } } + + /* At this point the message is complete.*/ + assert(client->communication.inbound.bytes_to_receive == 0 && + client->communication.inbound.fds_to_receive == 0); + ret = client_dispatch_message(client, state); + if (ret) { + /* + * Only returns an error if this client must be + * disconnected. + */ + goto error_disconnect_client; + } + end: rcu_read_unlock(); return ret; + error_disconnect_client: - pthread_mutex_lock(&client->lock); ret = notification_thread_client_disconnect(client, state); - pthread_mutex_unlock(&client->lock); goto end; } @@ -3262,9 +3359,10 @@ int handle_notification_thread_client_out( pthread_mutex_lock(&client->lock); transmission_status = client_flush_outgoing_queue(client); + pthread_mutex_unlock(&client->lock); + ret = client_handle_transmission_status( client, transmission_status, state); - pthread_mutex_unlock(&client->lock); if (ret) { goto end; } @@ -3458,7 +3556,7 @@ int client_notification_overflow(struct notification_client *client) client->communication.outbound.dropped_notification = true; ret = lttng_dynamic_buffer_append( - &client->communication.outbound.buffer, &msg, + &client->communication.outbound.payload.buffer, &msg, sizeof(msg)); if (ret) { PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue", @@ -3566,6 +3664,16 @@ int notification_client_list_send_evaluation( ->size = (uint32_t)( msg_payload.buffer.size - sizeof(msg_header)); + /* Update the payload number of fds. */ + { + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &msg_payload, 0, -1); + + ((struct lttng_notification_channel_message *) + msg_payload.buffer.data)->fds = (uint32_t) + lttng_payload_view_get_fd_handle_count(&pv); + } + pthread_mutex_lock(&client_list->lock); cds_list_for_each_entry_safe(client_list_element, tmp, &client_list->list, node) { @@ -3575,6 +3683,15 @@ int notification_client_list_send_evaluation( ret = 0; pthread_mutex_lock(&client->lock); + if (!client->communication.active) { + /* + * Skip inactive client (protocol error or + * disconnecting). + */ + DBG("Skipping client at it is marked as inactive"); + goto skip_client; + } + if (source_object_creds) { if (client->uid != source_object_creds->uid && client->gid != source_object_creds->gid && @@ -3584,18 +3701,19 @@ int notification_client_list_send_evaluation( * object. */ DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger"); - goto unlock_client; + goto skip_client; } } if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) { DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger"); - goto unlock_client; + goto skip_client; } DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)", client->socket, msg_payload.buffer.size); - if (client->communication.outbound.buffer.size) { + + if (client_has_outbound_data_left(client)) { /* * Outgoing data is already buffered for this client; * drop the notification and enqueue a "dropped @@ -3605,25 +3723,31 @@ int notification_client_list_send_evaluation( */ ret = client_notification_overflow(client); if (ret) { - goto unlock_client; + /* Fatal error. */ + goto skip_client; } } - ret = lttng_dynamic_buffer_append_buffer( - &client->communication.outbound.buffer, - &msg_payload.buffer); + ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload); if (ret) { - goto unlock_client; + /* Fatal error. */ + goto skip_client; } transmission_status = client_flush_outgoing_queue(client); + pthread_mutex_unlock(&client->lock); ret = client_report(client, transmission_status, user_data); if (ret) { - goto unlock_client; + /* Fatal error. */ + goto end_unlock_list; } -unlock_client: + + continue; + +skip_client: pthread_mutex_unlock(&client->lock); if (ret) { + /* Fatal error. */ goto end_unlock_list; } }