Cleanup: rotation-thread: enforce conding standard following fix
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
index 815b4e95f1a5644d78fd18282c4f8cd0b52e6887..6f58a178d3abfe9aa34df0b669765341a66d9d10 100644 (file)
@@ -11,7 +11,6 @@
 #include "health-sessiond.hpp"
 #include "lttng-sessiond.hpp"
 #include "notification-thread-commands.hpp"
-#include "rotate.hpp"
 #include "rotation-thread.hpp"
 #include "session.hpp"
 #include "thread.hpp"
 #include <common/config/session-config.hpp>
 #include <common/defaults.hpp>
 #include <common/error.hpp>
+#include <common/eventfd.hpp>
+#include <common/exception.hpp>
+#include <common/file-descriptor.hpp>
+#include <common/format.hpp>
 #include <common/futex.hpp>
 #include <common/hashtable/utils.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/locked-reference.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pthread-lock.hpp>
+#include <common/scope-exit.hpp>
 #include <common/time.hpp>
 #include <common/urcu.hpp>
 #include <common/utils.hpp>
 
+#include <lttng/action/action-internal.hpp>
 #include <lttng/condition/condition-internal.hpp>
 #include <lttng/location-internal.hpp>
 #include <lttng/notification/channel-internal.hpp>
@@ -37,6 +45,7 @@
 #include <lttng/trigger/trigger.h>
 
 #include <inttypes.h>
+#include <memory>
 #include <signal.h>
 #include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <urcu.h>
 #include <urcu/list.h>
 
-struct lttng_notification_channel *rotate_notification_channel = nullptr;
-/*
- * This eventfd is used to wake-up the rotation thread whenever a command
- * completes on the notification channel. This ensures that any notification
- * that was queued while waiting for a reply to the command is eventually
- * consumed.
- */
-int rotate_notification_channel_subscription_change_eventfd = -1;
-
-struct rotation_thread {
-       struct lttng_poll_event events;
-};
+namespace ls = lttng::sessiond;
 
 /*
  * The timer thread enqueues jobs and wakes up the rotation thread.
  * When the rotation thread wakes up, it empties the queue.
  */
-struct rotation_thread_timer_queue {
+struct ls::rotation_thread_timer_queue {
        struct lttng_pipe *event_pipe;
        struct cds_list_head list;
        pthread_mutex_t lock;
 };
 
-struct rotation_thread_handle {
-       struct rotation_thread_timer_queue *rotation_timer_queue;
-       /* Access to the notification thread cmd_queue */
-       struct notification_thread_handle *notification_thread_handle;
-       /* Thread-specific quit pipe. */
-       struct lttng_pipe *quit_pipe;
-};
-
 namespace {
 struct rotation_thread_job {
-       enum rotation_thread_job_type type;
+       using uptr = std::unique_ptr<
+               rotation_thread_job,
+               lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
+
+       enum ls::rotation_thread_job_type type;
        struct ltt_session *session;
        /* List member in struct rotation_thread_timer_queue. */
        struct cds_list_head head;
 };
-} /* namespace */
 
-static const char *get_job_type_str(enum rotation_thread_job_type job_type)
+const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
 {
        switch (job_type) {
-       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+       case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                return "CHECK_PENDING_ROTATION";
-       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+       case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
                return "SCHEDULED_ROTATION";
        default:
                abort();
        }
 }
 
-struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
-{
-       struct rotation_thread_timer_queue *queue = nullptr;
-
-       queue = zmalloc<rotation_thread_timer_queue>();
-       if (!queue) {
-               PERROR("Failed to allocate timer rotate queue");
-               goto end;
-       }
-
-       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
-       CDS_INIT_LIST_HEAD(&queue->list);
-       pthread_mutex_init(&queue->lock, nullptr);
-end:
-       return queue;
-}
-
-void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
-{
-       if (!queue) {
-               return;
-       }
-
-       lttng_pipe_destroy(queue->event_pipe);
-
-       pthread_mutex_lock(&queue->lock);
-       LTTNG_ASSERT(cds_list_empty(&queue->list));
-       pthread_mutex_unlock(&queue->lock);
-       pthread_mutex_destroy(&queue->lock);
-       free(queue);
-}
-
-/*
- * Destroy the thread data previously created by the init function.
- */
-void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
-{
-       lttng_pipe_destroy(handle->quit_pipe);
-       free(handle);
-}
-
-struct rotation_thread_handle *
-rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
-                             struct notification_thread_handle *notification_thread_handle)
-{
-       struct rotation_thread_handle *handle;
-
-       handle = zmalloc<rotation_thread_handle>();
-       if (!handle) {
-               goto end;
-       }
-
-       handle->rotation_timer_queue = rotation_timer_queue;
-       handle->notification_thread_handle = notification_thread_handle;
-       handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
-       if (!handle->quit_pipe) {
-               goto error;
-       }
-
-end:
-       return handle;
-error:
-       rotation_thread_handle_destroy(handle);
-       return nullptr;
-}
-
 /*
  * Called with the rotation_thread_timer_queue lock held.
  * Return true if the same timer job already exists in the queue, false if not.
  */
