Fix: sessiond: size-based notification occasionally not triggered
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
index aa6ba07800d6dac81340405c6023e9d9d4dc3fa1..815b4e95f1a5644d78fd18282c4f8cd0b52e6887 100644 (file)
@@ -7,42 +7,51 @@
  */
 
 #define _LGPL_SOURCE
-#include <lttng/trigger/trigger.h>
-#include <common/error.hpp>
+#include "cmd.hpp"
+#include "health-sessiond.hpp"
+#include "lttng-sessiond.hpp"
+#include "notification-thread-commands.hpp"
+#include "rotate.hpp"
+#include "rotation-thread.hpp"
+#include "session.hpp"
+#include "thread.hpp"
+#include "timer.hpp"
+#include "utils.hpp"
+
+#include <common/align.hpp>
 #include <common/config/session-config.hpp>
 #include <common/defaults.hpp>
-#include <common/utils.hpp>
+#include <common/error.hpp>
 #include <common/futex.hpp>
-#include <common/align.hpp>
-#include <common/time.hpp>
 #include <common/hashtable/utils.hpp>
-#include <sys/stat.h>
-#include <time.h>
-#include <signal.h>
-#include <inttypes.h>
-
 #include <common/kernel-ctl/kernel-ctl.hpp>
-#include <lttng/notification/channel-internal.hpp>
-#include <lttng/rotate-internal.hpp>
-#include <lttng/location-internal.hpp>
+#include <common/time.hpp>
+#include <common/urcu.hpp>
+#include <common/utils.hpp>
+
 #include <lttng/condition/condition-internal.hpp>
+#include <lttng/location-internal.hpp>
+#include <lttng/notification/channel-internal.hpp>
 #include <lttng/notification/notification-internal.hpp>
+#include <lttng/rotate-internal.hpp>
+#include <lttng/trigger/trigger.h>
 
-#include "rotation-thread.hpp"
-#include "lttng-sessiond.hpp"
-#include "health-sessiond.hpp"
-#include "rotate.hpp"
-#include "cmd.hpp"
-#include "session.hpp"
-#include "timer.hpp"
-#include "notification-thread-commands.hpp"
-#include "utils.hpp"
-#include "thread.hpp"
-
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+#include <time.h>
 #include <urcu.h>
 #include <urcu/list.h>
 
-struct lttng_notification_channel *rotate_notification_channel = NULL;
+struct lttng_notification_channel *rotate_notification_channel = nullptr;
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
 
 struct rotation_thread {
        struct lttng_poll_event events;
@@ -75,8 +84,7 @@ struct rotation_thread_job {
 };
 } /* namespace */
 
-static
-const char *get_job_type_str(enum rotation_thread_job_type job_type)
+static const char *get_job_type_str(enum rotation_thread_job_type job_type)
 {
        switch (job_type) {
        case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
@@ -88,9 +96,9 @@ const char *get_job_type_str(enum rotation_thread_job_type job_type)
        }
 }
 
-struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
 {
-       struct rotation_thread_timer_queue *queue = NULL;
+       struct rotation_thread_timer_queue *queue = nullptr;
 
        queue = zmalloc<rotation_thread_timer_queue>();
        if (!queue) {
@@ -100,13 +108,12 @@ struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
 
        queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
        CDS_INIT_LIST_HEAD(&queue->list);
-       pthread_mutex_init(&queue->lock, NULL);
+       pthread_mutex_init(&queue->lock, nullptr);
 end:
        return queue;
 }
 
-void rotation_thread_timer_queue_destroy(
-               struct rotation_thread_timer_queue *queue)
+void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
 {
        if (!queue) {
                return;
@@ -124,16 +131,15 @@ void rotation_thread_timer_queue_destroy(
 /*
  * Destroy the thread data previously created by the init function.
  */
-void rotation_thread_handle_destroy(
-               struct rotation_thread_handle *handle)
+void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
 {
        lttng_pipe_destroy(handle->quit_pipe);
        free(handle);
 }
 
-struct rotation_thread_handle *rotation_thread_handle_create(
-               struct rotation_thread_timer_queue *rotation_timer_queue,
-               struct notification_thread_handle *notification_thread_handle)
+struct rotation_thread_handle *
+rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
+                             struct notification_thread_handle *notification_thread_handle)
 {
        struct rotation_thread_handle *handle;
 
@@ -153,22 +159,21 @@ end:
        return handle;
 error:
        rotation_thread_handle_destroy(handle);
-       return NULL;
+       return nullptr;
 }
 
 /*
  * Called with the rotation_thread_timer_queue lock held.
  * Return true if the same timer job already exists in the queue, false if not.
  */
-static
-bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
-               enum rotation_thread_job_type job_type,
-               struct ltt_session *session)
+static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
+                            enum rotation_thread_job_type job_type,
+                            struct ltt_session *session)
 {
        bool exists = false;
        struct rotation_thread_job *job;
 
-       cds_list_for_each_entry(job, &queue->list, head) {
+       cds_list_for_each_entry (job, &queue->list, head) {
                if (job->session == session && job->type == job_type) {
                        exists = true;
                        goto end;
@@ -179,12 +184,12 @@ end:
 }
 
 void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
-               enum rotation_thread_job_type job_type,
-               struct ltt_session *session)
+                                enum rotation_thread_job_type job_type,
+                                struct ltt_session *session)
 {
        int ret;
        const char dummy = '!';
-       struct rotation_thread_job *job = NULL;
+       struct rotation_thread_job *job = nullptr;
        const char *job_type_str = get_job_type_str(job_type);
 
        pthread_mutex_lock(&queue->lock);
@@ -199,7 +204,8 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
        job = zmalloc<rotation_thread_job>();
        if (!job) {
                PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
-                               job_type_str, session->name);
+                      job_type_str,
+                      session->name);
                goto end;
        }
        /* No reason for this to fail as the caller must hold a reference. */
@@ -209,8 +215,7 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
        job->type = job_type;
        cds_list_add_tail(&job->head, &queue->list);
 
-       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy,
-                       sizeof(dummy));
+       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
        if (ret < 0) {
                /*
                 * We do not want to block in the timer handler, the job has
@@ -221,7 +226,7 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
                DIAGNOSTIC_PUSH
                DIAGNOSTIC_IGNORE_LOGICAL_OP
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
-               DIAGNOSTIC_POP
+                       DIAGNOSTIC_POP
                        /*
                         * Not an error, but would be surprising and indicate
                         * that the rotation thread can't keep up with the
@@ -231,7 +236,8 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
                        goto end;
                }
                PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
-                               job_type_str, session->name);
+                      job_type_str,
+                      session->name);
                goto end;
        }
 
@@ -239,9 +245,7 @@ end:
        pthread_mutex_unlock(&queue->lock);
 }
 
-static
-int init_poll_set(struct lttng_poll_event *poll_set,
-               struct rotation_thread_handle *handle)
+static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
 {
        int ret;
 
@@ -256,16 +260,14 @@ int init_poll_set(struct lttng_poll_event *poll_set,
                goto error;
        }
 
-       ret = lttng_poll_add(poll_set,
-                       lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
+       ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
        if (ret < 0) {
                ERR("Failed to add quit pipe read fd to poll set");
                goto error;
        }
 
-       ret = lttng_poll_add(poll_set,
-                       lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
-                       LPOLLIN);
+       ret = lttng_poll_add(
+               poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
        if (ret < 0) {
                ERR("Failed to add rotate_pending fd to poll set");
                goto error;
@@ -277,18 +279,23 @@ error:
        return ret;
 }
 
-static
-void fini_thread_state(struct rotation_thread *state)
+static void fini_thread_state(struct rotation_thread *state)
 {
        lttng_poll_clean(&state->events);
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
+
+       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+               if (close_ret) {
+                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+               }
+       }
 }
 
-static
-int init_thread_state(struct rotation_thread_handle *handle,
-               struct rotation_thread *state)
+static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
 {
        int ret;
 
@@ -301,27 +308,39 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
-       rotate_notification_channel = lttng_notification_channel_create(
-                       lttng_session_daemon_notification_endpoint);
+       rotate_notification_channel =
+               lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
        if (!rotate_notification_channel) {
                ERR("Could not create notification channel");
                ret = -1;
                goto end;
        }
-       ret = lttng_poll_add(&state->events, rotate_notification_channel->socket,
-                       LPOLLIN);
+       ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
        if (ret < 0) {
                ERR("Failed to add notification fd to pollset");
                goto end;
        }
 
+       rotate_notification_channel_subscription_change_eventfd =
+               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+       if (rotate_notification_channel_subscription_change_eventfd < 0) {
+               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+               ret = -1;
+               goto end;
+       }
+       ret = lttng_poll_add(
+               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+       if (ret < 0) {
+               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+               goto end;
+       }
+
 end:
        return ret;
 }
 
-static
-void check_session_rotation_pending_on_consumers(struct ltt_session *session,
-               bool *_rotation_completed)
+static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
+                                                       bool *_rotation_completed)
 {
        int ret = 0;
        struct consumer_socket *socket;
@@ -330,6 +349,7 @@ void check_session_rotation_pending_on_consumers(struct ltt_session *session,
        uint64_t relayd_id;
        bool chunk_exists_on_peer = false;
        enum lttng_trace_chunk_status chunk_status;
+       lttng::urcu::read_lock_guard read_lock;
 
        LTTNG_ASSERT(session->chunk_being_archived);
 
@@ -337,21 +357,22 @@ void check_session_rotation_pending_on_consumers(struct ltt_session *session,
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       rcu_read_lock();
        if (!session->ust_session) {
                goto skip_ust;
        }
-       cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
-                       &iter, socket, node.node) {
+
+       cds_lfht_for_each_entry (
+               session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
                relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
-                               -1ULL :
-                               session->ust_session->consumer->net_seq_index;
+                       -1ULL :
+                       session->ust_session->consumer->net_seq_index;
 
                pthread_mutex_lock(socket->lock);
                ret = consumer_trace_chunk_exists(socket,
-                               relayd_id,
-                               session->id, session->chunk_being_archived,
-                               &exists_status);
+                                                 relayd_id,
+                                                 session->id,
+                                                 session->chunk_being_archived,
+                                                 &exists_status);
                if (ret) {
                        pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -370,17 +391,18 @@ skip_ust:
        if (!session->kernel_session) {
                goto skip_kernel;
        }
-       cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
-                               &iter, socket, node.node) {
+       cds_lfht_for_each_entry (
+               session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
                pthread_mutex_lock(socket->lock);
                relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
-                               -1ULL :
-                               session->kernel_session->consumer->net_seq_index;
+                       -1ULL :
+                       session->kernel_session->consumer->net_seq_index;
 
                ret = consumer_trace_chunk_exists(socket,
-                               relayd_id,
-                               session->id, session->chunk_being_archived,
-                               &exists_status);
+                                                 relayd_id,
+                                                 session->id,
+                                                 session->chunk_being_archived,
+                                                 &exists_status);
                if (ret) {
                        pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -396,26 +418,23 @@ skip_ust:
        }
 skip_kernel:
 end:
-       rcu_read_unlock();
 
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
 
-               chunk_status = lttng_trace_chunk_get_id(
-                               session->chunk_being_archived,
-                               &chunk_being_archived_id);
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+                                                       &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-               DBG("Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
-                               chunk_being_archived_id,
-                               session->name);
+               DBG("Rotation of trace archive %" PRIu64
+                   " of session \"%s\" is complete on all consumers",
+                   chunk_being_archived_id,
+                   session->name);
        }
        *_rotation_completed = !chunk_exists_on_peer;
        if (ret) {
-               ret = session_reset_rotation_state(session,
-                               LTTNG_ROTATION_STATE_ERROR);
+               ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
                if (ret) {
-                       ERR("Failed to reset rotation state of session \"%s\"",
-                                       session->name);
+                       ERR("Failed to reset rotation state of session \"%s\"", session->name);
                }
        }
 }
@@ -425,9 +444,9 @@ end:
  * Should only return non-zero in the event of a fatal error. Doing so will
  * shutdown the thread.
  */
-static
-int check_session_rotation_pending(struct ltt_session *session,
-               struct notification_thread_handle *notification_thread_handle)
+static int
+check_session_rotation_pending(struct ltt_session *session,
+                              struct notification_thread_handle *notification_thread_handle)
 {
        int ret;
        struct lttng_trace_archive_location *location;
@@ -441,12 +460,13 @@ int check_session_rotation_pending(struct ltt_session *session,
                goto end;
        }
 
-       chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
-                       &chunk_being_archived_id);
+       chunk_status =
+               lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
        DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
-                       session->name, chunk_being_archived_id);
+           session->name,
+           chunk_being_archived_id);
 
        /*
         * The rotation-pending check timer of a session is launched in
@@ -461,10 +481,8 @@ int check_session_rotation_pending(struct ltt_session *session,
                goto check_ongoing_rotation;
        }
 
-       check_session_rotation_pending_on_consumers(session,
-                       &rotation_completed);
-       if (!rotation_completed ||
-                       session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+       check_session_rotation_pending_on_consumers(session, &rotation_completed);
+       if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
                goto check_ongoing_rotation;
        }
 
@@ -472,8 +490,8 @@ int check_session_rotation_pending(struct ltt_session *session,
         * Now we can clear the "ONGOING" state in the session. New
         * rotations can start now.
         */
