Fix: sessiond: size-based notification occasionally not triggered
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 4 Apr 2023 15:03:03 +0000 (11:03 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 1 Jun 2023 00:32:32 +0000 (20:32 -0400)
Issue observed
==============

When tracing to multiple sessions with scheduled size-based rotations
that occur simultaneously (typically because they trace the same events
and use the same rotation schedule configuration), the start of some
rotations seems to be delayed indefinitely.

Cause
=====

The size-based rotations are implemented by piggy-backing onto the
channel monitoring facilities.

Essentially, a per-channel timer samples a number of statistics on the
consumer daemon end, which transmits them to the session daemon.

The session daemon's notification subsystem evaluates the statistics
against the various registered triggers bound to the channels being
monitored when a statistics sample is received.

To implement size-based rotations, internal triggers are registered with
the "consumed size" condition set to a given threshold. A session
rotation management thread (which also performs other tasks) uses a
notification channel to wait for sessions to reach their target size,
starts rotations as needed, and sets a new threshold according to the
sessions' configured rotation schedule.

The rotation thread uses liblttng-ctl's API to consume notifications
from a notification channel.

At any time, a notification channel may have multiple notifications
queued-up internally in its buffers. This is because a notification
channel multiplexes command replies and notifications over the same UNIX
socket. The current protocol specifies that multiple notifications can
be received before the reply to a command.

In such cases, the notification channel client implementation internally
queues them and provides them on the next calls to
lttng_notification_channel_get_next_notification().

This is correct with respect to the public API, which is intended to be
used in "blocking mode". However, this internal user uses the
notification channel's raw file descriptor to wake-up when a
notification is available.

This is problematic because notifications may be queued by the
notification channel (and thus removed from the socket) while waiting
for command replies (subscribing and unsubscribing from notification
conditions). In such a case, a notification is available but the
rotation thread does not wake-up to consume it as nothing is available
in the socket's buffer.

When this happens, a session that is supposed to rotate automatically
appears to grow indefinitely. It will typically eventually rotate as new
notifications become available and cause the rotation thread to wake-up.
However, a "lag" builds up as the notification that caused the wake-up
is not consumed. Instead, the last buffered notification is provided to
the rotation thread.

Solution
========

Use an event_fd to wake-up the rotation thread whenever a command
completes on the notification channel. This ensures that any
notification that was queued while waiting for a reply to the command is
eventually consumed.

Known drawbacks
===============

None.

Note
====

The use of C++ features is kept to a minimum in this patch in order to
make it easier to backport to the stable releases. A clean-up patch
follows and makes the code conform to the coding standards.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I8974b10124704d1e66e8da32d495fee738e3d43f

src/bin/lttng-sessiond/rotate.c
src/bin/lttng-sessiond/rotation-thread.c
src/bin/lttng-sessiond/rotation-thread.h

index 9e1126bdf03e6b86b523f659af3ef1359ba847b4..1cbff999a283b14ea1b8dd06c1448a3ad58c47c2 100644 (file)
@@ -47,12 +47,13 @@ int subscribe_session_consumed_size_rotation(struct ltt_session *session, uint64
        int ret;
        enum lttng_condition_status condition_status;
        enum lttng_notification_channel_status nc_status;
-       struct lttng_condition *rotate_condition = NULL;
-       struct lttng_action *notify_action = NULL;
+       const uint64_t eventfd_increment_value = 1;
        const struct lttng_credentials session_creds = {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
        };
+       struct lttng_condition *rotate_condition;
+       struct lttng_action *notify_action = NULL;
 
        rotate_condition = lttng_condition_session_consumed_size_create();
        if (!rotate_condition) {
@@ -109,6 +110,15 @@ int subscribe_session_consumed_size_rotation(struct ltt_session *session, uint64
                goto end;
        }
 
+       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+                         &eventfd_increment_value,
+                         sizeof(eventfd_increment_value));
+       if (ret != sizeof(eventfd_increment_value)) {
+               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+               ret = -1;
+               goto end;
+       }
+
        ret = notification_thread_command_register_trigger(
                        notification_thread_handle, session->rotate_trigger,
                        true);
@@ -134,6 +144,7 @@ int unsubscribe_session_consumed_size_rotation(struct ltt_session *session,
 {
        int ret = 0;
        enum lttng_notification_channel_status status;
+       const uint64_t eventfd_increment_value = 1;
 
        assert(session->rotate_trigger);
        status = lttng_notification_channel_unsubscribe(
@@ -145,8 +156,17 @@ int unsubscribe_session_consumed_size_rotation(struct ltt_session *session,
                goto end;
        }
 
-       ret = notification_thread_command_unregister_trigger(
-                       notification_thread_handle, session->rotate_trigger);
+       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+                         &eventfd_increment_value,
+                         sizeof(eventfd_increment_value));
+       if (ret != sizeof(eventfd_increment_value)) {
+               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+               ret = -1;
+               goto end;
+       }
+
+       ret = notification_thread_command_unregister_trigger(notification_thread_handle,
+                                                            session->rotate_trigger);
        if (ret != LTTNG_OK) {
                ERR("Session unregister trigger error: %d", ret);
                goto end;
index 297912c1197a50f2841b3156fdbf0b1a807ba58e..37ff6f72c41207ec3ff8c64d349c1369b3d43b19 100644 (file)
 #include <common/align.h>
 #include <common/time.h>
 #include <common/hashtable/utils.h>
+
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <time.h>
 #include <signal.h>
 
 struct lttng_notification_channel *rotate_notification_channel = NULL;
 
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
+
 struct rotation_thread {
        struct lttng_poll_event events;
 };
@@ -280,6 +292,14 @@ void fini_thread_state(struct rotation_thread *state)
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
+
+       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+               if (close_ret) {
+                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+               }
+       }
 }
 
 static
@@ -311,6 +331,20 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
+       rotate_notification_channel_subscription_change_eventfd =
+               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+       if (rotate_notification_channel_subscription_change_eventfd < 0) {
+               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+               ret = -1;
+               goto end;
+       }
+       ret = lttng_poll_add(
+               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+       if (ret < 0) {
+               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+               goto end;
+       }
+
 end:
        return ret;
 }
@@ -730,51 +764,68 @@ int handle_notification_channel(int fd,
                struct rotation_thread *state)
 {
        int ret;
-       bool notification_pending;
+       bool notification_pending = true;
        struct lttng_notification *notification = NULL;
        enum lttng_notification_channel_status status;
 
-       status = lttng_notification_channel_has_pending_notification(
+       /*
+        * A notification channel may have multiple notifications queued-up internally in
+        * its buffers. This is because a notification channel multiplexes command replies
+        * and notifications. The current protocol specifies that multiple notifications can be
+        * received before the reply to a command.
+        *
+        * In such cases, the notification channel client implementation internally queues them and
+        * provides them on the next calls to lttng_notification_channel_get_next_notification().
+        * This is correct with respect to the public API, which is intended to be used in "blocking
+        * mode".
+        *
+        * However, this internal user relies on poll/epoll to wake-up when data is available
+        * on the notification channel's socket. As such, it can't assume that a wake-up means only
+        * one notification is available for consumption since many of them may have been queued in
+        * the channel's internal buffers.
+        */
+       while (notification_pending) {
+               status = lttng_notification_channel_has_pending_notification(
                        rotate_notification_channel, &notification_pending);
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Error occurred while checking for pending notification");
-               ret = -1;
-               goto end;
-       }
+               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       ERR("Error occurred while checking for pending notification");
+                       ret = -1;
+                       goto end;
+               }
 
-       if (!notification_pending) {
-               ret = 0;
-               goto end;
-       }
+               if (!notification_pending) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Receive the next notification. */
-       status = lttng_notification_channel_get_next_notification(
-                       rotate_notification_channel,
-                       &notification);
+               /* Receive the next notification. */
+               status = lttng_notification_channel_get_next_notification(
+                       rotate_notification_channel, &notification);
+               switch (status) {
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+                       break;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+                       WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+                       ret = 0;
+                       goto end;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+                       ERR("Notification channel was closed");
+                       ret = -1;
+                       goto end;
+               default:
+                       /* Unhandled conditions / errors. */
+                       ERR("Unknown notification channel status");
+                       ret = -1;
+                       goto end;
+               }
 
-       switch (status) {
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
-               break;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
-               /* Not an error, we will wait for the next one */
-               ret = 0;
-               goto end;;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-               ERR("Notification channel was closed");
-               ret = -1;
-               goto end;
-       default:
-               /* Unhandled conditions / errors. */
-               ERR("Unknown notification channel status");
-               ret = -1;
-               goto end;
+               ret = handle_condition(notification, handle->notification_thread_handle);
+               lttng_notification_destroy(notification);
+               if (ret) {
+                       goto end;
+               }
        }
-
-       ret = handle_condition(notification,
-                       handle->notification_thread_handle);
-
 end:
-       lttng_notification_destroy(notification);
        return ret;
 }
 
@@ -838,13 +889,23 @@ void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, handle,
-                                               &thread);
+                       if (fd == rotate_notification_channel->socket ||
+                           fd == rotate_notification_channel_subscription_change_eventfd) {
+                               ret = handle_notification_channel(fd, handle, &thread);
                                if (ret) {
                                        ERR("Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
+
+                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
+                                       uint64_t eventfd_value;
+                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+                                       if (read_ret != sizeof(eventfd_value)) {
+                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+                                               goto error;
+                                       }
+                               }
                        } else {
                                /* Job queue or quit pipe activity. */
 
index 27ef3954e5051520fdba794f900217626effab48..411a806d994e4a2563625ab6fe6f099d7585faa4 100644 (file)
@@ -22,6 +22,7 @@
 #include "notification-thread.h"
 
 extern struct lttng_notification_channel *rotate_notification_channel;
+extern int rotate_notification_channel_subscription_change_eventfd;
 
 enum rotation_thread_job_type {
        ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
This page took 0.029991 seconds and 4 git commands to generate.