-static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
-                            enum rotation_thread_job_type job_type,
-                            struct ltt_session *session)
+bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
+                     ls::rotation_thread_job_type job_type,
+                     ltt_session *session)
 {
        bool exists = false;
        struct rotation_thread_job *job;
@@ -183,164 +110,7 @@ end:
        return exists;
 }
 
-void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
-                                enum rotation_thread_job_type job_type,
-                                struct ltt_session *session)
-{
-       int ret;
-       const char dummy = '!';
-       struct rotation_thread_job *job = nullptr;
-       const char *job_type_str = get_job_type_str(job_type);
-
-       pthread_mutex_lock(&queue->lock);
-       if (timer_job_exists(queue, job_type, session)) {
-               /*
-                * This timer job is already pending, we don't need to add
-                * it.
-                */
-               goto end;
-       }
-
-       job = zmalloc<rotation_thread_job>();
-       if (!job) {
-               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
-                      job_type_str,
-                      session->name);
-               goto end;
-       }
-       /* No reason for this to fail as the caller must hold a reference. */
-       (void) session_get(session);
-
-       job->session = session;
-       job->type = job_type;
-       cds_list_add_tail(&job->head, &queue->list);
-
-       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
-       if (ret < 0) {
-               /*
-                * We do not want to block in the timer handler, the job has
-                * been enqueued in the list, the wakeup pipe is probably full,
-                * the job will be processed when the rotation_thread catches
-                * up.
-                */
-               DIAGNOSTIC_PUSH
-               DIAGNOSTIC_IGNORE_LOGICAL_OP
-               if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                       DIAGNOSTIC_POP
-                       /*
-                        * Not an error, but would be surprising and indicate
-                        * that the rotation thread can't keep up with the
-                        * current load.
-                        */
-                       DBG("Wake-up pipe of rotation thread job queue is full");
-                       goto end;
-               }
-               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
-                      job_type_str,
-                      session->name);
-               goto end;
-       }
-
-end:
-       pthread_mutex_unlock(&queue->lock);
-}
-
-static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
-{
-       int ret;
-
-       /*
-        * Create pollset with size 3:
-        *      - rotation thread quit pipe,
-        *      - rotation thread timer queue pipe,
-        *      - notification channel sock,
-        */
-       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
-       if (ret < 0) {
-               goto error;
-       }
-
-       ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add quit pipe read fd to poll set");
-               goto error;
-       }
-
-       ret = lttng_poll_add(
-               poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add rotate_pending fd to poll set");
-               goto error;
-       }
-
-       return ret;
-error:
-       lttng_poll_clean(poll_set);
-       return ret;
-}
-
-static void fini_thread_state(struct rotation_thread *state)
-{
-       lttng_poll_clean(&state->events);
-       if (rotate_notification_channel) {
-               lttng_notification_channel_destroy(rotate_notification_channel);
-       }
-
-       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
-               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
-
-               if (close_ret) {
-                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
-               }
-       }
-}
-
-static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
-{
-       int ret;
-
-       memset(state, 0, sizeof(*state));
-       lttng_poll_init(&state->events);
-
-       ret = init_poll_set(&state->events, handle);
-       if (ret) {
-               ERR("Failed to initialize rotation thread poll set");
-               goto end;
-       }
-
-       rotate_notification_channel =
-               lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
-       if (!rotate_notification_channel) {
-               ERR("Could not create notification channel");
-               ret = -1;
-               goto end;
-       }
-       ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add notification fd to pollset");
-               goto end;
-       }
-
-       rotate_notification_channel_subscription_change_eventfd =
-               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
-       if (rotate_notification_channel_subscription_change_eventfd < 0) {
-               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
-               ret = -1;
-               goto end;
-       }
-       ret = lttng_poll_add(
-               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
-               goto end;
-       }
-
-end:
-       return ret;
-}
-
-static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
-                                                       bool *_rotation_completed)
+void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
 {
        int ret = 0;
        struct consumer_socket *socket;
@@ -351,70 +121,66 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess
        enum lttng_trace_chunk_status chunk_status;
        lttng::urcu::read_lock_guard read_lock;
 
-       LTTNG_ASSERT(session->chunk_being_archived);
+       LTTNG_ASSERT(session.chunk_being_archived);
 
        /*
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       if (!session->ust_session) {
+       if (!session.ust_session) {
                goto skip_ust;
        }
 
        cds_lfht_for_each_entry (
-               session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
-               relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+               session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
+               relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session->ust_session->consumer->net_seq_index;
+                       session.ust_session->consumer->net_seq_index;
 
-               pthread_mutex_lock(socket->lock);
+               lttng::pthread::lock_guard socket_lock(*socket->lock);
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session->id,
-                                                 session->chunk_being_archived,
+                                                 session.id,
+                                                 session.chunk_being_archived,
                                                  &exists_status);
                if (ret) {
-                       pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
                        goto end;
                }
 
                if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
-                       pthread_mutex_unlock(socket->lock);
                        chunk_exists_on_peer = true;
                        goto end;
                }
-               pthread_mutex_unlock(socket->lock);
        }
 
 skip_ust:
-       if (!session->kernel_session) {
+       if (!session.kernel_session) {
                goto skip_kernel;
        }
+
        cds_lfht_for_each_entry (
-               session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
-               pthread_mutex_lock(socket->lock);
-               relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+               session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
+               lttng::pthread::lock_guard socket_lock(*socket->lock);
+
+               relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session->kernel_session->consumer->net_seq_index;
+                       session.kernel_session->consumer->net_seq_index;
 
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session->id,
-                                                 session->chunk_being_archived,
+                                                 session.id,
+                                                 session.chunk_being_archived,
                                                  &exists_status);
                if (ret) {
-                       pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
                        goto end;
                }
 
                if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
-                       pthread_mutex_unlock(socket->lock);
                        chunk_exists_on_peer = true;
                        goto end;
                }
-               pthread_mutex_unlock(socket->lock);
        }
 skip_kernel:
 end:
@@ -422,19 +188,20 @@ end:
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
 
-               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
                DBG("Rotation of trace archive %" PRIu64
                    " of session \"%s\" is complete on all consumers",
                    chunk_being_archived_id,
-                   session->name);
+                   session.name);
        }
-       *_rotation_completed = !chunk_exists_on_peer;
+
+       _rotation_completed = !chunk_exists_on_peer;
        if (ret) {
                ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
                if (ret) {
-                       ERR("Failed to reset rotation state of session \"%s\"", session->name);
+                       ERR("Failed to reset rotation state of session \"%s\"", session.name);
                }
        }
 }
@@ -444,9 +211,8 @@ end:
  * Should only return non-zero in the event of a fatal error. Doing so will
  * shutdown the thread.
  */
-static int
-check_session_rotation_pending(struct ltt_session *session,
-                              struct notification_thread_handle *notification_thread_handle)
+int check_session_rotation_pending(ltt_session& session,
+                                  notification_thread_handle& notification_thread_handle)
 {
        int ret;
        struct lttng_trace_archive_location *location;
@@ -455,17 +221,17 @@ check_session_rotation_pending(struct ltt_session *session,
        const char *archived_chunk_name;
        uint64_t chunk_being_archived_id;
 
-       if (!session->chunk_being_archived) {
+       if (!session.chunk_being_archived) {
                ret = 0;
                goto end;
        }
 
        chunk_status =
-               lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
+               lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
        DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
-           session->name,
+           session.name,
            chunk_being_archived_id);
 
        /*
@@ -481,8 +247,8 @@ check_session_rotation_pending(struct ltt_session *session,
                goto check_ongoing_rotation;
        }
 
-       check_session_rotation_pending_on_consumers(session, &rotation_completed);
-       if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+       check_session_rotation_pending_on_consumers(session, rotation_completed);
+       if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
                goto check_ongoing_rotation;
        }
 
@@ -491,40 +257,41 @@ check_session_rotation_pending(struct ltt_session *session,
         * rotations can start now.
         */
        chunk_status = lttng_trace_chunk_get_name(
-               session->chunk_being_archived, &archived_chunk_name, nullptr);
+               session.chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-       free(session->last_archived_chunk_name);
-       session->last_archived_chunk_name = strdup(archived_chunk_name);
-       if (!session->last_archived_chunk_name) {
+       free(session.last_archived_chunk_name);
+       session.last_archived_chunk_name = strdup(archived_chunk_name);
+       if (!session.last_archived_chunk_name) {
                PERROR("Failed to duplicate archived chunk name");
        }
+
        session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
 
-       if (!session->quiet_rotation) {
-               location = session_get_trace_archive_location(session);
+       if (!session.quiet_rotation) {
+               location = session_get_trace_archive_location(&session);
                ret = notification_thread_command_session_rotation_completed(
-                       notification_thread_handle,
-                       session->id,
-                       session->last_archived_chunk_id.value,
+                       &notification_thread_handle,
+                       session.id,
+                       session.last_archived_chunk_id.value,
                        location);
                lttng_trace_archive_location_put(location);
                if (ret != LTTNG_OK) {
                        ERR("Failed to notify notification thread of completed rotation for session %s",
-                           session->name);
+                           session.name);
                }
        }
 
        ret = 0;
 check_ongoing_rotation:
-       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
-               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+       if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
                DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
                    chunk_being_archived_id,
-                   session->name);
-               ret = timer_session_rotation_pending_check_start(session,
+                   session.name);
+               ret = timer_session_rotation_pending_check_start(&session,
                                                                 DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Failed to re-enable rotation pending timer");
@@ -538,173 +305,317 @@ end:
 }
 
 /* Call with the session and session_list locks held. */
-static int launch_session_rotation(struct ltt_session *session)
+int launch_session_rotation(ltt_session& session)
 {
        int ret;
        struct lttng_rotate_session_return rotation_return;
 
-       DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
+       DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
 
-       ret = cmd_rotate_session(
-               session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       ASSERT_SESSION_LIST_LOCKED();
+       ASSERT_LOCKED(session.lock);
+
+       ret = cmd_rotate_session(&session,
+                                &rotation_return,
+                                false,
+                                LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        if (ret == LTTNG_OK) {
                DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
-                   session->name);
+                   session.name);
        } else {
                /* Don't consider errors as fatal. */
                DBG("Scheduled time-based rotation aborted for session %s: %s",
-                   session->name,
+                   session.name,
                    lttng_strerror(ret));
        }
+
        return 0;
 }
 
-static int run_job(struct rotation_thread_job *job,
-                  struct ltt_session *session,
-                  struct notification_thread_handle *notification_thread_handle)
+int run_job(const rotation_thread_job& job,
+           ltt_session& session,
+           notification_thread_handle& notification_thread_handle)
 {
        int ret;
 
-       switch (job->type) {
-       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+       switch (job.type) {
+       case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
                ret = launch_session_rotation(session);
                break;
-       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+       case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                ret = check_session_rotation_pending(session, notification_thread_handle);
                break;
        default:
                abort();
        }
+
        return ret;
 }
 
-static int handle_job_queue(struct rotation_thread_handle *handle,
-                           struct rotation_thread *state __attribute__((unused)),
-                           struct rotation_thread_timer_queue *queue)
+bool shutdown_rotation_thread(void *thread_data)
 {
-       int ret = 0;
+       auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
 
-       for (;;) {
-               struct ltt_session *session;
-               struct rotation_thread_job *job;
+       return handle->shutdown();
+}
+} /* namespace */
 
-               /* Take the queue lock only to pop an element from the list. */
-               pthread_mutex_lock(&queue->lock);
-               if (cds_list_empty(&queue->list)) {
-                       pthread_mutex_unlock(&queue->lock);
-                       break;
+ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
+{
+       auto queue = zmalloc<ls::rotation_thread_timer_queue>();
+       if (!queue) {
+               PERROR("Failed to allocate timer rotate queue");
+               goto end;
+       }
+
+       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
+       CDS_INIT_LIST_HEAD(&queue->list);
+       pthread_mutex_init(&queue->lock, nullptr);
+end:
+       return queue;
+}
+
+void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
+{
+       if (!queue) {
+               return;
+       }
+
+       lttng_pipe_destroy(queue->event_pipe);
+
+       {
+               lttng::pthread::lock_guard queue_lock(queue->lock);
+
+               LTTNG_ASSERT(cds_list_empty(&queue->list));
+       }
+
+       pthread_mutex_destroy(&queue->lock);
+       free(queue);
+}
+
+ls::rotation_thread::rotation_thread(
+       rotation_thread_timer_queue& rotation_timer_queue,
+       notification_thread_handle& notification_thread_handle) :
+       _rotation_timer_queue{ rotation_timer_queue },
+       _notification_thread_handle{ notification_thread_handle }
+{
+       _quit_pipe.reset([]() {
+               auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
+               if (!raw_pipe) {
+                       LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
                }
-               job = cds_list_first_entry(&queue->list, typeof(*job), head);
-               cds_list_del(&job->head);
-               pthread_mutex_unlock(&queue->lock);
 
-               session_lock_list();
-               session = job->session;
-               if (!session) {
-                       DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
+               return raw_pipe;
+       }());
+
+       _notification_channel.reset([]() {
+               auto channel = lttng_notification_channel_create(
+                       lttng_session_daemon_notification_endpoint);
+               if (!channel) {
+                       LTTNG_THROW_ERROR(
+                               "Failed to create notification channel of rotation thread");
+               }
+
+               return channel;
+       }());
+
+       lttng_poll_init(&_events);
+
+       /*
+        * Create pollset with size 4:
+        *      - rotation thread quit pipe,
+        *      - rotation thread timer queue pipe,
+        *      - notification channel sock,
+        *      - subscribtion change event fd
+        */
+       if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
+               LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
+       }
+
+       if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
+       }
+
+       if (lttng_poll_add(&_events,
+                          lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
+                          LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
+       }
+
+       if (lttng_poll_add(&_events,
+                          _notification_channel_subscribtion_change_eventfd.fd(),
+                          LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR(
+                       "Failed to add rotation thread notification channel subscription change eventfd to poll set");
+       }
+
+       if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
+       }
+}
+
+ls::rotation_thread::~rotation_thread()
+{
+       lttng_poll_clean(&_events);
+}
+
+void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
+                                ls::rotation_thread_job_type job_type,
+                                ltt_session *session)
+{
+       const char dummy = '!';
+       struct rotation_thread_job *job = nullptr;
+       const char *job_type_str = get_job_type_str(job_type);
+       lttng::pthread::lock_guard queue_lock(queue->lock);
+
+       if (timer_job_exists(queue, job_type, session)) {
+               /*
+                * This timer job is already pending, we don't need to add
+                * it.
+                */
+               return;
+       }
+
+       job = zmalloc<rotation_thread_job>();
+       if (!job) {
+               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
+                      job_type_str,
+                      session->name);
+               return;
+       }
+
+       /* No reason for this to fail as the caller must hold a reference. */
+       (void) session_get(session);
+
+       job->session = session;
+       job->type = job_type;
+       cds_list_add_tail(&job->head, &queue->list);
+
+       const int write_ret =
+               lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
+       if (write_ret < 0) {
+               /*
+                * We do not want to block in the timer handler, the job has
+                * been enqueued in the list, the wakeup pipe is probably full,
+                * the job will be processed when the rotation_thread catches
+                * up.
+                */
+               DIAGNOSTIC_PUSH
+               DIAGNOSTIC_IGNORE_LOGICAL_OP
+               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                       DIAGNOSTIC_POP
                        /*
-                        * This is a non-fatal error, and we cannot report it to
-                        * the user (timer), so just print the error and
-                        * continue the processing.
-                        *
-                        * While the timer thread will purge pending signals for
-                        * a session on the session's destruction, it is
-                        * possible for a job targeting that session to have
-                        * already been queued before it was destroyed.
+                        * Not an error, but would be surprising and indicate
+                        * that the rotation thread can't keep up with the
+                        * current load.
                         */
-                       free(job);
-                       session_put(session);
-                       session_unlock_list();
-                       continue;
+                       DBG("Wake-up pipe of rotation thread job queue is full");
+                       return;
                }
 
-               session_lock(session);
-               ret = run_job(job, session, handle->notification_thread_handle);
-               session_unlock(session);
-               /* Release reference held by the job. */
-               session_put(session);
-               session_unlock_list();
-               free(job);
-               if (ret) {
-                       goto end;
-               }
+               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
+                      job_type_str,
+                      session->name);
+               return;
        }
+}
 
-       ret = 0;
+void ls::rotation_thread::_handle_job_queue()
+{
+       for (;;) {
+               rotation_thread_job::uptr job;
+
+               {
+                       /* Take the queue lock only to pop an element from the list. */
+                       lttng::pthread::lock_guard rotation_timer_queue_lock(
+                               _rotation_timer_queue.lock);
+                       if (cds_list_empty(&_rotation_timer_queue.list)) {
+                               break;
+                       }
 
-end:
-       return ret;
+                       job.reset(cds_list_first_entry(
+                               &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
+                       cds_list_del(&job->head);
+               }
+
+               session_lock_list();
+               const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+
+               /* locked_ptr will unlock the session and release the ref held by the job. */
+               session_lock(job->session);
+               auto session = ltt_session::locked_ptr(job->session);
+
+               if (run_job(*job, *session, _notification_thread_handle)) {
+                       return;
+               }
+       }
 }
 
-static int handle_condition(const struct lttng_notification *notification,
-                           struct notification_thread_handle *notification_thread_handle)
+void ls::rotation_thread::_handle_notification(const lttng_notification &notification)
 {
        int ret = 0;
        const char *condition_session_name = nullptr;
-       enum lttng_condition_type condition_type;
        enum lttng_condition_status condition_status;
        enum lttng_evaluation_status evaluation_status;
        uint64_t consumed;
-       struct ltt_session *session;
-       const struct lttng_condition *condition =
-               lttng_notification_get_const_condition(notification);
-       const struct lttng_evaluation *evaluation =
-               lttng_notification_get_const_evaluation(notification);
-
-       condition_type = lttng_condition_get_type(condition);
+       auto *condition = lttng_notification_get_const_condition(&notification);
+       auto *evaluation = lttng_notification_get_const_evaluation(&notification);
+       const auto condition_type = lttng_condition_get_type(condition);
 
        if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
-               ret = -1;
-               ERR("Condition type and session usage type are not the same");
-               goto end;
+               LTTNG_THROW_ERROR("Unexpected condition type");
        }
 
-       /* Fetch info to test */
+       /* Fetch info to test. */
        condition_status = lttng_condition_session_consumed_size_get_session_name(
                condition, &condition_session_name);
        if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               ERR("Session name could not be fetched");
-               ret = -1;
-               goto end;
+               LTTNG_THROW_ERROR("Session name could not be fetched from notification");
        }
+
        evaluation_status =
                lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
        if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
-               ERR("Failed to get evaluation");
-               ret = -1;
-               goto end;
+               LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
        }
 
+       DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
+               condition_session_name,
+               consumed);
+
        session_lock_list();