-       chunk_status = lttng_trace_chunk_get_name(session->chunk_being_archived,
-                       &archived_chunk_name, NULL);
+       chunk_status = lttng_trace_chunk_get_name(
+               session->chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
        free(session->last_archived_chunk_name);
        session->last_archived_chunk_name = strdup(archived_chunk_name);
@@ -485,29 +503,29 @@ int check_session_rotation_pending(struct ltt_session *session,
        if (!session->quiet_rotation) {
                location = session_get_trace_archive_location(session);
                ret = notification_thread_command_session_rotation_completed(
-                               notification_thread_handle,
-                               session->id,
-                               session->last_archived_chunk_id.value,
-                               location);
+                       notification_thread_handle,
+                       session->id,
+                       session->last_archived_chunk_id.value,
+                       location);
                lttng_trace_archive_location_put(location);
                if (ret != LTTNG_OK) {
                        ERR("Failed to notify notification thread of completed rotation for session %s",
-                                       session->name);
+                           session->name);
                }
        }
 
        ret = 0;
 check_ongoing_rotation:
        if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
-               chunk_status = lttng_trace_chunk_get_id(
-                               session->chunk_being_archived,
-                               &chunk_being_archived_id);
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+                                                       &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
                DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
-                               chunk_being_archived_id, session->name);
+                   chunk_being_archived_id,
+                   session->name);
                ret = timer_session_rotation_pending_check_start(session,
-                               DEFAULT_ROTATE_PENDING_TIMER);
+                                                                DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Failed to re-enable rotation pending timer");
                        ret = -1;
