Fix: sessiond: size-based notification occasionally not triggered
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
index 590b95dd224b0cc0f6e5351b4cd85f147a66d8ac..37ff6f72c41207ec3ff8c64d349c1369b3d43b19 100644 (file)
 #include <common/align.h>
 #include <common/time.h>
 #include <common/hashtable/utils.h>
+
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <time.h>
 #include <signal.h>
@@ -26,6 +30,7 @@
 #include <lttng/rotate-internal.h>
 #include <lttng/location-internal.h>
 #include <lttng/condition/condition-internal.h>
+#include <lttng/notification/notification-internal.h>
 
 #include "rotation-thread.h"
 #include "lttng-sessiond.h"
 
 struct lttng_notification_channel *rotate_notification_channel = NULL;
 
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
+
 struct rotation_thread {
        struct lttng_poll_event events;
 };
@@ -279,6 +292,14 @@ void fini_thread_state(struct rotation_thread *state)
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
+
+       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+               if (close_ret) {
+                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+               }
+       }
 }
 
 static
@@ -310,6 +331,20 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
+       rotate_notification_channel_subscription_change_eventfd =
+               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+       if (rotate_notification_channel_subscription_change_eventfd < 0) {
+               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+               ret = -1;
+               goto end;
+       }
+       ret = lttng_poll_add(
+               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+       if (ret < 0) {
+               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+               goto end;
+       }
+
 end:
        return ret;
 }
@@ -619,8 +654,7 @@ end:
 }
 
 static
-int handle_condition(const struct lttng_condition *condition,
-               const struct lttng_evaluation *evaluation,
+int handle_condition(const struct lttng_notification *notification,
                struct notification_thread_handle *notification_thread_handle)
 {
        int ret = 0;
@@ -630,6 +664,10 @@ int handle_condition(const struct lttng_condition *condition,
        enum lttng_evaluation_status evaluation_status;
        uint64_t consumed;
        struct ltt_session *session;
+       const struct lttng_condition *condition =
+                       lttng_notification_get_const_condition(notification);
+       const struct lttng_evaluation *evaluation =
+                       lttng_notification_get_const_evaluation(notification);
 
        condition_type = lttng_condition_get_type(condition);
 
@@ -671,25 +709,41 @@ int handle_condition(const struct lttng_condition *condition,
        }
        session_lock(session);
 
+       if (!lttng_trigger_is_equal(session->rotate_trigger,
+                       lttng_notification_get_const_trigger(notification))) {
+               /* Notification does not originate from our rotation trigger. */
+               ret = 0;
+               goto end_unlock;
+       }
+
        ret = unsubscribe_session_consumed_size_rotation(session,
                        notification_thread_handle);
        if (ret) {
                goto end_unlock;
        }
 
-       ret = cmd_rotate_session(session, NULL, false,
-               LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       if (ret == -LTTNG_ERR_ROTATION_PENDING) {
+       ret = cmd_rotate_session(
+                       session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       switch (ret) {
+       case LTTNG_OK:
+               break;
+       case -LTTNG_ERR_ROTATION_PENDING:
                DBG("Rotate already pending, subscribe to the next threshold value");
-       } else if (ret != LTTNG_OK) {
-               ERR("Failed to rotate on size notification with error: %s",
-                               lttng_strerror(ret));
+               break;
+       case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+               DBG("Rotation already happened since last stop, subscribe to the next threshold value");
+               break;
+       case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+               DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
+               break;
+       default:
+               ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
                ret = -1;
                goto end_unlock;
        }
-       ret = subscribe_session_consumed_size_rotation(session,
-                       consumed + session->rotate_size,
-                       notification_thread_handle);
+
+       ret = subscribe_session_consumed_size_rotation(
+                       session, consumed + session->rotate_size, notification_thread_handle);
        if (ret) {
                ERR("Failed to subscribe to session consumed size condition");
                goto end_unlock;
@@ -710,56 +764,68 @@ int handle_notification_channel(int fd,
                struct rotation_thread *state)
 {
        int ret;
-       bool notification_pending;
+       bool notification_pending = true;
        struct lttng_notification *notification = NULL;
        enum lttng_notification_channel_status status;
-       const struct lttng_evaluation *notification_evaluation;
-       const struct lttng_condition *notification_condition;
 
-       status = lttng_notification_channel_has_pending_notification(
+       /*
+        * A notification channel may have multiple notifications queued-up internally in
+        * its buffers. This is because a notification channel multiplexes command replies
+        * and notifications. The current protocol specifies that multiple notifications can be
+        * received before the reply to a command.
+        *
+        * In such cases, the notification channel client implementation internally queues them and
+        * provides them on the next calls to lttng_notification_channel_get_next_notification().
+        * This is correct with respect to the public API, which is intended to be used in "blocking
+        * mode".
+        *
+        * However, this internal user relies on poll/epoll to wake-up when data is available
+        * on the notification channel's socket. As such, it can't assume that a wake-up means only
+        * one notification is available for consumption since many of them may have been queued in
+        * the channel's internal buffers.
+        */
+       while (notification_pending) {
+               status = lttng_notification_channel_has_pending_notification(
                        rotate_notification_channel, &notification_pending);
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Error occurred while checking for pending notification");
-               ret = -1;
-               goto end;
-       }
+               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       ERR("Error occurred while checking for pending notification");
+                       ret = -1;
+                       goto end;
+               }
 
-       if (!notification_pending) {
-               ret = 0;
-               goto end;
-       }
+               if (!notification_pending) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Receive the next notification. */
-       status = lttng_notification_channel_get_next_notification(
-                       rotate_notification_channel,
-                       &notification);
+               /* Receive the next notification. */
+               status = lttng_notification_channel_get_next_notification(
+                       rotate_notification_channel, &notification);
+               switch (status) {
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+                       break;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+                       WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+                       ret = 0;
+                       goto end;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+                       ERR("Notification channel was closed");
+                       ret = -1;
+                       goto end;
+               default:
+                       /* Unhandled conditions / errors. */
+                       ERR("Unknown notification channel status");
+                       ret = -1;
+                       goto end;
+               }
 
-       switch (status) {
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
-               break;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
-               /* Not an error, we will wait for the next one */
-               ret = 0;
-               goto end;;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-               ERR("Notification channel was closed");
-               ret = -1;
-               goto end;
-       default:
-               /* Unhandled conditions / errors. */
-               ERR("Unknown notification channel status");
-               ret = -1;
-               goto end;
+               ret = handle_condition(notification, handle->notification_thread_handle);
+               lttng_notification_destroy(notification);
+               if (ret) {
+                       goto end;
+               }
        }
-
-       notification_condition = lttng_notification_get_condition(notification);
-       notification_evaluation = lttng_notification_get_evaluation(notification);
-
-       ret = handle_condition(notification_condition, notification_evaluation,
-                       handle->notification_thread_handle);
-
 end:
-       lttng_notification_destroy(notification);
        return ret;
 }
 
@@ -823,13 +889,23 @@ void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, handle,
-                                               &thread);
+                       if (fd == rotate_notification_channel->socket ||
+                           fd == rotate_notification_channel_subscription_change_eventfd) {
+                               ret = handle_notification_channel(fd, handle, &thread);
                                if (ret) {
                                        ERR("Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
+
+                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
+                                       uint64_t eventfd_value;
+                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+                                       if (read_ret != sizeof(eventfd_value)) {
+                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+                                               goto error;
+                                       }
+                               }
                        } else {
                                /* Job queue or quit pipe activity. */
 
This page took 0.026212 seconds and 4 git commands to generate.