X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-sessiond%2Frotation-thread.cpp;h=815b4e95f1a5644d78fd18282c4f8cd0b52e6887;hb=8b75cd779ffe332281fec189cdf808e4ee452572;hp=7b0936f48fe70ffcb4607642d38f6dbc1ca96a98;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/rotation-thread.cpp b/src/bin/lttng-sessiond/rotation-thread.cpp index 7b0936f48..815b4e95f 100644 --- a/src/bin/lttng-sessiond/rotation-thread.cpp +++ b/src/bin/lttng-sessiond/rotation-thread.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -37,12 +38,20 @@ #include #include +#include #include #include #include #include -struct lttng_notification_channel *rotate_notification_channel = NULL; +struct lttng_notification_channel *rotate_notification_channel = nullptr; +/* + * This eventfd is used to wake-up the rotation thread whenever a command + * completes on the notification channel. This ensures that any notification + * that was queued while waiting for a reply to the command is eventually + * consumed. + */ +int rotate_notification_channel_subscription_change_eventfd = -1; struct rotation_thread { struct lttng_poll_event events; @@ -87,9 +96,9 @@ static const char *get_job_type_str(enum rotation_thread_job_type job_type) } } -struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void) +struct rotation_thread_timer_queue *rotation_thread_timer_queue_create() { - struct rotation_thread_timer_queue *queue = NULL; + struct rotation_thread_timer_queue *queue = nullptr; queue = zmalloc(); if (!queue) { @@ -99,7 +108,7 @@ struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void) queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); CDS_INIT_LIST_HEAD(&queue->list); - pthread_mutex_init(&queue->lock, NULL); + pthread_mutex_init(&queue->lock, nullptr); end: return queue; } @@ -150,7 +159,7 @@ end: return handle; error: rotation_thread_handle_destroy(handle); - return NULL; + return nullptr; } /* @@ -180,7 +189,7 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue, { int ret; const char dummy = '!'; - struct rotation_thread_job *job = NULL; + struct rotation_thread_job *job = nullptr; const char *job_type_str = get_job_type_str(job_type); pthread_mutex_lock(&queue->lock); @@ -276,6 +285,14 @@ static void fini_thread_state(struct rotation_thread *state) if (rotate_notification_channel) { lttng_notification_channel_destroy(rotate_notification_channel); } + + if (rotate_notification_channel_subscription_change_eventfd >= 0) { + const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); + + if (close_ret) { + PERROR("Failed to close rotation thread notification channel subscription change eventfd"); + } + } } static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state) @@ -304,6 +321,20 @@ static int init_thread_state(struct rotation_thread_handle *handle, struct rotat goto end; } + rotate_notification_channel_subscription_change_eventfd = + eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); + if (rotate_notification_channel_subscription_change_eventfd < 0) { + PERROR("Failed to create rotation thread notification channel subscription change eventfd"); + ret = -1; + goto end; + } + ret = lttng_poll_add( + &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); + if (ret < 0) { + ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); + goto end; + } + end: return ret; } @@ -318,6 +349,7 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess uint64_t relayd_id; bool chunk_exists_on_peer = false; enum lttng_trace_chunk_status chunk_status; + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(session->chunk_being_archived); @@ -325,10 +357,10 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess * Check for a local pending rotation on all consumers (32-bit * user space, 64-bit user space, and kernel). */ - rcu_read_lock(); if (!session->ust_session) { goto skip_ust; } + cds_lfht_for_each_entry ( session->ust_session->consumer->socks->ht, &iter, socket, node.node) { relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ? @@ -386,7 +418,6 @@ skip_ust: } skip_kernel: end: - rcu_read_unlock(); if (!chunk_exists_on_peer) { uint64_t chunk_being_archived_id; @@ -460,7 +491,7 @@ check_session_rotation_pending(struct ltt_session *session, * rotations can start now. */ chunk_status = lttng_trace_chunk_get_name( - session->chunk_being_archived, &archived_chunk_name, NULL); + session->chunk_being_archived, &archived_chunk_name, nullptr); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); free(session->last_archived_chunk_name); session->last_archived_chunk_name = strdup(archived_chunk_name); @@ -609,7 +640,7 @@ static int handle_condition(const struct lttng_notification *notification, struct notification_thread_handle *notification_thread_handle) { int ret = 0; - const char *condition_session_name = NULL; + const char *condition_session_name = nullptr; enum lttng_condition_type condition_type; enum lttng_condition_status condition_status; enum lttng_evaluation_status evaluation_status; @@ -673,7 +704,7 @@ static int handle_condition(const struct lttng_notification *notification, } ret = cmd_rotate_session( - session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); switch (ret) { case LTTNG_OK: break; @@ -713,50 +744,68 @@ static int handle_notification_channel(int fd __attribute__((unused)), struct rotation_thread *state __attribute__((unused))) { int ret; - bool notification_pending; - struct lttng_notification *notification = NULL; + bool notification_pending = true; + struct lttng_notification *notification = nullptr; enum lttng_notification_channel_status status; - status = lttng_notification_channel_has_pending_notification(rotate_notification_channel, - ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; - } + /* + * A notification channel may have multiple notifications queued-up internally in + * its buffers. This is because a notification channel multiplexes command replies + * and notifications. The current protocol specifies that multiple notifications can be + * received before the reply to a command. + * + * In such cases, the notification channel client implementation internally queues them and + * provides them on the next calls to lttng_notification_channel_get_next_notification(). + * This is correct with respect to the public API, which is intended to be used in "blocking + * mode". + * + * However, this internal user relies on poll/epoll to wake-up when data is available + * on the notification channel's socket. As such, it can't assume that a wake-up means only + * one notification is available for consumption since many of them may have been queued in + * the channel's internal buffers. + */ + while (notification_pending) { + status = lttng_notification_channel_has_pending_notification( + rotate_notification_channel, ¬ification_pending); + if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + ERR("Error occurred while checking for pending notification"); + ret = -1; + goto end; + } - if (!notification_pending) { - ret = 0; - goto end; - } + if (!notification_pending) { + ret = 0; + goto end; + } - /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification(rotate_notification_channel, - ¬ification); + /* Receive the next notification. */ + status = lttng_notification_channel_get_next_notification( + rotate_notification_channel, ¬ification); + switch (status) { + case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: + break; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: + WARN("Dropped notification detected on notification channel used by the rotation management thread."); + ret = 0; + goto end; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: + ERR("Notification channel was closed"); + ret = -1; + goto end; + default: + /* Unhandled conditions / errors. */ + ERR("Unknown notification channel status"); + ret = -1; + goto end; + } - switch (status) { - case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: - break; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: - /* Not an error, we will wait for the next one */ - ret = 0; - goto end; - ; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; - default: - /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + ret = handle_condition(notification, handle->notification_thread_handle); + lttng_notification_destroy(notification); + if (ret) { + goto end; + } } - - ret = handle_condition(notification, handle->notification_thread_handle); - end: - lttng_notification_destroy(notification); return ret; } @@ -816,12 +865,23 @@ static void *thread_rotation(void *data) goto error; } - if (fd == rotate_notification_channel->socket) { + if (fd == rotate_notification_channel->socket || + fd == rotate_notification_channel_subscription_change_eventfd) { ret = handle_notification_channel(fd, handle, &thread); if (ret) { ERR("Error occurred while handling activity on notification channel socket"); goto error; } + + if (fd == rotate_notification_channel_subscription_change_eventfd) { + uint64_t eventfd_value; + const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); + + if (read_ret != sizeof(eventfd_value)) { + PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); + goto error; + } + } } else { /* Job queue or quit pipe activity. */ @@ -862,7 +922,7 @@ end: health_unregister(the_health_sessiond); rcu_thread_offline(); rcu_unregister_thread(); - return NULL; + return nullptr; } static bool shutdown_rotation_thread(void *thread_data) @@ -878,7 +938,7 @@ bool launch_rotation_thread(struct rotation_thread_handle *handle) struct lttng_thread *thread; thread = lttng_thread_create( - "Rotation", thread_rotation, shutdown_rotation_thread, NULL, handle); + "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle); if (!thread) { goto error; }