-       session = session_find_by_name(condition_session_name);
+       const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+
+       ltt_session::locked_ptr session{ [&condition_session_name]() {
+               auto raw_session_ptr = session_find_by_name(condition_session_name);
+
+               if (raw_session_ptr) {
+                       session_lock(raw_session_ptr);
+               }
+
+               return raw_session_ptr;
+       }() };
        if (!session) {
-               DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
-                   lttng_condition_type_str(condition_type),
-                   condition_session_name);
+               DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
+                       lttng_condition_type_str(condition_type),
+                       condition_session_name);
                /*
                 * Not a fatal error: a session can be destroyed before we get
                 * the chance to handle the notification.
                 */
-               ret = 0;
-               session_unlock_list();
-               goto end;
+               return;
        }
-       session_lock(session);
 
        if (!lttng_trigger_is_equal(session->rotate_trigger,
-                                   lttng_notification_get_const_trigger(notification))) {
-               /* Notification does not originate from our rotation trigger. */
-               ret = 0;
-               goto end_unlock;
+                                   lttng_notification_get_const_trigger(&notification))) {
+               DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
+               return;
        }
 
-       ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
-       if (ret) {
-               goto end_unlock;
-       }
+       unsubscribe_session_consumed_size_rotation(*session);
 
        ret = cmd_rotate_session(
-               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        switch (ret) {
        case LTTNG_OK:
                break;
@@ -718,35 +629,16 @@ static int handle_condition(const struct lttng_notification *notification,
                DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
                break;
        default:
-               ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
-               ret = -1;
-               goto end_unlock;
-       }
-
-       ret = subscribe_session_consumed_size_rotation(
-               session, consumed + session->rotate_size, notification_thread_handle);
-       if (ret) {
-               ERR("Failed to subscribe to session consumed size condition");
-               goto end_unlock;
+               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
+                               static_cast<lttng_error_code>(-ret));
        }
-       ret = 0;
 
-end_unlock:
-       session_unlock(session);
-       session_put(session);
-       session_unlock_list();
-end:
-       return ret;
+       subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
 }
 