@@ -520,31 +538,30 @@ end:
 }
 
 /* Call with the session and session_list locks held. */
-static
-int launch_session_rotation(struct ltt_session *session)
+static int launch_session_rotation(struct ltt_session *session)
 {
        int ret;
        struct lttng_rotate_session_return rotation_return;
 
-       DBG("Launching scheduled time-based rotation on session \"%s\"",
-                       session->name);
+       DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
 
-       ret = cmd_rotate_session(session, &rotation_return, false,
-               LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       ret = cmd_rotate_session(
+               session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        if (ret == LTTNG_OK) {
                DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
-                               session->name);
+                   session->name);
        } else {
                /* Don't consider errors as fatal. */
                DBG("Scheduled time-based rotation aborted for session %s: %s",
-                               session->name, lttng_strerror(ret));
+                   session->name,
+                   lttng_strerror(ret));
        }
        return 0;
 }
 
-static
-int run_job(struct rotation_thread_job *job, struct ltt_session *session,
-               struct notification_thread_handle *notification_thread_handle)
+static int run_job(struct rotation_thread_job *job,
+                  struct ltt_session *session,
+                  struct notification_thread_handle *notification_thread_handle)
 {
        int ret;
 
@@ -553,8 +570,7 @@ int run_job(struct rotation_thread_job *job, struct ltt_session *session,
                ret = launch_session_rotation(session);
                break;
        case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
-               ret = check_session_rotation_pending(session,
-                               notification_thread_handle);
+               ret = check_session_rotation_pending(session, notification_thread_handle);
                break;
        default:
                abort();
@@ -562,10 +578,9 @@ int run_job(struct rotation_thread_job *job, struct ltt_session *session,
        return ret;
 }
 
-static
-int handle_job_queue(struct rotation_thread_handle *handle,
-               struct rotation_thread *state __attribute__((unused)),
-               struct rotation_thread_timer_queue *queue)
+static int handle_job_queue(struct rotation_thread_handle *handle,
+                           struct rotation_thread *state __attribute__((unused)),
+                           struct rotation_thread_timer_queue *queue)
 {
        int ret = 0;
 
@@ -579,16 +594,14 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                        pthread_mutex_unlock(&queue->lock);
                        break;
                }
-               job = cds_list_first_entry(&queue->list,
-                               typeof(*job), head);
+               job = cds_list_first_entry(&queue->list, typeof(*job), head);
                cds_list_del(&job->head);
                pthread_mutex_unlock(&queue->lock);
 
                session_lock_list();
                session = job->session;
                if (!session) {
-                       DBG("Session \"%s\" not found",
-                                       session->name != NULL ? session->name : "");
+                       DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
                        /*
                         * This is a non-fatal error, and we cannot report it to
                         * the user (timer), so just print the error and
@@ -623,21 +636,20 @@ end:
        return ret;
 }
 
-static
-int handle_condition(const struct lttng_notification *notification,
-               struct notification_thread_handle *notification_thread_handle)
+static int handle_condition(const struct lttng_notification *notification,
+                           struct notification_thread_handle *notification_thread_handle)
 {
        int ret = 0;
-       const char *condition_session_name = NULL;
+       const char *condition_session_name = nullptr;
        enum lttng_condition_type condition_type;
        enum lttng_condition_status condition_status;
        enum lttng_evaluation_status evaluation_status;
        uint64_t consumed;
        struct ltt_session *session;
        const struct lttng_condition *condition =
-                       lttng_notification_get_const_condition(notification);
+               lttng_notification_get_const_condition(notification);
        const struct lttng_evaluation *evaluation =
-                       lttng_notification_get_const_evaluation(notification);
+               lttng_notification_get_const_evaluation(notification);
 
        condition_type = lttng_condition_get_type(condition);
 
@@ -649,14 +661,14 @@ int handle_condition(const struct lttng_notification *notification,
 
        /* Fetch info to test */
        condition_status = lttng_condition_session_consumed_size_get_session_name(
-                       condition, &condition_session_name);
+               condition, &condition_session_name);
        if (condition_status != LTTNG_CONDITION_STATUS_OK) {
                ERR("Session name could not be fetched");
                ret = -1;
                goto end;
        }
-       evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation,
-                       &consumed);
+       evaluation_status =
+               lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
        if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
                ERR("Failed to get evaluation");
                ret = -1;
@@ -667,8 +679,8 @@ int handle_condition(const struct lttng_notification *notification,
        session = session_find_by_name(condition_session_name);
        if (!session) {
                DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
-                               lttng_condition_type_str(condition_type),
-                               condition_session_name);
+                   lttng_condition_type_str(condition_type),
+                   condition_session_name);
                /*
                 * Not a fatal error: a session can be destroyed before we get
                 * the chance to handle the notification.
@@ -680,20 +692,19 @@ int handle_condition(const struct lttng_notification *notification,
        session_lock(session);
 
        if (!lttng_trigger_is_equal(session->rotate_trigger,
-                       lttng_notification_get_const_trigger(notification))) {
+                                   lttng_notification_get_const_trigger(notification))) {
                /* Notification does not originate from our rotation trigger. */
                ret = 0;
                goto end_unlock;
        }
 
