X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Frotation-thread.cpp;fp=src%2Fbin%2Flttng-sessiond%2Frotation-thread.cpp;h=815b4e95f1a5644d78fd18282c4f8cd0b52e6887;hp=4a0865eb8940a4faec5bb288ac61e50e1bca79ad;hb=dc65dda314fcd676fabfe73942c34cb93b7fea40;hpb=20c734f5b9940033f4ab96e47523c7d9e3d299bb diff --git a/src/bin/lttng-sessiond/rotation-thread.cpp b/src/bin/lttng-sessiond/rotation-thread.cpp index 4a0865eb8..815b4e95f 100644 --- a/src/bin/lttng-sessiond/rotation-thread.cpp +++ b/src/bin/lttng-sessiond/rotation-thread.cpp @@ -38,12 +38,20 @@ #include #include +#include #include #include #include #include struct lttng_notification_channel *rotate_notification_channel = nullptr; +/* + * This eventfd is used to wake-up the rotation thread whenever a command + * completes on the notification channel. This ensures that any notification + * that was queued while waiting for a reply to the command is eventually + * consumed. + */ +int rotate_notification_channel_subscription_change_eventfd = -1; struct rotation_thread { struct lttng_poll_event events; @@ -277,6 +285,14 @@ static void fini_thread_state(struct rotation_thread *state) if (rotate_notification_channel) { lttng_notification_channel_destroy(rotate_notification_channel); } + + if (rotate_notification_channel_subscription_change_eventfd >= 0) { + const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); + + if (close_ret) { + PERROR("Failed to close rotation thread notification channel subscription change eventfd"); + } + } } static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state) @@ -305,6 +321,20 @@ static int init_thread_state(struct rotation_thread_handle *handle, struct rotat goto end; } + rotate_notification_channel_subscription_change_eventfd = + eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); + if (rotate_notification_channel_subscription_change_eventfd < 0) { + PERROR("Failed to create rotation thread notification channel subscription change eventfd"); + ret = -1; + goto end; + } + ret = lttng_poll_add( + &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); + if (ret < 0) { + ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); + goto end; + } + end: return ret; } @@ -714,50 +744,68 @@ static int handle_notification_channel(int fd __attribute__((unused)), struct rotation_thread *state __attribute__((unused))) { int ret; - bool notification_pending; + bool notification_pending = true; struct lttng_notification *notification = nullptr; enum lttng_notification_channel_status status; - status = lttng_notification_channel_has_pending_notification(rotate_notification_channel, - ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; - } + /* + * A notification channel may have multiple notifications queued-up internally in + * its buffers. This is because a notification channel multiplexes command replies + * and notifications. The current protocol specifies that multiple notifications can be + * received before the reply to a command. + * + * In such cases, the notification channel client implementation internally queues them and + * provides them on the next calls to lttng_notification_channel_get_next_notification(). + * This is correct with respect to the public API, which is intended to be used in "blocking + * mode". + * + * However, this internal user relies on poll/epoll to wake-up when data is available + * on the notification channel's socket. As such, it can't assume that a wake-up means only + * one notification is available for consumption since many of them may have been queued in + * the channel's internal buffers. + */ + while (notification_pending) { + status = lttng_notification_channel_has_pending_notification( + rotate_notification_channel, ¬ification_pending); + if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + ERR("Error occurred while checking for pending notification"); + ret = -1; + goto end; + } - if (!notification_pending) { - ret = 0; - goto end; - } + if (!notification_pending) { + ret = 0; + goto end; + } - /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification(rotate_notification_channel, - ¬ification); + /* Receive the next notification. */ + status = lttng_notification_channel_get_next_notification( + rotate_notification_channel, ¬ification); + switch (status) { + case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: + break; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: + WARN("Dropped notification detected on notification channel used by the rotation management thread."); + ret = 0; + goto end; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: + ERR("Notification channel was closed"); + ret = -1; + goto end; + default: + /* Unhandled conditions / errors. */ + ERR("Unknown notification channel status"); + ret = -1; + goto end; + } - switch (status) { - case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: - break; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: - /* Not an error, we will wait for the next one */ - ret = 0; - goto end; - ; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; - default: - /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + ret = handle_condition(notification, handle->notification_thread_handle); + lttng_notification_destroy(notification); + if (ret) { + goto end; + } } - - ret = handle_condition(notification, handle->notification_thread_handle); - end: - lttng_notification_destroy(notification); return ret; } @@ -817,12 +865,23 @@ static void *thread_rotation(void *data) goto error; } - if (fd == rotate_notification_channel->socket) { + if (fd == rotate_notification_channel->socket || + fd == rotate_notification_channel_subscription_change_eventfd) { ret = handle_notification_channel(fd, handle, &thread); if (ret) { ERR("Error occurred while handling activity on notification channel socket"); goto error; } + + if (fd == rotate_notification_channel_subscription_change_eventfd) { + uint64_t eventfd_value; + const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); + + if (read_ret != sizeof(eventfd_value)) { + PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); + goto error; + } + } } else { /* Job queue or quit pipe activity. */