X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.cpp;fp=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.cpp;h=95932b972e424273ccae84b56fea671af751262e;hp=909ccbff2cc1b6f36eba822957bdfec10a11a957;hb=9016dbfc649a21e911ed0d5cc19bb68db35af531;hpb=8a880a84dcb8ee64cbc4a5e04cae6775b4f9babd diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp index 909ccbff2..95932b972 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.cpp +++ b/src/bin/lttng-sessiond/notification-thread-events.cpp @@ -47,8 +47,8 @@ #include "lttng-sessiond.hpp" #include "kernel.hpp" -#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) -#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT) +#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) +#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT) /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */ #define MAX_CAPTURE_SIZE (PIPE_BUF) @@ -3394,9 +3394,9 @@ int handle_notification_thread_client_connect( goto error; } + client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN; ret = lttng_poll_add(&state->events, client->socket, - LPOLLIN | LPOLLERR | - LPOLLHUP | LPOLLRDHUP); + client->communication.current_poll_events); if (ret < 0) { ERR("Failed to add notification channel client socket to poll set"); ret = 0; @@ -3530,6 +3530,18 @@ int handle_notification_thread_trigger_unregister_all( return error_occurred ? -1 : 0; } +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + static int client_handle_transmission_status( struct notification_client *client, @@ -3540,24 +3552,51 @@ int client_handle_transmission_status( switch (transmission_status) { case CLIENT_TRANSMISSION_STATUS_COMPLETE: - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN); - if (ret) { - goto end; - } - - break; case CLIENT_TRANSMISSION_STATUS_QUEUED: + { + int current_poll_events; + int new_poll_events; /* * We want to be notified whenever there is buffer space - * available to send the rest of the payload. + * available to send the rest of the payload if we are + * waiting to send data to the client. + * + * The state of the outbound queue being sampled here is + * fine since: + * - it is okay to wake-up "for nothing" in case we see + * that data is left, but another thread succeeds in + * flushing it before us when handling the client "out" + * event. We will simply stop monitoring that event the next + * time it wakes us up and we see no data left to be sent, + * - if another thread fails to flush the entire client + * outgoing queue, it will issue a "communication update" + * command and cause the client's (e)poll mask to be + * re-evaluated. + * + * The situation we seek to avoid would be to disable the + * monitoring of "out" client events indefinitely when there is + * data to be sent, which can't happen because of the + * aforementioned "communication update" mechanism. */ - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN_OUT); - if (ret) { - goto end; + pthread_mutex_lock(&client->lock); + current_poll_events = client->communication.current_poll_events; + new_poll_events = client_has_outbound_data_left(client) ? + CLIENT_POLL_EVENTS_IN_OUT : + CLIENT_POLL_EVENTS_IN; + client->communication.current_poll_events = new_poll_events; + pthread_mutex_unlock(&client->lock); + + /* Update the monitored event set only if it changed. */ + if (current_poll_events != new_poll_events) { + ret = lttng_poll_mod(&state->events, client->socket, + new_poll_events); + if (ret) { + goto end; + } } + break; + } case CLIENT_TRANSMISSION_STATUS_FAIL: ret = notification_thread_client_disconnect(client, state); if (ret) { @@ -3697,18 +3736,6 @@ error: return CLIENT_TRANSMISSION_STATUS_ERROR; } -static -bool client_has_outbound_data_left( - const struct notification_client *client) -{ - const struct lttng_payload_view pv = lttng_payload_view_from_payload( - &client->communication.outbound.payload, 0, -1); - const bool has_data = pv.buffer.size != 0; - const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); - - return has_data || has_fds; -} - /* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, @@ -4117,7 +4144,47 @@ int handle_notification_thread_client_out( } pthread_mutex_lock(&client->lock); - transmission_status = client_flush_outgoing_queue(client); + if (!client_has_outbound_data_left(client)) { + /* + * A client "out" event can be received when no payload is left + * to send under some circumstances. + * + * Many threads can flush a client's outgoing queue and, if they + * had to queue their message (socket was full), will use the + * "communication update" command to signal the (e)poll thread + * to monitor for space being made available in the socket. + * + * Commands are sent over an internal pipe serviced by the same + * thread as the client sockets. + * + * When space is made available in the socket, there is a race + * between the (e)poll thread and the other threads that may + * wish to use the client's socket to flush its outgoing queue. + * + * A non-(e)poll thread may attempt (and succeed) in flushing + * the queue before the (e)poll thread gets a chance to service + * the client's "out" event. + * + * In this situation, the (e)poll thread processing the client + * out event will see an empty payload: there is nothing to do + * except unsubscribing (e)poll "out" events. + * + * Note that this thread is the (e)poll thread so it can modify + * the (e)poll mask directly without using a communication + * update command. Other threads that flush the outgoing queue + * will use the "communication update" command to wake up this + * thread and force it to monitor "out" events. + * + * When other threads succeed in emptying the outgoing queue, + * they don't need to update the (e)poll mask: if the "out" + * event is monitored, it will fire once and the (e)poll + * thread will reach this condition, causing the event to + * stop being monitored. + */ + transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + } else { + transmission_status = client_flush_outgoing_queue(client); + } pthread_mutex_unlock(&client->lock); ret = client_handle_transmission_status(