#include "session.h"
#include "timer.h"
#include "notification-thread-commands.h"
+#include "utils.h"
+#include "thread.h"
#include <urcu.h>
#include <urcu/list.h>
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
- sem_t *notification_thread_ready;
+ /* Thread-specific quit pipe. */
+ struct lttng_pipe *quit_pipe;
};
static
void rotation_thread_timer_queue_destroy(
struct rotation_thread_timer_queue *queue)
{
- struct rotation_thread_job *job, *tmp_job;
-
if (!queue) {
return;
}
lttng_pipe_destroy(queue->event_pipe);
pthread_mutex_lock(&queue->lock);
- /* Empty wait queue. */
- cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
- log_job_destruction(job);
- cds_list_del(&job->head);
- free(job);
- }
+ assert(cds_list_empty(&queue->list));
pthread_mutex_unlock(&queue->lock);
pthread_mutex_destroy(&queue->lock);
free(queue);
void rotation_thread_handle_destroy(
struct rotation_thread_handle *handle)
{
+ lttng_pipe_destroy(handle->quit_pipe);
free(handle);
}
struct rotation_thread_handle *rotation_thread_handle_create(
struct rotation_thread_timer_queue *rotation_timer_queue,
- struct notification_thread_handle *notification_thread_handle,
- sem_t *notification_thread_ready)
+ struct notification_thread_handle *notification_thread_handle)
{
struct rotation_thread_handle *handle;
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
- handle->notification_thread_ready = notification_thread_ready;
+ handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+ if (!handle->quit_pipe) {
+ goto error;
+ }
end:
return handle;
+error:
+ rotation_thread_handle_destroy(handle);
+ return NULL;
}
/*
int ret;
/*
- * Create pollset with size 2:
- * - quit pipe,
+ * Create pollset with size 3:
+ * - rotation thread quit pipe,
* - rotation thread timer queue pipe,
+ * - notification channel sock,
*/
- ret = sessiond_set_thread_pollset(poll_set, 2);
- if (ret) {
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+ if (ret < 0) {
goto error;
}
+
+ ret = lttng_poll_add(poll_set,
+ lttng_pipe_get_readfd(handle->quit_pipe),
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add quit pipe read fd to poll set");
+ goto error;
+ }
+
ret = lttng_poll_add(poll_set,
lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
LPOLLIN | LPOLLERR);
if (ret < 0) {
- ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ ERR("[rotation-thread] Failed to add rotate_pending fd to poll set");
goto error;
}
goto end;
}
- /*
- * We wait until the notification thread is ready to create the
- * notification channel and add it to the poll_set.
- */
- sem_wait(handle->notification_thread_ready);
rotate_notification_channel = lttng_notification_channel_create(
lttng_session_daemon_notification_endpoint);
if (!rotate_notification_channel) {
}
static
-int check_session_rotation_pending_local_on_consumer(
- const struct ltt_session *session,
- struct consumer_socket *socket, bool *rotation_completed)
-{
- int ret;
-
- pthread_mutex_lock(socket->lock);
- DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s",
- lttng_consumer_type_str(socket->type),
- session->name);
- ret = consumer_check_rotation_pending_local(socket,
- session->id,
- session->current_archive_id - 1);
- pthread_mutex_unlock(socket->lock);
-
- if (ret == 0) {
- /* Rotation was completed on this consumer. */
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = true;
- } else if (ret == 1) {
- /* Rotation pending on this consumer. */
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = false;
- ret = 0;
- } else {
- /* Not a fatal error. */
- ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = false;
- }
- return ret;
-}
-
-static
-int check_session_rotation_pending_local(struct ltt_session *session)
+void check_session_rotation_pending_on_consumers(struct ltt_session *session,
+ bool *_rotation_completed)
{
int ret = 0;
struct consumer_socket *socket;
struct cds_lfht_iter iter;
- bool rotation_completed = true;
+ enum consumer_trace_chunk_exists_status exists_status;
+ uint64_t relayd_id;
+ bool chunk_exists_on_peer = false;
+ enum lttng_trace_chunk_status chunk_status;
+
+ assert(session->chunk_being_archived);
/*
* Check for a local pending rotation on all consumers (32-bit
* user space, 64-bit user space, and kernel).
*/
- DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64,
- session->name, session->current_archive_id - 1);
-
rcu_read_lock();
if (!session->ust_session) {
goto skip_ust;
}
cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
&iter, socket, node.node) {
- ret = check_session_rotation_pending_local_on_consumer(session,
- socket, &rotation_completed);
- if (ret || !rotation_completed) {
+ relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->ust_session->consumer->net_seq_index;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occured while checking rotation status on consumer daemon");
goto end;
}
- }
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ }
skip_ust:
if (!session->kernel_session) {
}
cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
&iter, socket, node.node) {
- ret = check_session_rotation_pending_local_on_consumer(session,
- socket, &rotation_completed);
- if (ret || !rotation_completed) {
+ pthread_mutex_lock(socket->lock);
+ relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->kernel_session->consumer->net_seq_index;
+
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occured while checking rotation status on consumer daemon");
goto end;
}
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
}
skip_kernel:
end:
rcu_read_unlock();
- if (rotation_completed) {
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
- session->current_archive_id - 1,
+ if (!chunk_exists_on_peer) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+ chunk_being_archived_id,
session->name);
- session->rotation_pending_local = false;
}
+ *_rotation_completed = !chunk_exists_on_peer;
if (ret) {
ret = session_reset_rotation_state(session,
LTTNG_ROTATION_STATE_ERROR);
session->name);
}
}
- return 0;
-}
-
-static
-int check_session_rotation_pending_relay(struct ltt_session *session)
-{
- int ret;
- struct consumer_socket *socket;
- struct cds_lfht_iter iter;
- bool rotation_completed = true;
- const struct consumer_output *output;
-
- /*
- * Check for a pending rotation on any consumer as we only use
- * it as a "tunnel" to the relayd.
- */
-
- rcu_read_lock();
- if (session->ust_session) {
- cds_lfht_first(session->ust_session->consumer->socks->ht,
- &iter);
- output = session->ust_session->consumer;
- } else {
- cds_lfht_first(session->kernel_session->consumer->socks->ht,
- &iter);
- output = session->kernel_session->consumer;
- }
- assert(cds_lfht_iter_get_node(&iter));
-
- socket = caa_container_of(cds_lfht_iter_get_node(&iter),
- typeof(*socket), node.node);
-
- pthread_mutex_lock(socket->lock);
- DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer",
- session->name, session->current_archive_id - 1,
- lttng_consumer_type_str(socket->type));
- ret = consumer_check_rotation_pending_relay(socket,
- output,
- session->id,
- session->current_archive_id - 1);
- pthread_mutex_unlock(socket->lock);
-
- if (ret == 0) {
- /* Rotation was completed on the relay. */
- DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed",
- session->current_archive_id - 1,
- session->name);
- } else if (ret == 1) {
- /* Rotation pending on relay. */
- DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending",
- session->current_archive_id - 1,
- session->name);
- rotation_completed = false;
- } else {
- /* Not a fatal error. */
- ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
- session->current_archive_id - 1,
- session->name);
- ret = session_reset_rotation_state(session,
- LTTNG_ROTATION_STATE_ERROR);
- if (ret) {
- ERR("Failed to reset rotation state of session \"%s\"",
- session->name);
- }
- rotation_completed = false;
- }
-
- rcu_read_unlock();
-
- if (rotation_completed) {
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
- session->current_archive_id - 1,
- session->name);
- session->rotation_pending_relay = false;
- }
- return 0;
}
/*
* Check if the last rotation was completed, called with session lock held.
+ * Should only return non-zero in the event of a fatal error. Doing so will
+ * shutdown the thread.
*/
static
int check_session_rotation_pending(struct ltt_session *session,
{
int ret;
struct lttng_trace_archive_location *location;
- time_t now;
-
- DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
- session->name, session->current_archive_id - 1);
+ enum lttng_trace_chunk_status chunk_status;
+ bool rotation_completed = false;
+ const char *archived_chunk_name;
+ uint64_t chunk_being_archived_id;
- if (session->rotation_pending_local) {
- /* Updates session->rotation_pending_local as needed. */
- ret = check_session_rotation_pending_local(session);
- if (ret) {
- goto end;
- }
+ chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- /*
- * No need to check for a pending rotation on the relay
- * since the rotation is not even completed locally yet.
- */
- if (session->rotation_pending_local) {
- goto end;
- }
- }
-
- if (session->rotation_pending_relay) {
- /* Updates session->rotation_pending_relay as needed. */
- ret = check_session_rotation_pending_relay(session);
- if (ret) {
- goto end;
- }
+ DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+ session->name, chunk_being_archived_id);
- if (session->rotation_pending_relay) {
- goto end;
- }
+ if (!session->chunk_being_archived) {
+ ret = 0;
+ goto end;
}
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for "
- "session %s", session->current_archive_id - 1,
- session->name);
-
- /* Rename the completed trace archive's location. */
- now = time(NULL);
- if (now == (time_t) -1) {
- ret = session_reset_rotation_state(session,
- LTTNG_ROTATION_STATE_ERROR);
- if (ret) {
- ERR("Failed to reset rotation state of session \"%s\"",
- session->name);
- }
- ret = LTTNG_ERR_UNK;
+ /*
+ * The rotation-pending check timer of a session is launched in
+ * one-shot mode. If the rotation is incomplete, the rotation
+ * thread will re-enable the pending-check timer.
+ *
+ * The timer thread can't stop the timer itself since it is involved
+ * in the check for the timer's quiescence.
+ */
+ ret = timer_session_rotation_pending_check_stop(session);
+ if (ret) {
goto end;
}
- ret = rename_completed_chunk(session, now);
- if (ret < 0) {
- ERR("Failed to rename completed rotation chunk");
+ check_session_rotation_pending_on_consumers(session,
+ &rotation_completed);
+
+ if (!rotation_completed ||
+ session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
goto end;
}
- session->last_chunk_start_ts = session->current_chunk_start_ts;
/*
* Now we can clear the "ONGOING" state in the session. New
* rotations can start now.
*/
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
+ chunk_status = lttng_trace_chunk_get_name(session->chunk_being_archived,
+ &archived_chunk_name, NULL);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ free(session->last_archived_chunk_name);
+ session->last_archived_chunk_name = strdup(archived_chunk_name);
+ if (!session->last_archived_chunk_name) {
+ PERROR("Failed to duplicate archived chunk name");
+ }
+ session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
- /* Ownership of location is transferred. */
location = session_get_trace_archive_location(session);
+ /* Ownership of location is transferred. */
ret = notification_thread_command_session_rotation_completed(
notification_thread_handle,
session->name,
session->uid,
session->gid,
- session->current_archive_id,
+ session->last_archived_chunk_id.value,
location);
if (ret != LTTNG_OK) {
ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
session->name,
session->uid,
session->gid,
- session->current_archive_id);
+ session->most_recent_chunk_id.value);
if (ret != LTTNG_OK) {
ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
session->name);
}
- ret = rename_active_chunk(session);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to rename active rotation chunk");
- goto end;
- }
-
/* Ownership of location is transferred. */
location = session_get_trace_archive_location(session);
ret = notification_thread_command_session_rotation_completed(
session->name,
session->uid,
session->gid,
- session->current_archive_id,
+ session->most_recent_chunk_id.value,
location);
if (ret != LTTNG_OK) {
ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
ret = 0;
end:
if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+
DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
- session->current_archive_id - 1, session->name);
+ chunk_being_archived_id, session->name);
ret = timer_session_rotation_pending_check_start(session,
DEFAULT_ROTATE_PENDING_TIMER);
if (ret) {
- ERR("Re-enabling rotate pending timer");
+ ERR("Failed to re-enable rotation pending timer");
ret = -1;
goto end;
}
struct rotation_thread_timer_queue *queue)
{
int ret = 0;
- int fd = lttng_pipe_get_readfd(queue->event_pipe);
- char buf;
-
- ret = lttng_read(fd, &buf, 1);
- if (ret != 1) {
- ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
- ret = -1;
- goto end;
- }
for (;;) {
struct ltt_session *session;
* possible for a job targeting that session to have
* already been queued before it was destroyed.
*/
- session_unlock_list();
free(job);
session_put(session);
+ session_unlock_list();
continue;
}
session_lock(session);
ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
+ /* Release reference held by the job. */
session_put(session);
session_unlock_list();
free(job);
goto end;
}
session_lock(session);
- session_unlock_list();
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
end_unlock:
session_unlock(session);
session_put(session);
+ session_unlock_list();
end:
return ret;
}
int ret;
struct rotation_thread_handle *handle = data;
struct rotation_thread thread;
+ const int queue_pipe_fd = lttng_pipe_get_readfd(
+ handle->rotation_timer_queue->event_pipe);
DBG("[rotation-thread] Started rotation thread");
goto error;
}
- /* Ready to handle client connections. */
- sessiond_notify_ready();
-
while (true) {
int fd_count, i;
goto error;
}
- if (sessiond_check_thread_quit_pipe(fd, revents)) {
- DBG("[rotation-thread] Quit pipe activity");
- /* TODO flush the queue. */
- goto exit;
- } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
+ if (fd == rotate_notification_channel->socket) {
+ ret = handle_notification_channel(fd, handle,
+ &thread);
+ if (ret) {
+ ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
+ goto error;
+ }
+ } else {
+ /* Job queue or quit pipe activity. */
+
+ /*
+ * The job queue is serviced if there is
+ * activity on the quit pipe to ensure it is
+ * flushed and all references held in the queue
+ * are released.
+ */
ret = handle_job_queue(handle, &thread,
handle->rotation_timer_queue);
if (ret) {
ERR("[rotation-thread] Failed to handle rotation timer pipe event");
goto error;
}
- } else if (fd == rotate_notification_channel->socket) {
- ret = handle_notification_channel(fd, handle,
- &thread);
- if (ret) {
- ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
- goto error;
+
+ if (fd == queue_pipe_fd) {
+ char buf;
+
+ ret = lttng_read(fd, &buf, 1);
+ if (ret != 1) {
+ ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+ ret = -1;
+ goto error;
+ }
+ } else {
+ DBG("[rotation-thread] Quit pipe activity");
+ goto exit;
}
}
}
end:
return NULL;
}
+
+static
+bool shutdown_rotation_thread(void *thread_data)
+{
+ struct rotation_thread_handle *handle = thread_data;
+ const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+
+ return notify_thread_pipe(write_fd) == 1;
+}
+
+bool launch_rotation_thread(struct rotation_thread_handle *handle)
+{
+ struct lttng_thread *thread;
+
+ thread = lttng_thread_create("Rotation",
+ thread_rotation,
+ shutdown_rotation_thread,
+ NULL,
+ handle);
+ if (!thread) {
+ goto error;
+ }
+ lttng_thread_put(thread);
+ return true;
+error:
+ return false;
+}