-static int handle_notification_channel(int fd __attribute__((unused)),
-                                      struct rotation_thread_handle *handle,
-                                      struct rotation_thread *state __attribute__((unused)))
+void ls::rotation_thread::_handle_notification_channel_activity()
 {
-       int ret;
        bool notification_pending = true;
-       struct lttng_notification *notification = nullptr;
-       enum lttng_notification_channel_status status;
 
        /*
         * A notification channel may have multiple notifications queued-up internally in
@@ -765,122 +657,120 @@ static int handle_notification_channel(int fd __attribute__((unused)),
         * the channel's internal buffers.
         */
        while (notification_pending) {
-               status = lttng_notification_channel_has_pending_notification(
-                       rotate_notification_channel, &notification_pending);
-               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-                       ERR("Error occurred while checking for pending notification");
-                       ret = -1;
-                       goto end;
+               const auto pending_status = lttng_notification_channel_has_pending_notification(
+                       _notification_channel.get(), &notification_pending);
+               if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
                }
 
                if (!notification_pending) {
-                       ret = 0;
-                       goto end;
+                       return;
                }
 
                /* Receive the next notification. */
-               status = lttng_notification_channel_get_next_notification(
-                       rotate_notification_channel, &notification);
-               switch (status) {
+               lttng_notification::uptr notification;
+               enum lttng_notification_channel_status next_notification_status;
+
+               {
+                       struct lttng_notification *raw_notification_ptr;
+
+                       next_notification_status = lttng_notification_channel_get_next_notification(
+                               _notification_channel.get(), &raw_notification_ptr);
+                       notification.reset(raw_notification_ptr);
+               }
+
+               switch (next_notification_status) {
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
                        break;
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
                        WARN("Dropped notification detected on notification channel used by the rotation management thread.");
-                       ret = 0;
-                       goto end;
+                       return;
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-                       ERR("Notification channel was closed");
-                       ret = -1;
-                       goto end;
+                       LTTNG_THROW_ERROR("Notification channel was closed");
                default:
                        /* Unhandled conditions / errors. */
-                       ERR("Unknown notification channel status");
-                       ret = -1;
-                       goto end;
+                       LTTNG_THROW_ERROR("Unknown notification channel status");
                }
 
-               ret = handle_condition(notification, handle->notification_thread_handle);
-               lttng_notification_destroy(notification);
-               if (ret) {
-                       goto end;
-               }
+               _handle_notification(*notification);
        }
-end:
-       return ret;
 }
 
