X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Frotation-thread.cpp;fp=src%2Fbin%2Flttng-sessiond%2Frotation-thread.cpp;h=6f58a178d3abfe9aa34df0b669765341a66d9d10;hp=815b4e95f1a5644d78fd18282c4f8cd0b52e6887;hb=0038180de36c422cfaeade1145fa9fbc9436b8ad;hpb=8b75cd779ffe332281fec189cdf808e4ee452572 diff --git a/src/bin/lttng-sessiond/rotation-thread.cpp b/src/bin/lttng-sessiond/rotation-thread.cpp index 815b4e95f..6f58a178d 100644 --- a/src/bin/lttng-sessiond/rotation-thread.cpp +++ b/src/bin/lttng-sessiond/rotation-thread.cpp @@ -11,7 +11,6 @@ #include "health-sessiond.hpp" #include "lttng-sessiond.hpp" #include "notification-thread-commands.hpp" -#include "rotate.hpp" #include "rotation-thread.hpp" #include "session.hpp" #include "thread.hpp" @@ -22,13 +21,22 @@ #include #include #include +#include +#include +#include +#include #include #include #include +#include +#include +#include +#include #include #include #include +#include #include #include #include @@ -37,6 +45,7 @@ #include #include +#include #include #include #include @@ -44,131 +53,49 @@ #include #include -struct lttng_notification_channel *rotate_notification_channel = nullptr; -/* - * This eventfd is used to wake-up the rotation thread whenever a command - * completes on the notification channel. This ensures that any notification - * that was queued while waiting for a reply to the command is eventually - * consumed. - */ -int rotate_notification_channel_subscription_change_eventfd = -1; - -struct rotation_thread { - struct lttng_poll_event events; -}; +namespace ls = lttng::sessiond; /* * The timer thread enqueues jobs and wakes up the rotation thread. * When the rotation thread wakes up, it empties the queue. */ -struct rotation_thread_timer_queue { +struct ls::rotation_thread_timer_queue { struct lttng_pipe *event_pipe; struct cds_list_head list; pthread_mutex_t lock; }; -struct rotation_thread_handle { - struct rotation_thread_timer_queue *rotation_timer_queue; - /* Access to the notification thread cmd_queue */ - struct notification_thread_handle *notification_thread_handle; - /* Thread-specific quit pipe. */ - struct lttng_pipe *quit_pipe; -}; - namespace { struct rotation_thread_job { - enum rotation_thread_job_type type; + using uptr = std::unique_ptr< + rotation_thread_job, + lttng::details::create_unique_class>; + + enum ls::rotation_thread_job_type type; struct ltt_session *session; /* List member in struct rotation_thread_timer_queue. */ struct cds_list_head head; }; -} /* namespace */ -static const char *get_job_type_str(enum rotation_thread_job_type job_type) +const char *get_job_type_str(enum ls::rotation_thread_job_type job_type) { switch (job_type) { - case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION: + case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION: return "CHECK_PENDING_ROTATION"; - case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION: + case ls::rotation_thread_job_type::SCHEDULED_ROTATION: return "SCHEDULED_ROTATION"; default: abort(); } } -struct rotation_thread_timer_queue *rotation_thread_timer_queue_create() -{ - struct rotation_thread_timer_queue *queue = nullptr; - - queue = zmalloc(); - if (!queue) { - PERROR("Failed to allocate timer rotate queue"); - goto end; - } - - queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); - CDS_INIT_LIST_HEAD(&queue->list); - pthread_mutex_init(&queue->lock, nullptr); -end: - return queue; -} - -void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue) -{ - if (!queue) { - return; - } - - lttng_pipe_destroy(queue->event_pipe); - - pthread_mutex_lock(&queue->lock); - LTTNG_ASSERT(cds_list_empty(&queue->list)); - pthread_mutex_unlock(&queue->lock); - pthread_mutex_destroy(&queue->lock); - free(queue); -} - -/* - * Destroy the thread data previously created by the init function. - */ -void rotation_thread_handle_destroy(struct rotation_thread_handle *handle) -{ - lttng_pipe_destroy(handle->quit_pipe); - free(handle); -} - -struct rotation_thread_handle * -rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue, - struct notification_thread_handle *notification_thread_handle) -{ - struct rotation_thread_handle *handle; - - handle = zmalloc(); - if (!handle) { - goto end; - } - - handle->rotation_timer_queue = rotation_timer_queue; - handle->notification_thread_handle = notification_thread_handle; - handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC); - if (!handle->quit_pipe) { - goto error; - } - -end: - return handle; -error: - rotation_thread_handle_destroy(handle); - return nullptr; -} - /* * Called with the rotation_thread_timer_queue lock held. * Return true if the same timer job already exists in the queue, false if not. */ -static bool timer_job_exists(const struct rotation_thread_timer_queue *queue, - enum rotation_thread_job_type job_type, - struct ltt_session *session) +bool timer_job_exists(const ls::rotation_thread_timer_queue *queue, + ls::rotation_thread_job_type job_type, + ltt_session *session) { bool exists = false; struct rotation_thread_job *job; @@ -183,164 +110,7 @@ end: return exists; } -void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue, - enum rotation_thread_job_type job_type, - struct ltt_session *session) -{ - int ret; - const char dummy = '!'; - struct rotation_thread_job *job = nullptr; - const char *job_type_str = get_job_type_str(job_type); - - pthread_mutex_lock(&queue->lock); - if (timer_job_exists(queue, job_type, session)) { - /* - * This timer job is already pending, we don't need to add - * it. - */ - goto end; - } - - job = zmalloc(); - if (!job) { - PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"", - job_type_str, - session->name); - goto end; - } - /* No reason for this to fail as the caller must hold a reference. */ - (void) session_get(session); - - job->session = session; - job->type = job_type; - cds_list_add_tail(&job->head, &queue->list); - - ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy)); - if (ret < 0) { - /* - * We do not want to block in the timer handler, the job has - * been enqueued in the list, the wakeup pipe is probably full, - * the job will be processed when the rotation_thread catches - * up. - */ - DIAGNOSTIC_PUSH - DIAGNOSTIC_IGNORE_LOGICAL_OP - if (errno == EAGAIN || errno == EWOULDBLOCK) { - DIAGNOSTIC_POP - /* - * Not an error, but would be surprising and indicate - * that the rotation thread can't keep up with the - * current load. - */ - DBG("Wake-up pipe of rotation thread job queue is full"); - goto end; - } - PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"", - job_type_str, - session->name); - goto end; - } - -end: - pthread_mutex_unlock(&queue->lock); -} - -static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle) -{ - int ret; - - /* - * Create pollset with size 3: - * - rotation thread quit pipe, - * - rotation thread timer queue pipe, - * - notification channel sock, - */ - ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC); - if (ret < 0) { - goto error; - } - - ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN); - if (ret < 0) { - ERR("Failed to add quit pipe read fd to poll set"); - goto error; - } - - ret = lttng_poll_add( - poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN); - if (ret < 0) { - ERR("Failed to add rotate_pending fd to poll set"); - goto error; - } - - return ret; -error: - lttng_poll_clean(poll_set); - return ret; -} - -static void fini_thread_state(struct rotation_thread *state) -{ - lttng_poll_clean(&state->events); - if (rotate_notification_channel) { - lttng_notification_channel_destroy(rotate_notification_channel); - } - - if (rotate_notification_channel_subscription_change_eventfd >= 0) { - const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); - - if (close_ret) { - PERROR("Failed to close rotation thread notification channel subscription change eventfd"); - } - } -} - -static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state) -{ - int ret; - - memset(state, 0, sizeof(*state)); - lttng_poll_init(&state->events); - - ret = init_poll_set(&state->events, handle); - if (ret) { - ERR("Failed to initialize rotation thread poll set"); - goto end; - } - - rotate_notification_channel = - lttng_notification_channel_create(lttng_session_daemon_notification_endpoint); - if (!rotate_notification_channel) { - ERR("Could not create notification channel"); - ret = -1; - goto end; - } - ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN); - if (ret < 0) { - ERR("Failed to add notification fd to pollset"); - goto end; - } - - rotate_notification_channel_subscription_change_eventfd = - eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); - if (rotate_notification_channel_subscription_change_eventfd < 0) { - PERROR("Failed to create rotation thread notification channel subscription change eventfd"); - ret = -1; - goto end; - } - ret = lttng_poll_add( - &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); - if (ret < 0) { - ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); - goto end; - } - -end: - return ret; -} - -static void check_session_rotation_pending_on_consumers(struct ltt_session *session, - bool *_rotation_completed) +void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed) { int ret = 0; struct consumer_socket *socket; @@ -351,70 +121,66 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess enum lttng_trace_chunk_status chunk_status; lttng::urcu::read_lock_guard read_lock; - LTTNG_ASSERT(session->chunk_being_archived); + LTTNG_ASSERT(session.chunk_being_archived); /* * Check for a local pending rotation on all consumers (32-bit * user space, 64-bit user space, and kernel). */ - if (!session->ust_session) { + if (!session.ust_session) { goto skip_ust; } cds_lfht_for_each_entry ( - session->ust_session->consumer->socks->ht, &iter, socket, node.node) { - relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ? + session.ust_session->consumer->socks->ht, &iter, socket, node.node) { + relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ? -1ULL : - session->ust_session->consumer->net_seq_index; + session.ust_session->consumer->net_seq_index; - pthread_mutex_lock(socket->lock); + lttng::pthread::lock_guard socket_lock(*socket->lock); ret = consumer_trace_chunk_exists(socket, relayd_id, - session->id, - session->chunk_being_archived, + session.id, + session.chunk_being_archived, &exists_status); if (ret) { - pthread_mutex_unlock(socket->lock); ERR("Error occurred while checking rotation status on consumer daemon"); goto end; } if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { - pthread_mutex_unlock(socket->lock); chunk_exists_on_peer = true; goto end; } - pthread_mutex_unlock(socket->lock); } skip_ust: - if (!session->kernel_session) { + if (!session.kernel_session) { goto skip_kernel; } + cds_lfht_for_each_entry ( - session->kernel_session->consumer->socks->ht, &iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ? + session.kernel_session->consumer->socks->ht, &iter, socket, node.node) { + lttng::pthread::lock_guard socket_lock(*socket->lock); + + relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ? -1ULL : - session->kernel_session->consumer->net_seq_index; + session.kernel_session->consumer->net_seq_index; ret = consumer_trace_chunk_exists(socket, relayd_id, - session->id, - session->chunk_being_archived, + session.id, + session.chunk_being_archived, &exists_status); if (ret) { - pthread_mutex_unlock(socket->lock); ERR("Error occurred while checking rotation status on consumer daemon"); goto end; } if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { - pthread_mutex_unlock(socket->lock); chunk_exists_on_peer = true; goto end; } - pthread_mutex_unlock(socket->lock); } skip_kernel: end: @@ -422,19 +188,20 @@ end: if (!chunk_exists_on_peer) { uint64_t chunk_being_archived_id; - chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, + chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers", chunk_being_archived_id, - session->name); + session.name); } - *_rotation_completed = !chunk_exists_on_peer; + + _rotation_completed = !chunk_exists_on_peer; if (ret) { ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR); if (ret) { - ERR("Failed to reset rotation state of session \"%s\"", session->name); + ERR("Failed to reset rotation state of session \"%s\"", session.name); } } } @@ -444,9 +211,8 @@ end: * Should only return non-zero in the event of a fatal error. Doing so will * shutdown the thread. */ -static int -check_session_rotation_pending(struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle) +int check_session_rotation_pending(ltt_session& session, + notification_thread_handle& notification_thread_handle) { int ret; struct lttng_trace_archive_location *location; @@ -455,17 +221,17 @@ check_session_rotation_pending(struct ltt_session *session, const char *archived_chunk_name; uint64_t chunk_being_archived_id; - if (!session->chunk_being_archived) { + if (!session.chunk_being_archived) { ret = 0; goto end; } chunk_status = - lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id); + lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64, - session->name, + session.name, chunk_being_archived_id); /* @@ -481,8 +247,8 @@ check_session_rotation_pending(struct ltt_session *session, goto check_ongoing_rotation; } - check_session_rotation_pending_on_consumers(session, &rotation_completed); - if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) { + check_session_rotation_pending_on_consumers(session, rotation_completed); + if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) { goto check_ongoing_rotation; } @@ -491,40 +257,41 @@ check_session_rotation_pending(struct ltt_session *session, * rotations can start now. */ chunk_status = lttng_trace_chunk_get_name( - session->chunk_being_archived, &archived_chunk_name, nullptr); + session.chunk_being_archived, &archived_chunk_name, nullptr); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - free(session->last_archived_chunk_name); - session->last_archived_chunk_name = strdup(archived_chunk_name); - if (!session->last_archived_chunk_name) { + free(session.last_archived_chunk_name); + session.last_archived_chunk_name = strdup(archived_chunk_name); + if (!session.last_archived_chunk_name) { PERROR("Failed to duplicate archived chunk name"); } + session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED); - if (!session->quiet_rotation) { - location = session_get_trace_archive_location(session); + if (!session.quiet_rotation) { + location = session_get_trace_archive_location(&session); ret = notification_thread_command_session_rotation_completed( - notification_thread_handle, - session->id, - session->last_archived_chunk_id.value, + ¬ification_thread_handle, + session.id, + session.last_archived_chunk_id.value, location); lttng_trace_archive_location_put(location); if (ret != LTTNG_OK) { ERR("Failed to notify notification thread of completed rotation for session %s", - session->name); + session.name); } } ret = 0; check_ongoing_rotation: - if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) { - chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, + if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) { + chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s", chunk_being_archived_id, - session->name); - ret = timer_session_rotation_pending_check_start(session, + session.name); + ret = timer_session_rotation_pending_check_start(&session, DEFAULT_ROTATE_PENDING_TIMER); if (ret) { ERR("Failed to re-enable rotation pending timer"); @@ -538,173 +305,317 @@ end: } /* Call with the session and session_list locks held. */ -static int launch_session_rotation(struct ltt_session *session) +int launch_session_rotation(ltt_session& session) { int ret; struct lttng_rotate_session_return rotation_return; - DBG("Launching scheduled time-based rotation on session \"%s\"", session->name); + DBG("Launching scheduled time-based rotation on session \"%s\"", session.name); - ret = cmd_rotate_session( - session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + ASSERT_SESSION_LIST_LOCKED(); + ASSERT_LOCKED(session.lock); + + ret = cmd_rotate_session(&session, + &rotation_return, + false, + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); if (ret == LTTNG_OK) { DBG("Scheduled time-based rotation successfully launched on session \"%s\"", - session->name); + session.name); } else { /* Don't consider errors as fatal. */ DBG("Scheduled time-based rotation aborted for session %s: %s", - session->name, + session.name, lttng_strerror(ret)); } + return 0; } -static int run_job(struct rotation_thread_job *job, - struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle) +int run_job(const rotation_thread_job& job, + ltt_session& session, + notification_thread_handle& notification_thread_handle) { int ret; - switch (job->type) { - case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION: + switch (job.type) { + case ls::rotation_thread_job_type::SCHEDULED_ROTATION: ret = launch_session_rotation(session); break; - case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION: + case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION: ret = check_session_rotation_pending(session, notification_thread_handle); break; default: abort(); } + return ret; } -static int handle_job_queue(struct rotation_thread_handle *handle, - struct rotation_thread *state __attribute__((unused)), - struct rotation_thread_timer_queue *queue) +bool shutdown_rotation_thread(void *thread_data) { - int ret = 0; + auto *handle = reinterpret_cast(thread_data); - for (;;) { - struct ltt_session *session; - struct rotation_thread_job *job; + return handle->shutdown(); +} +} /* namespace */ - /* Take the queue lock only to pop an element from the list. */ - pthread_mutex_lock(&queue->lock); - if (cds_list_empty(&queue->list)) { - pthread_mutex_unlock(&queue->lock); - break; +ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create() +{ + auto queue = zmalloc(); + if (!queue) { + PERROR("Failed to allocate timer rotate queue"); + goto end; + } + + queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); + CDS_INIT_LIST_HEAD(&queue->list); + pthread_mutex_init(&queue->lock, nullptr); +end: + return queue; +} + +void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue) +{ + if (!queue) { + return; + } + + lttng_pipe_destroy(queue->event_pipe); + + { + lttng::pthread::lock_guard queue_lock(queue->lock); + + LTTNG_ASSERT(cds_list_empty(&queue->list)); + } + + pthread_mutex_destroy(&queue->lock); + free(queue); +} + +ls::rotation_thread::rotation_thread( + rotation_thread_timer_queue& rotation_timer_queue, + notification_thread_handle& notification_thread_handle) : + _rotation_timer_queue{ rotation_timer_queue }, + _notification_thread_handle{ notification_thread_handle } +{ + _quit_pipe.reset([]() { + auto raw_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!raw_pipe) { + LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno); } - job = cds_list_first_entry(&queue->list, typeof(*job), head); - cds_list_del(&job->head); - pthread_mutex_unlock(&queue->lock); - session_lock_list(); - session = job->session; - if (!session) { - DBG("Session \"%s\" not found", session->name != NULL ? session->name : ""); + return raw_pipe; + }()); + + _notification_channel.reset([]() { + auto channel = lttng_notification_channel_create( + lttng_session_daemon_notification_endpoint); + if (!channel) { + LTTNG_THROW_ERROR( + "Failed to create notification channel of rotation thread"); + } + + return channel; + }()); + + lttng_poll_init(&_events); + + /* + * Create pollset with size 4: + * - rotation thread quit pipe, + * - rotation thread timer queue pipe, + * - notification channel sock, + * - subscribtion change event fd + */ + if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) { + LTTNG_THROW_ERROR("Failed to create poll object for rotation thread"); + } + + if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set"); + } + + if (lttng_poll_add(&_events, + lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe), + LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set"); + } + + if (lttng_poll_add(&_events, + _notification_channel_subscribtion_change_eventfd.fd(), + LPOLLIN) < 0) { + LTTNG_THROW_ERROR( + "Failed to add rotation thread notification channel subscription change eventfd to poll set"); + } + + if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset"); + } +} + +ls::rotation_thread::~rotation_thread() +{ + lttng_poll_clean(&_events); +} + +void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue, + ls::rotation_thread_job_type job_type, + ltt_session *session) +{ + const char dummy = '!'; + struct rotation_thread_job *job = nullptr; + const char *job_type_str = get_job_type_str(job_type); + lttng::pthread::lock_guard queue_lock(queue->lock); + + if (timer_job_exists(queue, job_type, session)) { + /* + * This timer job is already pending, we don't need to add + * it. + */ + return; + } + + job = zmalloc(); + if (!job) { + PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"", + job_type_str, + session->name); + return; + } + + /* No reason for this to fail as the caller must hold a reference. */ + (void) session_get(session); + + job->session = session; + job->type = job_type; + cds_list_add_tail(&job->head, &queue->list); + + const int write_ret = + lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy)); + if (write_ret < 0) { + /* + * We do not want to block in the timer handler, the job has + * been enqueued in the list, the wakeup pipe is probably full, + * the job will be processed when the rotation_thread catches + * up. + */ + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP + if (errno == EAGAIN || errno == EWOULDBLOCK) { + DIAGNOSTIC_POP /* - * This is a non-fatal error, and we cannot report it to - * the user (timer), so just print the error and - * continue the processing. - * - * While the timer thread will purge pending signals for - * a session on the session's destruction, it is - * possible for a job targeting that session to have - * already been queued before it was destroyed. + * Not an error, but would be surprising and indicate + * that the rotation thread can't keep up with the + * current load. */ - free(job); - session_put(session); - session_unlock_list(); - continue; + DBG("Wake-up pipe of rotation thread job queue is full"); + return; } - session_lock(session); - ret = run_job(job, session, handle->notification_thread_handle); - session_unlock(session); - /* Release reference held by the job. */ - session_put(session); - session_unlock_list(); - free(job); - if (ret) { - goto end; - } + PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"", + job_type_str, + session->name); + return; } +} - ret = 0; +void ls::rotation_thread::_handle_job_queue() +{ + for (;;) { + rotation_thread_job::uptr job; + + { + /* Take the queue lock only to pop an element from the list. */ + lttng::pthread::lock_guard rotation_timer_queue_lock( + _rotation_timer_queue.lock); + if (cds_list_empty(&_rotation_timer_queue.list)) { + break; + } -end: - return ret; + job.reset(cds_list_first_entry( + &_rotation_timer_queue.list, typeof(rotation_thread_job), head)); + cds_list_del(&job->head); + } + + session_lock_list(); + const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); }); + + /* locked_ptr will unlock the session and release the ref held by the job. */ + session_lock(job->session); + auto session = ltt_session::locked_ptr(job->session); + + if (run_job(*job, *session, _notification_thread_handle)) { + return; + } + } } -static int handle_condition(const struct lttng_notification *notification, - struct notification_thread_handle *notification_thread_handle) +void ls::rotation_thread::_handle_notification(const lttng_notification ¬ification) { int ret = 0; const char *condition_session_name = nullptr; - enum lttng_condition_type condition_type; enum lttng_condition_status condition_status; enum lttng_evaluation_status evaluation_status; uint64_t consumed; - struct ltt_session *session; - const struct lttng_condition *condition = - lttng_notification_get_const_condition(notification); - const struct lttng_evaluation *evaluation = - lttng_notification_get_const_evaluation(notification); - - condition_type = lttng_condition_get_type(condition); + auto *condition = lttng_notification_get_const_condition(¬ification); + auto *evaluation = lttng_notification_get_const_evaluation(¬ification); + const auto condition_type = lttng_condition_get_type(condition); if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) { - ret = -1; - ERR("Condition type and session usage type are not the same"); - goto end; + LTTNG_THROW_ERROR("Unexpected condition type"); } - /* Fetch info to test */ + /* Fetch info to test. */ condition_status = lttng_condition_session_consumed_size_get_session_name( condition, &condition_session_name); if (condition_status != LTTNG_CONDITION_STATUS_OK) { - ERR("Session name could not be fetched"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Session name could not be fetched from notification"); } + evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed); if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) { - ERR("Failed to get evaluation"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Failed to get consumed size from evaluation"); } + DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}", + condition_session_name, + consumed); + session_lock_list(); - session = session_find_by_name(condition_session_name); + const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); }); + + ltt_session::locked_ptr session{ [&condition_session_name]() { + auto raw_session_ptr = session_find_by_name(condition_session_name); + + if (raw_session_ptr) { + session_lock(raw_session_ptr); + } + + return raw_session_ptr; + }() }; if (!session) { - DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`", - lttng_condition_type_str(condition_type), - condition_session_name); + DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`", + lttng_condition_type_str(condition_type), + condition_session_name); /* * Not a fatal error: a session can be destroyed before we get * the chance to handle the notification. */ - ret = 0; - session_unlock_list(); - goto end; + return; } - session_lock(session); if (!lttng_trigger_is_equal(session->rotate_trigger, - lttng_notification_get_const_trigger(notification))) { - /* Notification does not originate from our rotation trigger. */ - ret = 0; - goto end_unlock; + lttng_notification_get_const_trigger(¬ification))) { + DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping"); + return; } - ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle); - if (ret) { - goto end_unlock; - } + unsubscribe_session_consumed_size_rotation(*session); ret = cmd_rotate_session( - session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); switch (ret) { case LTTNG_OK: break; @@ -718,35 +629,16 @@ static int handle_condition(const struct lttng_notification *notification, DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value"); break; default: - ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret)); - ret = -1; - goto end_unlock; - } - - ret = subscribe_session_consumed_size_rotation( - session, consumed + session->rotate_size, notification_thread_handle); - if (ret) { - ERR("Failed to subscribe to session consumed size condition"); - goto end_unlock; + LTTNG_THROW_CTL("Failed to rotate on consumed size notification", + static_cast(-ret)); } - ret = 0; -end_unlock: - session_unlock(session); - session_put(session); - session_unlock_list(); -end: - return ret; + subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size); } -static int handle_notification_channel(int fd __attribute__((unused)), - struct rotation_thread_handle *handle, - struct rotation_thread *state __attribute__((unused))) +void ls::rotation_thread::_handle_notification_channel_activity() { - int ret; bool notification_pending = true; - struct lttng_notification *notification = nullptr; - enum lttng_notification_channel_status status; /* * A notification channel may have multiple notifications queued-up internally in @@ -765,122 +657,120 @@ static int handle_notification_channel(int fd __attribute__((unused)), * the channel's internal buffers. */ while (notification_pending) { - status = lttng_notification_channel_has_pending_notification( - rotate_notification_channel, ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; + const auto pending_status = lttng_notification_channel_has_pending_notification( + _notification_channel.get(), ¬ification_pending); + if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR("Error occurred while checking for pending notification"); } if (!notification_pending) { - ret = 0; - goto end; + return; } /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification( - rotate_notification_channel, ¬ification); - switch (status) { + lttng_notification::uptr notification; + enum lttng_notification_channel_status next_notification_status; + + { + struct lttng_notification *raw_notification_ptr; + + next_notification_status = lttng_notification_channel_get_next_notification( + _notification_channel.get(), &raw_notification_ptr); + notification.reset(raw_notification_ptr); + } + + switch (next_notification_status) { case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: break; case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: WARN("Dropped notification detected on notification channel used by the rotation management thread."); - ret = 0; - goto end; + return; case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Notification channel was closed"); default: /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Unknown notification channel status"); } - ret = handle_condition(notification, handle->notification_thread_handle); - lttng_notification_destroy(notification); - if (ret) { - goto end; - } + _handle_notification(*notification); } -end: - return ret; } -static void *thread_rotation(void *data) +void ls::rotation_thread::_thread_function() noexcept { - int ret; - struct rotation_thread_handle *handle = (rotation_thread_handle *) data; - struct rotation_thread thread; - int queue_pipe_fd; - DBG("Started rotation thread"); + + try { + _run(); + } catch (const std::exception& e) { + ERR_FMT("Fatal rotation thread error: {}", e.what()); + } + + DBG("Thread exit"); +} + +void ls::rotation_thread::_run() +{ rcu_register_thread(); + const auto unregister_rcu_thread = + lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); }); + rcu_thread_online(); + const auto offline_rcu_thread = + lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); }); + health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION); health_code_update(); + const auto unregister_health = + lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); }); - if (!handle) { - ERR("Invalid thread context provided"); - goto end; - } - - queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe); - - ret = init_thread_state(handle, &thread); - if (ret) { - goto error; - } + const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe); while (true) { - int fd_count, i; - health_poll_entry(); DBG("Entering poll wait"); - ret = lttng_poll_wait(&thread.events, -1); - DBG("Poll wait returned (%i)", ret); + auto poll_wait_ret = lttng_poll_wait(&_events, -1); + DBG_FMT("Poll wait returned: ret={}", poll_wait_ret); health_poll_exit(); - if (ret < 0) { + if (poll_wait_ret < 0) { /* * Restart interrupted system call. */ if (errno == EINTR) { continue; } - ERR("Error encountered during lttng_poll_wait (%i)", ret); - goto error; + + LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno); } - fd_count = ret; - for (i = 0; i < fd_count; i++) { - int fd = LTTNG_POLL_GETFD(&thread.events, i); - uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i); + const auto fd_count = poll_wait_ret; + for (int i = 0; i < fd_count; i++) { + const auto fd = LTTNG_POLL_GETFD(&_events, i); + const auto revents = LTTNG_POLL_GETEV(&_events, i); - DBG("Handling fd (%i) activity (%u)", fd, revents); + DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents); if (revents & LPOLLERR) { - ERR("Polling returned an error on fd %i", fd); - goto error; + LTTNG_THROW_ERROR( + fmt::format("Polling returned an error on fd: fd={}", fd)); } - if (fd == rotate_notification_channel->socket || - fd == rotate_notification_channel_subscription_change_eventfd) { - ret = handle_notification_channel(fd, handle, &thread); - if (ret) { - ERR("Error occurred while handling activity on notification channel socket"); - goto error; + if (fd == _notification_channel->socket || + fd == _notification_channel_subscribtion_change_eventfd.fd()) { + try { + _handle_notification_channel_activity(); + } catch (const lttng::ctl::error& e) { + /* + * The only non-fatal error (rotation failed), others + * are caught at the top-level. + */ + DBG_FMT("Control error occurred while handling activity on notification channel socket: {}", + e.what()); + continue; } - if (fd == rotate_notification_channel_subscription_change_eventfd) { - uint64_t eventfd_value; - const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); - - if (read_ret != sizeof(eventfd_value)) { - PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); - goto error; - } + if (fd == _notification_channel_subscribtion_change_eventfd.fd()) { + _notification_channel_subscribtion_change_eventfd.decrement(); } } else { /* Job queue or quit pipe activity. */ @@ -891,59 +781,162 @@ static void *thread_rotation(void *data) * flushed and all references held in the queue * are released. */ - ret = handle_job_queue( - handle, &thread, handle->rotation_timer_queue); - if (ret) { - ERR("Failed to handle rotation timer pipe event"); - goto error; - } - + _handle_job_queue(); if (fd == queue_pipe_fd) { char buf; - ret = lttng_read(fd, &buf, 1); - if (ret != 1) { - ERR("Failed to read from wakeup pipe (fd = %i)", - fd); - goto error; + if (lttng_read(fd, &buf, 1) != 1) { + LTTNG_THROW_POSIX( + fmt::format( + "Failed to read from wakeup pipe: fd={}", + fd), + errno); } } else { DBG("Quit pipe activity"); - goto exit; + return; } } } } -exit: -error: - DBG("Thread exit"); - fini_thread_state(&thread); -end: - health_unregister(the_health_sessiond); - rcu_thread_offline(); - rcu_unregister_thread(); - return nullptr; } -static bool shutdown_rotation_thread(void *thread_data) +bool ls::rotation_thread::shutdown() const noexcept { - struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data; - const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe); + const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get()); return notify_thread_pipe(write_fd) == 1; } -bool launch_rotation_thread(struct rotation_thread_handle *handle) +void ls::rotation_thread::launch_thread() { - struct lttng_thread *thread; - - thread = lttng_thread_create( - "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle); + auto thread = lttng_thread_create( + "Rotation", + [](void *ptr) { + auto handle = reinterpret_cast(ptr); + + handle->_thread_function(); + return static_cast(nullptr); + }, + shutdown_rotation_thread, + nullptr, + this); if (!thread) { - goto error; + LTTNG_THROW_ERROR("Failed to launch rotation thread"); } + lttng_thread_put(thread); - return true; -error: - return false; +} + +void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session, + std::uint64_t size) +{ + const struct lttng_credentials session_creds = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid), + }; + + ASSERT_LOCKED(session.lock); + + auto rotate_condition = lttng::make_unique_wrapper( + lttng_condition_session_consumed_size_create()); + if (!rotate_condition) { + LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno); + } + + auto condition_status = + lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size); + if (condition_status != LTTNG_CONDITION_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Could not set session consumed size condition threshold: size={}", size)); + } + + condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition.get(), + session.name); + if (condition_status != LTTNG_CONDITION_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Could not set session consumed size condition session name: name=`{}`", + session.name)); + } + + auto notify_action = lttng::make_unique_wrapper( + lttng_action_notify_create()); + if (!notify_action) { + LTTNG_THROW_POSIX("Could not create notify action", errno); + } + + LTTNG_ASSERT(!session.rotate_trigger); + /* trigger acquires its own reference to condition and action on success. */ + auto trigger = lttng::make_unique_wrapper( + lttng_trigger_create(rotate_condition.get(), notify_action.get())); + if (!trigger) + { + LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno); + } + + /* Ensure this trigger is not visible to external users. */ + lttng_trigger_set_hidden(trigger.get()); + lttng_trigger_set_credentials(trigger.get(), &session_creds); + + auto nc_status = + lttng_notification_channel_subscribe(_notification_channel.get(), rotate_condition.get()); + if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification"); + } + + /* + * Ensure any notification queued during the subscription are consumed by queueing an + * event. + */ + _notification_channel_subscribtion_change_eventfd.increment(); + + const auto register_ret = notification_thread_command_register_trigger( + &_notification_thread_handle, trigger.get(), true); + if (register_ret != LTTNG_OK) { + LTTNG_THROW_CTL( + fmt::format( + "Failed to register trigger for automatic size-based rotation: session_name{}, size={}", + session.name, + size), + register_ret); + } + + /* Ownership transferred to the session. */ + session.rotate_trigger = trigger.release(); +} + +void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session) +{ + LTTNG_ASSERT(session.rotate_trigger); + + const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept { + lttng_trigger_put(session.rotate_trigger); + session.rotate_trigger = nullptr; + }); + + const auto unsubscribe_status = lttng_notification_channel_unsubscribe( + _notification_channel.get(), + lttng_trigger_get_const_condition(session.rotate_trigger)); + if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}", + session.name, + static_cast(unsubscribe_status))); + } + + /* + * Ensure any notification queued during the un-subscription are consumed by queueing an + * event. + */ + _notification_channel_subscribtion_change_eventfd.increment(); + + const auto unregister_status = notification_thread_command_unregister_trigger( + &_notification_thread_handle, session.rotate_trigger); + if (unregister_status != LTTNG_OK) { + LTTNG_THROW_CTL( + fmt::format( + "Failed to unregister trigger for automatic size-based rotation: session_name{}", + session.name), + unregister_status); + } }