Fix: consumerd: slow metadata push slows down application registration
diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c (lttng-tools.git)
index 747740233094477c00fb59065cce2dd3f9861013..eba9947fb7409ecebee8fc081755226a1f67d680 100644
--- a/src/bin/lttng-sessiond/notification-thread-events.c
+++ b/src/bin/lttng-sessiond/notification-thread-events.c
@@ -48,8 +48,8 @@
 #include "lttng-sessiond.h"
 #include "kernel.h"
 
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
 
 /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
 #define MAX_CAPTURE_SIZE (PIPE_BUF)
@@ -2039,7 +2039,7 @@ int handle_notification_thread_command_add_tracer_event_source(
                        lttng_domain_type_str(domain_type));
 
        /* Adding the read side pipe to the event poll. */
-       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
        if (ret < 0) {
                ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
                                tracer_event_source_fd,
@@ -2153,6 +2153,12 @@ int remove_tracer_event_source_from_pollset(
 
        source_element->is_fd_in_poll_set = false;
 
+       /*
+        * Force the notification thread to restart the poll() loop to ensure
+        * that any events from the removed fd are removed.
+        */
+       state->restart_poll = true;
+
        ret = drain_event_notifier_notification_pipe(state, source_element->fd,
                        source_element->domain);
        if (ret) {
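
The new `restart_poll` flag addresses a subtle hazard: events already returned by a previous poll()/epoll_wait() call may still refer to an fd that a handler has just removed from the set. The following is only a hand-written epoll analogue of that pattern, not LTTng code; handle_one_event(), the 16-entry batch size, and poll_loop() are hypothetical placeholders.

#include <stdbool.h>
#include <sys/epoll.h>

/* Hypothetical handler: returns true if it removed an fd from the poll set. */
extern bool handle_one_event(int epoll_fd, struct epoll_event *event);

void poll_loop(int epoll_fd)
{
	struct epoll_event events[16];

	for (;;) {
		bool restart_poll = false;
		int i;
		const int nb_fd = epoll_wait(epoll_fd, events, 16, -1);

		for (i = 0; i < nb_fd; i++) {
			if (restart_poll) {
				/* Drop the stale entries; re-poll with the updated fd set. */
				break;
			}

			restart_poll = handle_one_event(epoll_fd, &events[i]);
		}
	}
}
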
@@ -2803,6 +2809,7 @@ int handle_notification_thread_command_register_trigger(
                        client_list = notification_client_list_create(state, condition);
                        if (!client_list) {
                                ERR("Error creating notification client list for trigger %s", trigger->name);
+                               ret = -1;
                                goto error_free_ht_element;
                        }
                }
@@ -3103,28 +3110,55 @@ end:
        return 0;
 }
 
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+               struct notification_thread_command **cmd)
+{
+       int ret;
+       uint64_t counter;
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+       if (ret != sizeof(counter)) {
+               ret = -1;
+               goto error_unlock;
+       }
+
+       /* Simulate behaviour of EFD_SEMAPHORE for older kernels. */
+       counter -= 1;
+       if (counter != 0) {
+               ret = lttng_write(handle->cmd_queue.event_fd, &counter,
+                               sizeof(counter));
+               if (ret != sizeof(counter)) {
+                       PERROR("Failed to write back to event_fd for EFD_SEMAPHORE emulation");
+                       ret = -1;
+                       goto error_unlock;
+               }
+       }
+
+       *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+                       struct notification_thread_command, cmd_list_node);
+       cds_list_del(&((*cmd)->cmd_list_node));
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       return ret;
+}
+
 /* Returns 0 on success, 1 on exit requested, negative value on error. */
 int handle_notification_thread_command(
                struct notification_thread_handle *handle,
                struct notification_thread_state *state)
 {
        int ret;
-       uint64_t counter;
        struct notification_thread_command *cmd;
 
-       /* Read the event pipe to put it back into a quiescent state. */
-       ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
-                       sizeof(counter));
-       if (ret != sizeof(counter)) {
+       ret = pop_cmd_queue(handle, &cmd);
+       if (ret) {
                goto error;
        }
 
-       pthread_mutex_lock(&handle->cmd_queue.lock);
-       cmd = cds_list_first_entry(&handle->cmd_queue.list,
-                       struct notification_thread_command, cmd_list_node);
-       cds_list_del(&cmd->cmd_list_node);
-       pthread_mutex_unlock(&handle->cmd_queue.lock);
-
        DBG("Received `%s` command",
                        notification_command_type_str(cmd->type));
        switch (cmd->type) {
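
The "Simulate behaviour of EFD_SEMAPHORE" comment in pop_cmd_queue() refers to the fact that, without EFD_SEMAPHORE, a read() on an eventfd drains the whole counter at once. Below is a minimal, self-contained sketch (outside the LTTng code base) of that emulation: pop one "ticket" and write the surplus back so the fd stays readable for the next poll() iteration. The three-producer scenario is only illustrative.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/eventfd.h>

int main(void)
{
	int fd = eventfd(0, 0); /* EFD_SEMAPHORE not used/available */
	uint64_t counter;
	const uint64_t one = 1;
	int i;

	/* Three producers each signal one queued command. */
	for (i = 0; i < 3; i++) {
		(void) write(fd, &one, sizeof(one));
	}

	/* Consumer pops a single command: read() drains the whole counter. */
	(void) read(fd, &counter, sizeof(counter)); /* counter == 3 */
	counter -= 1;
	if (counter != 0) {
		/* Write the surplus back so the fd remains readable. */
		(void) write(fd, &counter, sizeof(counter));
	}

	printf("commands still signalled: %" PRIu64 "\n", counter);
	(void) close(fd);
	return 0;
}
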
@@ -3252,12 +3286,12 @@ end:
                free(cmd);
                cmd = NULL;
        } else {
-               lttng_waiter_wake_up(&cmd->reply_waiter);
+               lttng_waiter_wake(&cmd->reply_waiter);
        }
        return ret;
 error_unlock:
        /* Wake-up and return a fatal error to the calling thread. */
-       lttng_waiter_wake_up(&cmd->reply_waiter);
+       lttng_waiter_wake(&cmd->reply_waiter);
        cmd->reply_code = LTTNG_ERR_FATAL;
 error:
        /* Indicate a fatal error to the caller. */
@@ -3359,9 +3393,9 @@ int handle_notification_thread_client_connect(
                goto error;
        }
 
+       client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
        ret = lttng_poll_add(&state->events, client->socket,
-                       LPOLLIN | LPOLLERR |
-                       LPOLLHUP | LPOLLRDHUP);
+                       client->communication.current_poll_events);
        if (ret < 0) {
                ERR("Failed to add notification channel client socket to poll set");
                ret = 0;
@@ -3493,6 +3527,18 @@ int handle_notification_thread_trigger_unregister_all(
        return error_occurred ? -1 : 0;
 }
 
+static
+bool client_has_outbound_data_left(
+               const struct notification_client *client)
+{
+       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+                       &client->communication.outbound.payload, 0, -1);
+       const bool has_data = pv.buffer.size != 0;
+       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+       return has_data || has_fds;
+}
+
 static
 int client_handle_transmission_status(
                struct notification_client *client,
@@ -3503,24 +3549,51 @@ int client_handle_transmission_status(
 
        switch (transmission_status) {
        case CLIENT_TRANSMISSION_STATUS_COMPLETE:
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN);
-               if (ret) {
-                       goto end;
-               }
-
-               break;
        case CLIENT_TRANSMISSION_STATUS_QUEUED:
+       {
+               int current_poll_events;
+               int new_poll_events;
                /*
                 * We want to be notified whenever there is buffer space
-                * available to send the rest of the payload.
+                * available to send the rest of the payload if we are
+                * waiting to send data to the client.
+                *
+                * The state of the outbound queue being sampled here is
+                * fine since:
+                *   - it is okay to wake-up "for nothing" in case we see
+                *     that data is left, but another thread succeeds in
+                *     flushing it before us when handling the client "out"
+                *     event. We will simply stop monitoring that event the next
+                *     time it wakes us up and we see no data left to be sent,
+                *   - if another thread fails to flush the entire client
+                *     outgoing queue, it will issue a "communication update"
+                *     command and cause the client's (e)poll mask to be
+                *     re-evaluated.
+                *
+                * The situation we seek to avoid would be to disable the
+                * monitoring of "out" client events indefinitely when there is
+                * data to be sent, which can't happen because of the
+                * aforementioned "communication update" mechanism.
                 */
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN_OUT);
-               if (ret) {
-                       goto end;
+               pthread_mutex_lock(&client->lock);
+               current_poll_events = client->communication.current_poll_events;
+               new_poll_events = client_has_outbound_data_left(client) ?
+                               CLIENT_POLL_EVENTS_IN_OUT :
+                                     CLIENT_POLL_EVENTS_IN;
+               client->communication.current_poll_events = new_poll_events;
+               pthread_mutex_unlock(&client->lock);
+
+               /* Update the monitored event set only if it changed. */
+               if (current_poll_events != new_poll_events) {
+                       ret = lttng_poll_mod(&state->events, client->socket,
+                                       new_poll_events);
+                       if (ret) {
+                               goto end;
+                       }
                }
+
                break;
+       }
        case CLIENT_TRANSMISSION_STATUS_FAIL:
                ret = notification_thread_client_disconnect(client, state);
                if (ret) {
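
The hunk above only monitors the "out" event while the client actually has queued outbound data, and only issues a poll-set modification when the mask changes. The sketch below shows the same idea against plain epoll; it is not how lttng_poll_mod() is implemented, and the macro and function names are made up for illustration.

#include <stdbool.h>
#include <stdint.h>
#include <sys/epoll.h>

#define EVENTS_IN	(EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)
#define EVENTS_IN_OUT	(EVENTS_IN | EPOLLOUT)

int update_client_poll_events(int epoll_fd, int client_fd,
		uint32_t *current_events, bool has_pending_output)
{
	const uint32_t new_events =
			has_pending_output ? EVENTS_IN_OUT : EVENTS_IN;
	struct epoll_event ev = {
		.events = new_events,
		.data.fd = client_fd,
	};

	if (new_events == *current_events) {
		/* The monitored set is already correct; avoid a syscall. */
		return 0;
	}

	*current_events = new_events;
	return epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client_fd, &ev);
}
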
@@ -3660,18 +3733,6 @@ error:
        return CLIENT_TRANSMISSION_STATUS_ERROR;
 }
 
-static
-bool client_has_outbound_data_left(
-               const struct notification_client *client)
-{
-       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
-                       &client->communication.outbound.payload, 0, -1);
-       const bool has_data = pv.buffer.size != 0;
-       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
-       return has_data || has_fds;
-}
-
 /* Client lock must _not_ be held by the caller. */
 static
 int client_send_command_reply(struct notification_client *client,
@@ -4078,7 +4139,47 @@ int handle_notification_thread_client_out(
        }
 
        pthread_mutex_lock(&client->lock);
-       transmission_status = client_flush_outgoing_queue(client);
+       if (!client_has_outbound_data_left(client)) {
+               /*
+                * A client "out" event can be received when no payload is left
+                * to send under some circumstances.
+                *
+                * Many threads can flush a client's outgoing queue and, if they
+                * had to queue their message (socket was full), will use the
+                * "communication update" command to signal the (e)poll thread
+                * to monitor for space being made available in the socket.
+                *
+                * Commands are sent over an internal pipe serviced by the same
+                * thread as the client sockets.
+                *
+                * When space is made available in the socket, there is a race
+                * between the (e)poll thread and the other threads that may
+                * wish to use the client's socket to flush its outgoing queue.
+                *
+                * A non-(e)poll thread may attempt (and succeed) in flushing
+                * the queue before the (e)poll thread gets a chance to service
+                * the client's "out" event.
+                *
+                * In this situation, the (e)poll thread processing the client
+                * out event will see an empty payload: there is nothing to do
+                * except unsubscribing (e)poll "out" events.
+                *
+                * Note that this thread is the (e)poll thread so it can modify
+                * the (e)poll mask directly without using a communication
+                * update command. Other threads that flush the outgoing queue
+                * will use the "communication update" command to wake up this
+                * thread and force it to monitor "out" events.
+                *
+                * When other threads succeed in emptying the outgoing queue,
+                * they don't need to update the (e)poll mask: if the "out"
+                * event is monitored, it will fire once and the (e)poll
+                * thread will reach this condition, causing the event to
+                * stop being monitored.
+                */
+               transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+       } else {
+               transmission_status = client_flush_outgoing_queue(client);
+       }
        pthread_mutex_unlock(&client->lock);
 
        ret = client_handle_transmission_status(
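
For context on the race described in the comment above, here is a sketch of the producer side: any thread may flush a client's queue under the client lock and only wakes the (e)poll thread when the socket could not absorb the whole payload. This is not sessiond code; struct client_stub, enqueue_message(), try_flush() and wake_poll_thread() are hypothetical stand-ins for the queueing, flushing, and "communication update" command mechanisms.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct client_stub {
	pthread_mutex_t lock;
};

extern void enqueue_message(struct client_stub *client, const void *msg,
		size_t len);
extern bool try_flush(struct client_stub *client); /* non-blocking send */
extern int wake_poll_thread(struct client_stub *client); /* "communication update" */

int send_to_client(struct client_stub *client, const void *msg, size_t len)
{
	bool fully_flushed;

	pthread_mutex_lock(&client->lock);
	enqueue_message(client, msg, len);
	fully_flushed = try_flush(client);
	pthread_mutex_unlock(&client->lock);

	/*
	 * Only wake the (e)poll thread when data remains queued; it will then
	 * monitor "out" events for this client and flush the remainder, or
	 * find it already flushed and simply stop monitoring "out" events, as
	 * in the hunk above.
	 */
	return fully_flushed ? 0 : wake_poll_thread(client);
}
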