-       ret = unsubscribe_session_consumed_size_rotation(session,
-                       notification_thread_handle);
+       ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
        if (ret) {
                goto end_unlock;
        }
 
        ret = cmd_rotate_session(
-                       session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        switch (ret) {
        case LTTNG_OK:
                break;
@@ -713,7 +724,7 @@ int handle_condition(const struct lttng_notification *notification,
        }
 
        ret = subscribe_session_consumed_size_rotation(
-                       session, consumed + session->rotate_size, notification_thread_handle);
+               session, consumed + session->rotate_size, notification_thread_handle);
        if (ret) {
                ERR("Failed to subscribe to session consumed size condition");
                goto end_unlock;
@@ -728,62 +739,77 @@ end:
        return ret;
 }
 
-static
-int handle_notification_channel(int fd __attribute__((unused)),
-               struct rotation_thread_handle *handle,
-               struct rotation_thread *state __attribute__((unused)))
+static int handle_notification_channel(int fd __attribute__((unused)),
+                                      struct rotation_thread_handle *handle,
+                                      struct rotation_thread *state __attribute__((unused)))
 {
        int ret;
-       bool notification_pending;
-       struct lttng_notification *notification = NULL;
+       bool notification_pending = true;
+       struct lttng_notification *notification = nullptr;
        enum lttng_notification_channel_status status;
 
-       status = lttng_notification_channel_has_pending_notification(
+       /*
+        * A notification channel may have multiple notifications queued-up internally in
+        * its buffers. This is because a notification channel multiplexes command replies
+        * and notifications. The current protocol specifies that multiple notifications can be
+        * received before the reply to a command.
+        *
+        * In such cases, the notification channel client implementation internally queues them and
+        * provides them on the next calls to lttng_notification_channel_get_next_notification().
+        * This is correct with respect to the public API, which is intended to be used in "blocking
+        * mode".
+        *
+        * However, this internal user relies on poll/epoll to wake-up when data is available
+        * on the notification channel's socket. As such, it can't assume that a wake-up means only
+        * one notification is available for consumption since many of them may have been queued in
+        * the channel's internal buffers.
+        */
+       while (notification_pending) {
+               status = lttng_notification_channel_has_pending_notification(
                        rotate_notification_channel, &notification_pending);
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Error occurred while checking for pending notification");
-               ret = -1;
-               goto end;
-       }
+               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       ERR("Error occurred while checking for pending notification");
+                       ret = -1;
+                       goto end;
+               }
 
-       if (!notification_pending) {
-               ret = 0;
-               goto end;
-       }
+               if (!notification_pending) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Receive the next notification. */
-       status = lttng_notification_channel_get_next_notification(
-                       rotate_notification_channel,
-                       &notification);
+               /* Receive the next notification. */
+               status = lttng_notification_channel_get_next_notification(
+                       rotate_notification_channel, &notification);
+               switch (status) {
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+                       break;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+                       WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+                       ret = 0;
+                       goto end;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+                       ERR("Notification channel was closed");
+                       ret = -1;
+                       goto end;
+               default:
+                       /* Unhandled conditions / errors. */
+                       ERR("Unknown notification channel status");
+                       ret = -1;
+                       goto end;
+               }
 
-       switch (status) {
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
-               break;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
-               /* Not an error, we will wait for the next one */
-               ret = 0;
-               goto end;;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-               ERR("Notification channel was closed");
-               ret = -1;
-               goto end;
-       default:
-               /* Unhandled conditions / errors. */
-               ERR("Unknown notification channel status");
-               ret = -1;
-               goto end;
+               ret = handle_condition(notification, handle->notification_thread_handle);
+               lttng_notification_destroy(notification);
+               if (ret) {
+                       goto end;
+               }
        }
-
-       ret = handle_condition(notification,
-                       handle->notification_thread_handle);
-
 end:
-       lttng_notification_destroy(notification);
        return ret;
 }
 
-static
-void *thread_rotation(void *data)
+static void *thread_rotation(void *data)
 {
        int ret;
        struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
@@ -801,9 +827,7 @@ void *thread_rotation(void *data)
                goto end;
        }
 
-       queue_pipe_fd = lttng_pipe_get_readfd(
-                       handle->rotation_timer_queue->event_pipe);
-
+       queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
 
        ret = init_thread_state(handle, &thread);
        if (ret) {
@@ -834,21 +858,30 @@ void *thread_rotation(void *data)
                        int fd = LTTNG_POLL_GETFD(&thread.events, i);
                        uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
 
-                       DBG("Handling fd (%i) activity (%u)",
-                                       fd, revents);
+                       DBG("Handling fd (%i) activity (%u)", fd, revents);
 
                        if (revents & LPOLLERR) {
                                ERR("Polling returned an error on fd %i", fd);
                                goto error;
                        }
 
-                       if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, handle,
-                                               &thread);
+                       if (fd == rotate_notification_channel->socket ||
+                           fd == rotate_notification_channel_subscription_change_eventfd) {
+                               ret = handle_notification_channel(fd, handle, &thread);
                                if (ret) {
                                        ERR("Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
+
+                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
+                                       uint64_t eventfd_value;
+                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+                                       if (read_ret != sizeof(eventfd_value)) {
+                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+                                               goto error;
+                                       }
+                               }
                        } else {
                                /* Job queue or quit pipe activity. */
 
@@ -858,8 +891,8 @@ void *thread_rotation(void *data)
                                 * flushed and all references held in the queue
                                 * are released.
                                 */
-                               ret = handle_job_queue(handle, &thread,
-                                               handle->rotation_timer_queue);
+                               ret = handle_job_queue(
+                                       handle, &thread, handle->rotation_timer_queue);
                                if (ret) {
                                        ERR("Failed to handle rotation timer pipe event");
                                        goto error;
@@ -870,7 +903,8 @@ void *thread_rotation(void *data)
 
                                        ret = lttng_read(fd, &buf, 1);
                                        if (ret != 1) {
-                                               ERR("Failed to read from wakeup pipe (fd = %i)", fd);
+                                               ERR("Failed to read from wakeup pipe (fd = %i)",
+                                                   fd);
                                                goto error;
                                        }
                                } else {
@@ -888,11 +922,10 @@ end:
        health_unregister(the_health_sessiond);
        rcu_thread_offline();
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
-static
-bool shutdown_rotation_thread(void *thread_data)
+static bool shutdown_rotation_thread(void *thread_data)
 {
        struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
        const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
@@ -904,11 +937,8 @@ bool launch_rotation_thread(struct rotation_thread_handle *handle)
 {
        struct lttng_thread *thread;
 
-       thread = lttng_thread_create("Rotation",
-                       thread_rotation,
-                       shutdown_rotation_thread,
-                       NULL,
-                       handle);
+       thread = lttng_thread_create(
+               "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
        if (!thread) {
                goto error;
        }
This page took 0.036283 seconds and 4 git commands to generate.