/*
- * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
- * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/align.h>
#include <common/time.h>
#include <common/hashtable/utils.h>
+
+#include <inttypes.h>
+#include <signal.h>
#include <sys/eventfd.h>
#include <sys/stat.h>
#include <time.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <lttng/notification/channel-internal.h>
#include <lttng/rotate-internal.h>
+#include <lttng/location-internal.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/notification/notification-internal.h>
#include "rotation-thread.h"
#include "lttng-sessiond.h"
struct lttng_notification_channel *rotate_notification_channel = NULL;
+/*
+ * This eventfd is used to wake up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
+
/*
 * State owned by the rotation thread for the duration of its execution.
 */
struct rotation_thread {
/* Poll set the rotation thread blocks on (via lttng_poll_wait()) to detect fd activity. */
struct lttng_poll_event events;
};
lttng_pipe_get_readfd(handle->quit_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add quit pipe read fd to poll set");
+ ERR("Failed to add quit pipe read fd to poll set");
goto error;
}
lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add rotate_pending fd to poll set");
+ ERR("Failed to add rotate_pending fd to poll set");
goto error;
}
if (rotate_notification_channel) {
lttng_notification_channel_destroy(rotate_notification_channel);
}
+
+ if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+ const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+ if (close_ret) {
+ PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+ }
+ }
}
static
ret = init_poll_set(&state->events, handle);
if (ret) {
- ERR("[rotation-thread] Failed to initialize rotation thread poll set");
+ ERR("Failed to initialize rotation thread poll set");
goto end;
}
rotate_notification_channel = lttng_notification_channel_create(
lttng_session_daemon_notification_endpoint);
if (!rotate_notification_channel) {
- ERR("[rotation-thread] Could not create notification channel");
+ ERR("Could not create notification channel");
ret = -1;
goto end;
}
ret = lttng_poll_add(&state->events, rotate_notification_channel->socket,
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add notification fd to pollset");
+ ERR("Failed to add notification fd to pollset");
+ goto end;
+ }
+
+ rotate_notification_channel_subscription_change_eventfd =
+ eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+ if (rotate_notification_channel_subscription_change_eventfd < 0) {
+ PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+ ret = -1;
+ goto end;
+ }
+ ret = lttng_poll_add(
+ &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+ if (ret < 0) {
+ ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
goto end;
}
goto end;
}
- if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
pthread_mutex_unlock(socket->lock);
chunk_exists_on_peer = true;
goto end;
- }
+ }
pthread_mutex_unlock(socket->lock);
- }
+ }
skip_ust:
if (!session->kernel_session) {
goto end;
}
- if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
pthread_mutex_unlock(socket->lock);
chunk_exists_on_peer = true;
goto end;
- }
+ }
pthread_mutex_unlock(socket->lock);
}
skip_kernel:
session->chunk_being_archived,
&chunk_being_archived_id);
assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+ DBG("Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
chunk_being_archived_id,
session->name);
}
&chunk_being_archived_id);
assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+ DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
session->name, chunk_being_archived_id);
/*
if (!session->quiet_rotation) {
location = session_get_trace_archive_location(session);
- /* Ownership of location is transferred. */
ret = notification_thread_command_session_rotation_completed(
notification_thread_handle,
session->name,
session->gid,
session->last_archived_chunk_id.value,
location);
+ lttng_trace_archive_location_put(location);
if (ret != LTTNG_OK) {
- ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ ERR("Failed to notify notification thread of completed rotation for session %s",
session->name);
}
}
ret = 0;
check_ongoing_rotation:
if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
- uint64_t chunk_being_archived_id;
-
chunk_status = lttng_trace_chunk_get_id(
session->chunk_being_archived,
&chunk_being_archived_id);
assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
+ DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
chunk_being_archived_id, session->name);
ret = timer_session_rotation_pending_check_start(session,
DEFAULT_ROTATE_PENDING_TIMER);
int ret;
struct lttng_rotate_session_return rotation_return;
- DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
+ DBG("Launching scheduled time-based rotation on session \"%s\"",
session->name);
- ret = cmd_rotate_session(session, &rotation_return, false);
+ ret = cmd_rotate_session(session, &rotation_return, false,
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
if (ret == LTTNG_OK) {
- DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
+ DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
session->name);
} else {
/* Don't consider errors as fatal. */
- DBG("[rotation-thread] Scheduled time-based rotation aborted for session %s: %s",
+ DBG("Scheduled time-based rotation aborted for session %s: %s",
session->name, lttng_strerror(ret));
}
return 0;
switch (job->type) {
case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
- ret = launch_session_rotation(session);
+ ret = launch_session_rotation(session);
break;
case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
ret = check_session_rotation_pending(session,
session_lock_list();
session = job->session;
if (!session) {
- DBG("[rotation-thread] Session \"%s\" not found",
- session->name);
/*
* This is a non-fatal error, and we cannot report it to
* the user (timer), so just print the error and
}
session_lock(session);
- ret = run_job(job, session, handle->notification_thread_handle);
+ ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
/* Release reference held by the job. */
session_put(session);
}
static
-int handle_condition(const struct lttng_condition *condition,
- const struct lttng_evaluation *evaluation,
+int handle_condition(const struct lttng_notification *notification,
struct notification_thread_handle *notification_thread_handle)
{
int ret = 0;
enum lttng_evaluation_status evaluation_status;
uint64_t consumed;
struct ltt_session *session;
+ const struct lttng_condition *condition =
+ lttng_notification_get_const_condition(notification);
+ const struct lttng_evaluation *evaluation =
+ lttng_notification_get_const_evaluation(notification);
condition_type = lttng_condition_get_type(condition);
if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
ret = -1;
- ERR("[rotation-thread] Condition type and session usage type are not the same");
+ ERR("Condition type and session usage type are not the same");
goto end;
}
condition_status = lttng_condition_session_consumed_size_get_session_name(
condition, &condition_session_name);
if (condition_status != LTTNG_CONDITION_STATUS_OK) {
- ERR("[rotation-thread] Session name could not be fetched");
+ ERR("Session name could not be fetched");
ret = -1;
goto end;
}
evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation,
&consumed);
if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
- ERR("[rotation-thread] Failed to get evaluation");
+ ERR("Failed to get evaluation");
ret = -1;
goto end;
}
session_lock_list();
session = session_find_by_name(condition_session_name);
if (!session) {
- ret = -1;
- session_unlock_list();
- ERR("[rotation-thread] Session \"%s\" not found",
+ DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
+ lttng_condition_type_str(condition_type),
condition_session_name);
+ /*
+ * Not a fatal error: a session can be destroyed before we get
+ * the chance to handle the notification.
+ */
+ ret = 0;
+ session_unlock_list();
goto end;
}
session_lock(session);
+ if (!lttng_trigger_is_equal(session->rotate_trigger,
+ lttng_notification_get_const_trigger(notification))) {
+ /* Notification does not originate from our rotation trigger. */
+ ret = 0;
+ goto end_unlock;
+ }
+
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
if (ret) {
goto end_unlock;
}
- ret = cmd_rotate_session(session, NULL, false);
- if (ret == -LTTNG_ERR_ROTATION_PENDING) {
+ ret = cmd_rotate_session(
+ session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+ switch (ret) {
+ case LTTNG_OK:
+ break;
+ case -LTTNG_ERR_ROTATION_PENDING:
DBG("Rotate already pending, subscribe to the next threshold value");
- } else if (ret != LTTNG_OK) {
- ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
- lttng_strerror(ret));
+ break;
+ case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+ DBG("Rotation already happened since last stop, subscribe to the next threshold value");
+ break;
+ case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+ DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
+ break;
+ default:
+ ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
ret = -1;
goto end_unlock;
}
- ret = subscribe_session_consumed_size_rotation(session,
- consumed + session->rotate_size,
- notification_thread_handle);
+
+ ret = subscribe_session_consumed_size_rotation(
+ session, consumed + session->rotate_size, notification_thread_handle);
if (ret) {
- ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
+ ERR("Failed to subscribe to session consumed size condition");
goto end_unlock;
}
ret = 0;
struct rotation_thread *state)
{
int ret;
- bool notification_pending;
+ bool notification_pending = true;
struct lttng_notification *notification = NULL;
enum lttng_notification_channel_status status;
- const struct lttng_evaluation *notification_evaluation;
- const struct lttng_condition *notification_condition;
- status = lttng_notification_channel_has_pending_notification(
+ /*
+ * A notification channel may have multiple notifications queued up internally in
+ * its buffers. This is because a notification channel multiplexes command replies
+ * and notifications. The current protocol specifies that multiple notifications can be
+ * received before the reply to a command.
+ *
+ * In such cases, the notification channel client implementation internally queues them and
+ * provides them on the next calls to lttng_notification_channel_get_next_notification().
+ * This is correct with respect to the public API, which is intended to be used in "blocking
+ * mode".
+ *
+ * However, this internal user relies on poll/epoll to wake up when data is available
+ * on the notification channel's socket. As such, it can't assume that a wake-up means only
+ * one notification is available for consumption since many of them may have been queued in
+ * the channel's internal buffers.
+ */
+ while (notification_pending) {
+ status = lttng_notification_channel_has_pending_notification(
rotate_notification_channel, &notification_pending);
- if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
- ERR("[rotation-thread ]Error occurred while checking for pending notification");
- ret = -1;
- goto end;
- }
+ if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+ ERR("Error occurred while checking for pending notification");
+ ret = -1;
+ goto end;
+ }
- if (!notification_pending) {
- ret = 0;
- goto end;
- }
+ if (!notification_pending) {
+ ret = 0;
+ goto end;
+ }
- /* Receive the next notification. */
- status = lttng_notification_channel_get_next_notification(
- rotate_notification_channel,
- &notification);
+ /* Receive the next notification. */
+ status = lttng_notification_channel_get_next_notification(
+ rotate_notification_channel, &notification);
+ switch (status) {
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+ break;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+ WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+ ret = 0;
+ goto end;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+ ERR("Notification channel was closed");
+ ret = -1;
+ goto end;
+ default:
+ /* Unhandled conditions / errors. */
+ ERR("Unknown notification channel status");
+ ret = -1;
+ goto end;
+ }
- switch (status) {
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
- break;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
- /* Not an error, we will wait for the next one */
- ret = 0;
- goto end;;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
- ERR("Notification channel was closed");
- ret = -1;
- goto end;
- default:
- /* Unhandled conditions / errors. */
- ERR("Unknown notification channel status");
- ret = -1;
- goto end;
+ ret = handle_condition(notification, handle->notification_thread_handle);
+ lttng_notification_destroy(notification);
+ if (ret) {
+ goto end;
+ }
}
-
- notification_condition = lttng_notification_get_condition(notification);
- notification_evaluation = lttng_notification_get_evaluation(notification);
-
- ret = handle_condition(notification_condition, notification_evaluation,
- handle->notification_thread_handle);
-
end:
- lttng_notification_destroy(notification);
return ret;
}
struct rotation_thread thread;
int queue_pipe_fd;
- DBG("[rotation-thread] Started rotation thread");
+ DBG("Started rotation thread");
rcu_register_thread();
rcu_thread_online();
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
+ health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
health_code_update();
if (!handle) {
- ERR("[rotation-thread] Invalid thread context provided");
+ ERR("Invalid thread context provided");
goto end;
}
int fd_count, i;
health_poll_entry();
- DBG("[rotation-thread] Entering poll wait");
+ DBG("Entering poll wait");
ret = lttng_poll_wait(&thread.events, -1);
- DBG("[rotation-thread] Poll wait returned (%i)", ret);
+ DBG("Poll wait returned (%i)", ret);
health_poll_exit();
if (ret < 0) {
/*
if (errno == EINTR) {
continue;
}
- ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
+ ERR("Error encountered during lttng_poll_wait (%i)", ret);
goto error;
}
int fd = LTTNG_POLL_GETFD(&thread.events, i);
uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
- DBG("[rotation-thread] Handling fd (%i) activity (%u)",
+ DBG("Handling fd (%i) activity (%u)",
fd, revents);
if (revents & LPOLLERR) {
- ERR("[rotation-thread] Polling returned an error on fd %i", fd);
+ ERR("Polling returned an error on fd %i", fd);
goto error;
}
- if (fd == rotate_notification_channel->socket) {
- ret = handle_notification_channel(fd, handle,
- &thread);
+ if (fd == rotate_notification_channel->socket ||
+ fd == rotate_notification_channel_subscription_change_eventfd) {
+ ret = handle_notification_channel(fd, handle, &thread);
if (ret) {
- ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
+ ERR("Error occurred while handling activity on notification channel socket");
goto error;
}
+
+ if (fd == rotate_notification_channel_subscription_change_eventfd) {
+ uint64_t eventfd_value;
+ const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+ if (read_ret != sizeof(eventfd_value)) {
+ PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+ goto error;
+ }
+ }
} else {
/* Job queue or quit pipe activity. */
ret = handle_job_queue(handle, &thread,
handle->rotation_timer_queue);
if (ret) {
- ERR("[rotation-thread] Failed to handle rotation timer pipe event");
+ ERR("Failed to handle rotation timer pipe event");
goto error;
}
ret = lttng_read(fd, &buf, 1);
if (ret != 1) {
- ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+ ERR("Failed to read from wakeup pipe (fd = %i)", fd);
goto error;
}
} else {
- DBG("[rotation-thread] Quit pipe activity");
+ DBG("Quit pipe activity");
goto exit;
}
}
}
exit:
error:
- DBG("[rotation-thread] Exit");
+ DBG("Thread exit");
fini_thread_state(&thread);
end:
- health_unregister(health_sessiond);
+ health_unregister(the_health_sessiond);
rcu_thread_offline();
rcu_unregister_thread();
return NULL;