Fix: sessiond: assert on empty payload when handling client out event
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
index c5ee4b11ed02052a77f014103b675e93336d6f71..76a7363bf8d03bb907f420abdbd377deff7f8563 100644 (file)
@@ -48,8 +48,8 @@
 #include "lttng-sessiond.h"
 #include "kernel.h"
 
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
 
 /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
 #define MAX_CAPTURE_SIZE (PIPE_BUF)
@@ -2039,7 +2039,7 @@ int handle_notification_thread_command_add_tracer_event_source(
                        lttng_domain_type_str(domain_type));
 
        /* Adding the read side pipe to the event poll. */
-       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
        if (ret < 0) {
                ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
                                tracer_event_source_fd,
@@ -2125,7 +2125,7 @@ find_tracer_event_source_element(struct notification_thread_state *state,
 
        source_element = NULL;
 end:
-       return NULL;
+       return source_element;
 }
 
 static
@@ -2153,6 +2153,12 @@ int remove_tracer_event_source_from_pollset(
 
        source_element->is_fd_in_poll_set = false;
 
+       /*
+        * Force the notification thread to restart the poll() loop to ensure
+        * that any events from the removed fd are removed.
+        */
+       state->restart_poll = true;
+
        ret = drain_event_notifier_notification_pipe(state, source_element->fd,
                        source_element->domain);
        if (ret) {
@@ -3103,28 +3109,55 @@ end:
        return 0;
 }
 
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+               struct notification_thread_command **cmd)
+{
+       int ret;
+       uint64_t counter;
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+       if (ret != sizeof(counter)) {
+               ret = -1;
+               goto error_unlock;
+       }
+
+       /* Simulate behaviour of EFD_SEMAPHORE for older kernels. */
+       counter -= 1;
+       if (counter != 0) {
+               ret = lttng_write(handle->cmd_queue.event_fd, &counter,
+                               sizeof(counter));
+               if (ret != sizeof(counter)) {
+                       PERROR("Failed to write back to event_fd for EFD_SEMAPHORE emulation");
+                       ret = -1;
+                       goto error_unlock;
+               }
+       }
+
+       *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+                       struct notification_thread_command, cmd_list_node);
+       cds_list_del(&((*cmd)->cmd_list_node));
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       return ret;
+}
+
 /* Returns 0 on success, 1 on exit requested, negative value on error. */
 int handle_notification_thread_command(
                struct notification_thread_handle *handle,
                struct notification_thread_state *state)
 {
        int ret;
-       uint64_t counter;
        struct notification_thread_command *cmd;
 
-       /* Read the event pipe to put it back into a quiescent state. */
-       ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
-                       sizeof(counter));
-       if (ret != sizeof(counter)) {
+       ret = pop_cmd_queue(handle, &cmd);
+       if (ret) {
                goto error;
        }
 
-       pthread_mutex_lock(&handle->cmd_queue.lock);
-       cmd = cds_list_first_entry(&handle->cmd_queue.list,
-                       struct notification_thread_command, cmd_list_node);
-       cds_list_del(&cmd->cmd_list_node);
-       pthread_mutex_unlock(&handle->cmd_queue.lock);
-
        DBG("Received `%s` command",
                        notification_command_type_str(cmd->type));
        switch (cmd->type) {
@@ -3359,9 +3392,9 @@ int handle_notification_thread_client_connect(
                goto error;
        }
 
+       client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
        ret = lttng_poll_add(&state->events, client->socket,
-                       LPOLLIN | LPOLLERR |
-                       LPOLLHUP | LPOLLRDHUP);
+                       client->communication.current_poll_events);
        if (ret < 0) {
                ERR("Failed to add notification channel client socket to poll set");
                ret = 0;
@@ -3493,6 +3526,18 @@ int handle_notification_thread_trigger_unregister_all(
        return error_occurred ? -1 : 0;
 }
 
+static
+bool client_has_outbound_data_left(
+               const struct notification_client *client)
+{
+       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+                       &client->communication.outbound.payload, 0, -1);
+       const bool has_data = pv.buffer.size != 0;
+       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+       return has_data || has_fds;
+}
+
 static
 int client_handle_transmission_status(
                struct notification_client *client,
@@ -3503,24 +3548,51 @@ int client_handle_transmission_status(
 
        switch (transmission_status) {
        case CLIENT_TRANSMISSION_STATUS_COMPLETE:
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN);
-               if (ret) {
-                       goto end;
-               }
-
-               break;
        case CLIENT_TRANSMISSION_STATUS_QUEUED:
+       {
+               int current_poll_events;
+               int new_poll_events;
                /*
                 * We want to be notified whenever there is buffer space
-                * available to send the rest of the payload.
+                * available to send the rest of the payload if we are
+                * waiting to send data to the client.
+                *
+                * The state of the outbound queue being sampled here is
+                * fine since:
+                *   - it is okay to wake-up "for nothing" in case we see
+                *     that data is left, but another thread succeeds in
+                *     flushing it before us when handling the client "out"
+                *     event. We will simply stop monitoring that event the next
+                *     time it wakes us up and we see no data left to be sent,
+                *   - if another thread fails to flush the entire client
+                *     outgoing queue, it will issue a "communication update"
+                *     command and cause the client's (e)poll mask to be
+                *     re-evaluated.
+                *
+                * The situation we seek to avoid would be to disable the
+                * monitoring of "out" client events indefinitely when there is
+                * data to be sent, which can't happen because of the
+                * aforementioned "communication update" mechanism.
                 */
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN_OUT);
-               if (ret) {
-                       goto end;
+               pthread_mutex_lock(&client->lock);
+               current_poll_events = client->communication.current_poll_events;
+               new_poll_events = client_has_outbound_data_left(client) ?
+                               CLIENT_POLL_EVENTS_IN_OUT :
+                                     CLIENT_POLL_EVENTS_IN;
+               client->communication.current_poll_events = new_poll_events;
+               pthread_mutex_unlock(&client->lock);
+
+               /* Update the monitored event set only if it changed. */
+               if (current_poll_events != new_poll_events) {
+                       ret = lttng_poll_mod(&state->events, client->socket,
+                                       new_poll_events);
+                       if (ret) {
+                               goto end;
+                       }
                }
+
                break;
+       }
        case CLIENT_TRANSMISSION_STATUS_FAIL:
                ret = notification_thread_client_disconnect(client, state);
                if (ret) {
@@ -3660,18 +3732,6 @@ error:
        return CLIENT_TRANSMISSION_STATUS_ERROR;
 }
 
-static
-bool client_has_outbound_data_left(
-               const struct notification_client *client)
-{
-       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
-                       &client->communication.outbound.payload, 0, -1);
-       const bool has_data = pv.buffer.size != 0;
-       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
-       return has_data || has_fds;
-}
-
 /* Client lock must _not_ be held by the caller. */
 static
 int client_send_command_reply(struct notification_client *client,
@@ -3814,9 +3874,11 @@ int client_handle_message_handshake(struct notification_client *client,
                        &client->communication.inbound.creds);
        client->gid = LTTNG_SOCK_GET_GID_CRED(
                        &client->communication.inbound.creds);
-       DBG("Received handshake from client (uid = %u, gid = %u) with version %i.%i",
+       client->is_sessiond = LTTNG_SOCK_GET_PID_CRED(&client->communication.inbound.creds) == getpid();
+       DBG("Received handshake from client: uid = %u, gid = %u, protocol version = %i.%i, client is sessiond = %s",
                        client->uid, client->gid, (int) client->major,
-                       (int) client->minor);
+                       (int) client->minor,
+                       client->is_sessiond ? "true" : "false");
 
        if (handshake_client->major !=
                        LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
@@ -4076,7 +4138,47 @@ int handle_notification_thread_client_out(
        }
 
        pthread_mutex_lock(&client->lock);
-       transmission_status = client_flush_outgoing_queue(client);
+       if (!client_has_outbound_data_left(client)) {
+               /*
+                * A client "out" event can be received when no payload is left
+                * to send under some circumstances.
+                *
+                * Many threads can flush a client's outgoing queue and, if they
+                * had to queue their message (socket was full), will use the
+                * "communication update" command to signal the (e)poll thread
+                * to monitor for space being made available in the socket.
+                *
+                * Commands are sent over an internal pipe serviced by the same
+                * thread as the client sockets.
+                *
+                * When space is made available in the socket, there is a race
+                * between the (e)poll thread and the other threads that may
+                * wish to use the client's socket to flush its outgoing queue.
+                *
+                * A non-(e)poll thread may attempt (and succeed) in flushing
+                * the queue before the (e)poll thread gets a chance to service
+                * the client's "out" event.
+                *
+                * In this situation, the (e)poll thread processing the client
+                * out event will see an empty payload: there is nothing to do
+                * except unsubscribing (e)poll "out" events.
+                *
+                * Note that this thread is the (e)poll thread so it can modify
+                * the (e)poll mask directly without using a communication
+                * update command. Other threads that flush the outgoing queue
+                * will use the "communication update" command to wake up this
+                * thread and force it to monitor "out" events.
+                *
+                * When other threads succeed in emptying the outgoing queue,
+                * they don't need to update the (e)poll mask: if the "out"
+                * event is monitored, it will fire once and the (e)poll
+                * thread will reach this condition, causing the event to
+                * stop being monitored.
+                */
+               transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+       } else {
+               transmission_status = client_flush_outgoing_queue(client);
+       }
        pthread_mutex_unlock(&client->lock);
 
        ret = client_handle_transmission_status(
@@ -4414,6 +4516,15 @@ int notification_client_list_send_evaluation(
                        goto skip_client;
                }
 
+               if (lttng_trigger_is_hidden(trigger) && !client->is_sessiond) {
+                       /*
+                        * Notifications resulting from an hidden trigger are
+                        * only sent to the session daemon.
+                        */
+                       DBG("Skipping client as the trigger is hidden and the client is not the session daemon");
+                       goto skip_client;
+               }
+
                if (source_object_creds) {
                        if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
                                        client->gid != lttng_credentials_get_gid(source_object_creds) &&
This page took 0.026788 seconds and 4 git commands to generate.