Issue observed
==============
When tracing to multiple sessions with scheduled size-based rotations
that occur simultaneously (typically because they trace the same events
and use the same rotation schedule configuration), the start of some
rotations seems to be delayed indefinitely.
Cause
=====
The size-based rotations are implemented by piggy-backing onto the
channel monitoring facilities.
Essentially, a per-channel timer samples a number of statistics on the
consumer daemon end, which transmits them to the session daemon.
The session daemon's notification subsystem evaluates the statistics
against the various registered triggers bound to the channels being
monitored when a statistics sample is received.
To implement size-based rotations, internal triggers are registered with
the "consumed size" condition set to a given threshold. A session
rotation management thread (which also performs other tasks) uses a
notification channel to wait for sessions to reach their target size,
starts rotations as needed, and sets a new threshold according to the
sessions' configured rotation schedule.
The rotation thread uses liblttng-ctl's API to consume notifications
from a notification channel.
At any time, a notification channel may have multiple notifications
queued-up internally in its buffers. This is because a notification
channel multiplexes command replies and notifications over the same UNIX
socket. The current protocol specifies that multiple notifications can
be received before the reply to a command.
In such cases, the notification channel client implementation internally
queues them and provides them on the next calls to
lttng_notification_channel_get_next_notification().
This is correct with respect to the public API, which is intended to be
used in "blocking mode". However, this internal user uses the
notification channel's raw file descriptor to wake-up when a
notification is available.
This is problematic because notifications may be queued by the
notification channel (and thus removed from the socket) while waiting
for command replies (subscribing and unsubscribing from notification
conditions). In such a case, a notification is available but the
rotation thread does not wake-up to consume it as nothing is available
in the socket's buffer.
When this happens, a session that is supposed to rotate automatically
appears to grow indefinitely. It will typically eventually rotate as new
notifications become available and cause the rotation thread to wake-up.
However, a "lag" builds up as the notification that caused the wake-up
is not consumed. Instead, the last buffered notification is provided to
the rotation thread.
Solution
========
Use an event_fd to wake-up the rotation thread whenever a command
completes on the notification channel. This ensures that any
notification that was queued while waiting for a reply to the command is
eventually consumed.
Known drawbacks
===============
None.
Note
====
The use of C++ features is kept to a minimum in this patch in order to
make it easier to backport to the stable releases. A clean-up patch
follows and makes the code conform to the coding standards.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I8974b10124704d1e66e8da32d495fee738e3d43f
int ret;
enum lttng_condition_status condition_status;
enum lttng_notification_channel_status nc_status;
int ret;
enum lttng_condition_status condition_status;
enum lttng_notification_channel_status nc_status;
- struct lttng_condition *rotate_condition = NULL;
- struct lttng_action *notify_action = NULL;
+ const uint64_t eventfd_increment_value = 1;
const struct lttng_credentials session_creds = {
.uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
.gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
};
const struct lttng_credentials session_creds = {
.uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
.gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
};
+ struct lttng_condition *rotate_condition;
+ struct lttng_action *notify_action = NULL;
rotate_condition = lttng_condition_session_consumed_size_create();
if (!rotate_condition) {
rotate_condition = lttng_condition_session_consumed_size_create();
if (!rotate_condition) {
+ ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+ &eventfd_increment_value,
+ sizeof(eventfd_increment_value));
+ if (ret != sizeof(eventfd_increment_value)) {
+ PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+ ret = -1;
+ goto end;
+ }
+
ret = notification_thread_command_register_trigger(
notification_thread_handle, session->rotate_trigger,
true);
ret = notification_thread_command_register_trigger(
notification_thread_handle, session->rotate_trigger,
true);
{
int ret = 0;
enum lttng_notification_channel_status status;
{
int ret = 0;
enum lttng_notification_channel_status status;
+ const uint64_t eventfd_increment_value = 1;
assert(session->rotate_trigger);
status = lttng_notification_channel_unsubscribe(
assert(session->rotate_trigger);
status = lttng_notification_channel_unsubscribe(
- ret = notification_thread_command_unregister_trigger(
- notification_thread_handle, session->rotate_trigger);
+ ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+ &eventfd_increment_value,
+ sizeof(eventfd_increment_value));
+ if (ret != sizeof(eventfd_increment_value)) {
+ PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+ ret = -1;
+ goto end;
+ }
+
+ ret = notification_thread_command_unregister_trigger(notification_thread_handle,
+ session->rotate_trigger);
if (ret != LTTNG_OK) {
ERR("Session unregister trigger error: %d", ret);
goto end;
if (ret != LTTNG_OK) {
ERR("Session unregister trigger error: %d", ret);
goto end;
#include <common/align.h>
#include <common/time.h>
#include <common/hashtable/utils.h>
#include <common/align.h>
#include <common/time.h>
#include <common/hashtable/utils.h>
+
+#include <inttypes.h>
+#include <signal.h>
+#include <sys/eventfd.h>
#include <sys/stat.h>
#include <time.h>
#include <signal.h>
#include <sys/stat.h>
#include <time.h>
#include <signal.h>
struct lttng_notification_channel *rotate_notification_channel = NULL;
struct lttng_notification_channel *rotate_notification_channel = NULL;
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
+
struct rotation_thread {
struct lttng_poll_event events;
};
struct rotation_thread {
struct lttng_poll_event events;
};
if (rotate_notification_channel) {
lttng_notification_channel_destroy(rotate_notification_channel);
}
if (rotate_notification_channel) {
lttng_notification_channel_destroy(rotate_notification_channel);
}
+
+ if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+ const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+ if (close_ret) {
+ PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+ }
+ }
+ rotate_notification_channel_subscription_change_eventfd =
+ eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+ if (rotate_notification_channel_subscription_change_eventfd < 0) {
+ PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+ ret = -1;
+ goto end;
+ }
+ ret = lttng_poll_add(
+ &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+ if (ret < 0) {
+ ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+ goto end;
+ }
+
struct rotation_thread *state)
{
int ret;
struct rotation_thread *state)
{
int ret;
- bool notification_pending;
+ bool notification_pending = true;
struct lttng_notification *notification = NULL;
enum lttng_notification_channel_status status;
struct lttng_notification *notification = NULL;
enum lttng_notification_channel_status status;
- status = lttng_notification_channel_has_pending_notification(
+ /*
+ * A notification channel may have multiple notifications queued-up internally in
+ * its buffers. This is because a notification channel multiplexes command replies
+ * and notifications. The current protocol specifies that multiple notifications can be
+ * received before the reply to a command.
+ *
+ * In such cases, the notification channel client implementation internally queues them and
+ * provides them on the next calls to lttng_notification_channel_get_next_notification().
+ * This is correct with respect to the public API, which is intended to be used in "blocking
+ * mode".
+ *
+ * However, this internal user relies on poll/epoll to wake-up when data is available
+ * on the notification channel's socket. As such, it can't assume that a wake-up means only
+ * one notification is available for consumption since many of them may have been queued in
+ * the channel's internal buffers.
+ */
+ while (notification_pending) {
+ status = lttng_notification_channel_has_pending_notification(
rotate_notification_channel, ¬ification_pending);
rotate_notification_channel, ¬ification_pending);
- if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
- ERR("Error occurred while checking for pending notification");
- ret = -1;
- goto end;
- }
+ if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+ ERR("Error occurred while checking for pending notification");
+ ret = -1;
+ goto end;
+ }
- if (!notification_pending) {
- ret = 0;
- goto end;
- }
+ if (!notification_pending) {
+ ret = 0;
+ goto end;
+ }
- /* Receive the next notification. */
- status = lttng_notification_channel_get_next_notification(
- rotate_notification_channel,
- ¬ification);
+ /* Receive the next notification. */
+ status = lttng_notification_channel_get_next_notification(
+ rotate_notification_channel, ¬ification);
+ switch (status) {
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+ break;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+ WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+ ret = 0;
+ goto end;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+ ERR("Notification channel was closed");
+ ret = -1;
+ goto end;
+ default:
+ /* Unhandled conditions / errors. */
+ ERR("Unknown notification channel status");
+ ret = -1;
+ goto end;
+ }
- switch (status) {
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
- break;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
- /* Not an error, we will wait for the next one */
- ret = 0;
- goto end;;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
- ERR("Notification channel was closed");
- ret = -1;
- goto end;
- default:
- /* Unhandled conditions / errors. */
- ERR("Unknown notification channel status");
- ret = -1;
- goto end;
+ ret = handle_condition(notification, handle->notification_thread_handle);
+ lttng_notification_destroy(notification);
+ if (ret) {
+ goto end;
+ }
-
- ret = handle_condition(notification,
- handle->notification_thread_handle);
-
- lttng_notification_destroy(notification);
- if (fd == rotate_notification_channel->socket) {
- ret = handle_notification_channel(fd, handle,
- &thread);
+ if (fd == rotate_notification_channel->socket ||
+ fd == rotate_notification_channel_subscription_change_eventfd) {
+ ret = handle_notification_channel(fd, handle, &thread);
if (ret) {
ERR("Error occurred while handling activity on notification channel socket");
goto error;
}
if (ret) {
ERR("Error occurred while handling activity on notification channel socket");
goto error;
}
+
+ if (fd == rotate_notification_channel_subscription_change_eventfd) {
+ uint64_t eventfd_value;
+ const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+ if (read_ret != sizeof(eventfd_value)) {
+ PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+ goto error;
+ }
+ }
} else {
/* Job queue or quit pipe activity. */
} else {
/* Job queue or quit pipe activity. */
#include "notification-thread.h"
extern struct lttng_notification_channel *rotate_notification_channel;
#include "notification-thread.h"
extern struct lttng_notification_channel *rotate_notification_channel;
+extern int rotate_notification_channel_subscription_change_eventfd;
enum rotation_thread_job_type {
ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
enum rotation_thread_job_type {
ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,