X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Frotation-thread.c;h=37ff6f72c41207ec3ff8c64d349c1369b3d43b19;hb=0331d3e51793490c8dd511022823bb6908a19dce;hp=590b95dd224b0cc0f6e5351b4cd85f147a66d8ac;hpb=cb2e451438cacef211d42ab9d46aa20af9fc598c;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c index 590b95dd2..37ff6f72c 100644 --- a/src/bin/lttng-sessiond/rotation-thread.c +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -16,6 +16,10 @@ #include #include #include + +#include +#include +#include #include #include #include @@ -26,6 +30,7 @@ #include #include #include +#include #include "rotation-thread.h" #include "lttng-sessiond.h" @@ -43,6 +48,14 @@ struct lttng_notification_channel *rotate_notification_channel = NULL; +/* + * This eventfd is used to wake-up the rotation thread whenever a command + * completes on the notification channel. This ensures that any notification + * that was queued while waiting for a reply to the command is eventually + * consumed. + */ +int rotate_notification_channel_subscription_change_eventfd = -1; + struct rotation_thread { struct lttng_poll_event events; }; @@ -279,6 +292,14 @@ void fini_thread_state(struct rotation_thread *state) if (rotate_notification_channel) { lttng_notification_channel_destroy(rotate_notification_channel); } + + if (rotate_notification_channel_subscription_change_eventfd >= 0) { + const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); + + if (close_ret) { + PERROR("Failed to close rotation thread notification channel subscription change eventfd"); + } + } } static @@ -310,6 +331,20 @@ int init_thread_state(struct rotation_thread_handle *handle, goto end; } + rotate_notification_channel_subscription_change_eventfd = + eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); + if (rotate_notification_channel_subscription_change_eventfd < 0) { + PERROR("Failed to create rotation thread notification channel subscription change eventfd"); + ret = -1; + goto end; + } + ret = lttng_poll_add( + &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); + if (ret < 0) { + ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); + goto end; + } + end: return ret; } @@ -619,8 +654,7 @@ end: } static -int handle_condition(const struct lttng_condition *condition, - const struct lttng_evaluation *evaluation, +int handle_condition(const struct lttng_notification *notification, struct notification_thread_handle *notification_thread_handle) { int ret = 0; @@ -630,6 +664,10 @@ int handle_condition(const struct lttng_condition *condition, enum lttng_evaluation_status evaluation_status; uint64_t consumed; struct ltt_session *session; + const struct lttng_condition *condition = + lttng_notification_get_const_condition(notification); + const struct lttng_evaluation *evaluation = + lttng_notification_get_const_evaluation(notification); condition_type = lttng_condition_get_type(condition); @@ -671,25 +709,41 @@ int handle_condition(const struct lttng_condition *condition, } session_lock(session); + if (!lttng_trigger_is_equal(session->rotate_trigger, + lttng_notification_get_const_trigger(notification))) { + /* Notification does not originate from our rotation trigger. */ + ret = 0; + goto end_unlock; + } + ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle); if (ret) { goto end_unlock; } - ret = cmd_rotate_session(session, NULL, false, - LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); - if (ret == -LTTNG_ERR_ROTATION_PENDING) { + ret = cmd_rotate_session( + session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + switch (ret) { + case LTTNG_OK: + break; + case -LTTNG_ERR_ROTATION_PENDING: DBG("Rotate already pending, subscribe to the next threshold value"); - } else if (ret != LTTNG_OK) { - ERR("Failed to rotate on size notification with error: %s", - lttng_strerror(ret)); + break; + case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP: + DBG("Rotation already happened since last stop, subscribe to the next threshold value"); + break; + case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR: + DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value"); + break; + default: + ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret)); ret = -1; goto end_unlock; } - ret = subscribe_session_consumed_size_rotation(session, - consumed + session->rotate_size, - notification_thread_handle); + + ret = subscribe_session_consumed_size_rotation( + session, consumed + session->rotate_size, notification_thread_handle); if (ret) { ERR("Failed to subscribe to session consumed size condition"); goto end_unlock; @@ -710,56 +764,68 @@ int handle_notification_channel(int fd, struct rotation_thread *state) { int ret; - bool notification_pending; + bool notification_pending = true; struct lttng_notification *notification = NULL; enum lttng_notification_channel_status status; - const struct lttng_evaluation *notification_evaluation; - const struct lttng_condition *notification_condition; - status = lttng_notification_channel_has_pending_notification( + /* + * A notification channel may have multiple notifications queued-up internally in + * its buffers. This is because a notification channel multiplexes command replies + * and notifications. The current protocol specifies that multiple notifications can be + * received before the reply to a command. + * + * In such cases, the notification channel client implementation internally queues them and + * provides them on the next calls to lttng_notification_channel_get_next_notification(). + * This is correct with respect to the public API, which is intended to be used in "blocking + * mode". + * + * However, this internal user relies on poll/epoll to wake-up when data is available + * on the notification channel's socket. As such, it can't assume that a wake-up means only + * one notification is available for consumption since many of them may have been queued in + * the channel's internal buffers. + */ + while (notification_pending) { + status = lttng_notification_channel_has_pending_notification( rotate_notification_channel, ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; - } + if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + ERR("Error occurred while checking for pending notification"); + ret = -1; + goto end; + } - if (!notification_pending) { - ret = 0; - goto end; - } + if (!notification_pending) { + ret = 0; + goto end; + } - /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification( - rotate_notification_channel, - ¬ification); + /* Receive the next notification. */ + status = lttng_notification_channel_get_next_notification( + rotate_notification_channel, ¬ification); + switch (status) { + case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: + break; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: + WARN("Dropped notification detected on notification channel used by the rotation management thread."); + ret = 0; + goto end; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: + ERR("Notification channel was closed"); + ret = -1; + goto end; + default: + /* Unhandled conditions / errors. */ + ERR("Unknown notification channel status"); + ret = -1; + goto end; + } - switch (status) { - case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: - break; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: - /* Not an error, we will wait for the next one */ - ret = 0; - goto end;; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; - default: - /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + ret = handle_condition(notification, handle->notification_thread_handle); + lttng_notification_destroy(notification); + if (ret) { + goto end; + } } - - notification_condition = lttng_notification_get_condition(notification); - notification_evaluation = lttng_notification_get_evaluation(notification); - - ret = handle_condition(notification_condition, notification_evaluation, - handle->notification_thread_handle); - end: - lttng_notification_destroy(notification); return ret; } @@ -823,13 +889,23 @@ void *thread_rotation(void *data) goto error; } - if (fd == rotate_notification_channel->socket) { - ret = handle_notification_channel(fd, handle, - &thread); + if (fd == rotate_notification_channel->socket || + fd == rotate_notification_channel_subscription_change_eventfd) { + ret = handle_notification_channel(fd, handle, &thread); if (ret) { ERR("Error occurred while handling activity on notification channel socket"); goto error; } + + if (fd == rotate_notification_channel_subscription_change_eventfd) { + uint64_t eventfd_value; + const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); + + if (read_ret != sizeof(eventfd_value)) { + PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); + goto error; + } + } } else { /* Job queue or quit pipe activity. */