-static void *thread_rotation(void *data)
+void ls::rotation_thread::_thread_function() noexcept
 {
-       int ret;
-       struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
-       struct rotation_thread thread;
-       int queue_pipe_fd;
-
        DBG("Started rotation thread");
+
+       try {
+               _run();
+       } catch (const std::exception& e) {
+               ERR_FMT("Fatal rotation thread error: {}", e.what());
+       }
+
+       DBG("Thread exit");
+}
+
+void ls::rotation_thread::_run()
+{
        rcu_register_thread();
+       const auto unregister_rcu_thread =
+               lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
+
        rcu_thread_online();
+       const auto offline_rcu_thread =
+               lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
+
        health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
        health_code_update();
+       const auto unregister_health =
+               lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
 
-       if (!handle) {
-               ERR("Invalid thread context provided");
-               goto end;
-       }
-
-       queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
-
-       ret = init_thread_state(handle, &thread);
-       if (ret) {
-               goto error;
-       }
+       const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
 
        while (true) {
-               int fd_count, i;
-
                health_poll_entry();
                DBG("Entering poll wait");
-               ret = lttng_poll_wait(&thread.events, -1);
-               DBG("Poll wait returned (%i)", ret);
+               auto poll_wait_ret = lttng_poll_wait(&_events, -1);
+               DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
                health_poll_exit();
-               if (ret < 0) {
+               if (poll_wait_ret < 0) {
                        /*
                         * Restart interrupted system call.
                         */
                        if (errno == EINTR) {
                                continue;
                        }
-                       ERR("Error encountered during lttng_poll_wait (%i)", ret);
-                       goto error;
+
+                       LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
                }
 
-               fd_count = ret;
-               for (i = 0; i < fd_count; i++) {
-                       int fd = LTTNG_POLL_GETFD(&thread.events, i);
-                       uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
+               const auto fd_count = poll_wait_ret;
+               for (int i = 0; i < fd_count; i++) {
+                       const auto fd = LTTNG_POLL_GETFD(&_events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&_events, i);
 
-                       DBG("Handling fd (%i) activity (%u)", fd, revents);
+                       DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
 
                        if (revents & LPOLLERR) {
-                               ERR("Polling returned an error on fd %i", fd);
-                               goto error;
+                               LTTNG_THROW_ERROR(
+                                       fmt::format("Polling returned an error on fd: fd={}", fd));
                        }
 
-                       if (fd == rotate_notification_channel->socket ||
-                           fd == rotate_notification_channel_subscription_change_eventfd) {
-                               ret = handle_notification_channel(fd, handle, &thread);
-                               if (ret) {
-                                       ERR("Error occurred while handling activity on notification channel socket");
-                                       goto error;
+                       if (fd == _notification_channel->socket ||
+                           fd == _notification_channel_subscribtion_change_eventfd.fd()) {
+                               try {
+                                       _handle_notification_channel_activity();
+                               } catch (const lttng::ctl::error& e) {
+                                       /*
+                                        * The only non-fatal error (rotation failed), others
+                                        * are caught at the top-level.
+                                        */
+                                       DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
+                                               e.what());
+                                       continue;
                                }
 
-                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
-                                       uint64_t eventfd_value;
-                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
-
-                                       if (read_ret != sizeof(eventfd_value)) {
-                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
-                                               goto error;
-                                       }
+                               if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
+                                       _notification_channel_subscribtion_change_eventfd.decrement();
                                }
                        } else {
                                /* Job queue or quit pipe activity. */
@@ -891,59 +781,162 @@ static void *thread_rotation(void *data)
                                 * flushed and all references held in the queue
                                 * are released.
                                 */
-                               ret = handle_job_queue(
-                                       handle, &thread, handle->rotation_timer_queue);
-                               if (ret) {
-                                       ERR("Failed to handle rotation timer pipe event");
-                                       goto error;
-                               }
-
+                               _handle_job_queue();
                                if (fd == queue_pipe_fd) {
                                        char buf;
 
-                                       ret = lttng_read(fd, &buf, 1);
-                                       if (ret != 1) {
-                                               ERR("Failed to read from wakeup pipe (fd = %i)",
-                                                   fd);
-                                               goto error;
+                                       if (lttng_read(fd, &buf, 1) != 1) {
+                                               LTTNG_THROW_POSIX(
+                                                       fmt::format(
+                                                               "Failed to read from wakeup pipe: fd={}",
+                                                               fd),
+                                                       errno);
                                        }
                                } else {
                                        DBG("Quit pipe activity");
-                                       goto exit;
+                                       return;
                                }
                        }
                }
        }
-exit:
-error:
-       DBG("Thread exit");
-       fini_thread_state(&thread);
-end:
-       health_unregister(the_health_sessiond);
-       rcu_thread_offline();
-       rcu_unregister_thread();
-       return nullptr;
 }
 
-static bool shutdown_rotation_thread(void *thread_data)
+bool ls::rotation_thread::shutdown() const noexcept
 {
-       struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
-       const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+       const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
 
        return notify_thread_pipe(write_fd) == 1;
 }
 
-bool launch_rotation_thread(struct rotation_thread_handle *handle)
+void ls::rotation_thread::launch_thread()
 {
-       struct lttng_thread *thread;
-
-       thread = lttng_thread_create(
-               "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
+       auto thread = lttng_thread_create(
+               "Rotation",
+               [](void *ptr) {
+                       auto handle = reinterpret_cast<rotation_thread *>(ptr);
+
+                       handle->_thread_function();
+                       return static_cast<void *>(nullptr);
+               },
+               shutdown_rotation_thread,
+               nullptr,
+               this);
        if (!thread) {
-               goto error;
+               LTTNG_THROW_ERROR("Failed to launch rotation thread");
        }
+
        lttng_thread_put(thread);
-       return true;
-error:
-       return false;
+}
+
+void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
+                                                                         std::uint64_t size)
+{
+       const struct lttng_credentials session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
+       };
+
+       ASSERT_LOCKED(session.lock);
+
+       auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
+               lttng_condition_session_consumed_size_create());
+       if (!rotate_condition) {
+               LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
+       }
+
+       auto condition_status =
+               lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
+       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Could not set session consumed size condition threshold: size={}", size));
+       }
+
+       condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition.get(),
+                                                                                 session.name);
+       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Could not set session consumed size condition session name: name=`{}`",
+                       session.name));
+       }
+
+       auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
+               lttng_action_notify_create());
+       if (!notify_action) {
+               LTTNG_THROW_POSIX("Could not create notify action", errno);
+       }
+
+       LTTNG_ASSERT(!session.rotate_trigger);
+       /* trigger acquires its own reference to condition and action on success. */
+       auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
+               lttng_trigger_create(rotate_condition.get(), notify_action.get()));
+       if (!trigger)
+       {
+               LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
+       }
+
+       /* Ensure this trigger is not visible to external users. */
+       lttng_trigger_set_hidden(trigger.get());
+       lttng_trigger_set_credentials(trigger.get(), &session_creds);
+
+       auto nc_status =
+               lttng_notification_channel_subscribe(_notification_channel.get(), rotate_condition.get());
+       if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+               LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
+       }
+
+       /*
+        * Ensure any notification queued during the subscription are consumed by queueing an
+        * event.
+        */
+       _notification_channel_subscribtion_change_eventfd.increment();
+
+       const auto register_ret = notification_thread_command_register_trigger(
+               &_notification_thread_handle, trigger.get(), true);
+       if (register_ret != LTTNG_OK) {
+               LTTNG_THROW_CTL(
+                       fmt::format(
+                               "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
+                               session.name,
+                               size),
+                       register_ret);
+       }
+
+       /* Ownership transferred to the session. */
+       session.rotate_trigger = trigger.release();
+}
+
+void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
+{
+       LTTNG_ASSERT(session.rotate_trigger);
+
+       const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
+               lttng_trigger_put(session.rotate_trigger);
+               session.rotate_trigger = nullptr;
+       });
+
+       const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
+               _notification_channel.get(),
+               lttng_trigger_get_const_condition(session.rotate_trigger));
+       if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
+                       session.name,
+                       static_cast<int>(unsubscribe_status)));
+       }
+
+       /*
+        * Ensure any notification queued during the un-subscription are consumed by queueing an
+        * event.
+        */
+       _notification_channel_subscribtion_change_eventfd.increment();
+
+       const auto unregister_status = notification_thread_command_unregister_trigger(
+               &_notification_thread_handle, session.rotate_trigger);
+       if (unregister_status != LTTNG_OK) {
+               LTTNG_THROW_CTL(
+                       fmt::format(
+                               "Failed to unregister trigger for automatic size-based rotation: session_name{}",
+                               session.name),
+                       unregister_status);
+       }
 }
This page took 0.040045 seconds and 4 git commands to generate.