Fix: flush the rotation thread's job queue on exit
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
index 900892bdb12bb8ad174f1248a4f4e2728278b3c0..043993a60484130450dbe3dd75818b72ea9f4b66 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
 #include "rotate.h"
 #include "cmd.h"
 #include "session.h"
-#include "sessiond-timer.h"
+#include "timer.h"
 #include "notification-thread-commands.h"
 
 #include <urcu.h>
 #include <urcu/list.h>
-#include <urcu/rculfhash.h>
-
-/*
- * Store a struct rotation_channel_info for each channel that is currently
- * being rotated by the consumer.
- */
-struct cds_lfht *channel_pending_rotate_ht;
 
 struct lttng_notification_channel *rotate_notification_channel = NULL;
 
-struct rotation_thread_state {
+struct rotation_thread { /* State handed to init_thread_state() and fini_thread_state(). */
        struct lttng_poll_event events;
 };
 
+struct rotation_thread_job {
+       enum rotation_thread_job_type type; /* Kind of job; printable through get_job_type_str(). */
+       struct ltt_session *session; /* Session targeted by the job. */
+       /* List member in struct rotation_thread_timer_queue. */
+       struct cds_list_head head;
+};
+
+/*
+ * The timer thread enqueues jobs and wakes up the rotation thread.
+ * When the rotation thread wakes up, it empties the queue.
+ *
+ * Jobs are appended by rotation_thread_enqueue_job(); any job still queued
+ * when rotation_thread_timer_queue_destroy() runs is logged and freed.
+ */
+struct rotation_thread_timer_queue {
+       struct lttng_pipe *event_pipe; /* Wake-up pipe, written to on enqueue. */
+       struct cds_list_head list; /* Pending struct rotation_thread_job entries. */
+       pthread_mutex_t lock; /* Held while the list is read or modified. */
+};
+
+struct rotation_thread_handle {
+       struct rotation_thread_timer_queue *rotation_timer_queue; /* Job queue; not freed by rotation_thread_handle_destroy(). */
+       /* Access to the notification thread cmd_queue */
+       struct notification_thread_handle *notification_thread_handle;
+};
+
 static
-void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
+const char *get_job_type_str(enum rotation_thread_job_type job_type)
 {
-       assert(channel_info);
-       free(channel_info);
+       switch (job_type) { /* Map each known job type to a printable name. */
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               return "CHECK_PENDING_ROTATION";
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               return "SCHEDULED_ROTATION";
+       default:
+               abort(); /* No name exists for any other value. */
+       }
 }
 
-static
-int match_channel_info(struct cds_lfht_node *node, const void *key)
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
 {
-       struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
-       struct rotation_channel_info *channel_info;
+       struct rotation_thread_timer_queue *queue = NULL;
 
-       channel_info = caa_container_of(node, struct rotation_channel_info,
-                       rotate_channels_ht_node);
+       queue = zmalloc(sizeof(*queue)); /* Allocate the queue; the list, lock and pipe are set up below. */
+       if (!queue) {
+               PERROR("Failed to allocate timer rotate queue");
+               goto end;
+       }
 
-       return !!((channel_key->key == channel_info->channel_key.key) &&
-                       (channel_key->domain == channel_info->channel_key.domain));
+       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); /* NOTE(review): the result is not checked here; confirm users cope with a failed open. */
+       CDS_INIT_LIST_HEAD(&queue->list);
+       pthread_mutex_init(&queue->lock, NULL); /* NOTE(review): return value is ignored. */
+end:
+       return queue; /* NULL when the allocation failed. */
 }
 
-static
-struct rotation_channel_info *lookup_channel_pending(uint64_t key,
-               enum lttng_domain_type domain)
+void log_job_destruction(const struct rotation_thread_job *job) /* NOTE(review): not declared static, unlike get_job_type_str(); confirm the intended visibility. */
 {
-       struct cds_lfht_iter iter;
-       struct cds_lfht_node *node;
-       struct rotation_channel_info *channel_info = NULL;
-       struct rotation_channel_key channel_key = { .key = key,
-                       .domain = domain };
-
-       cds_lfht_lookup(channel_pending_rotate_ht,
-                       hash_channel_key(&channel_key),
-                       match_channel_info,
-                       &channel_key, &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (!node) {
-               goto end;
+       enum lttng_error_level log_level; /* Severity of the message, selected from the job type below. */
+       const char *job_type_str = get_job_type_str(job->type);
+
+       switch (job->type) {
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               /*
+                * Not a problem, the scheduled rotation is racing with the teardown
+                * of the daemon. In this case, the rotation will not happen, which
+                * is not a problem (or at least, not important enough to delay
+                * the shutdown of the session daemon).
+                */
+               log_level = PRINT_DBG;
+               break;
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               /* This is not expected to happen; warn the user. */
+               log_level = PRINT_WARN;
+               break;
+       default:
+               abort();
        }
 
-       channel_info = caa_container_of(node, struct rotation_channel_info,
-                       rotate_channels_ht_node);
-       cds_lfht_del(channel_pending_rotate_ht, node);
-end:
-       return channel_info;
+       LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session \"%s\" on destruction",
+                       job_type_str, job->session->name);
 }
 
-/*
- * Destroy the thread data previously created by the init function.
- */
-void rotation_thread_handle_destroy(
-               struct rotation_thread_handle *handle)
+void rotation_thread_timer_queue_destroy(
+               struct rotation_thread_timer_queue *queue)
 {
-       int ret;
+       struct rotation_thread_job *job, *tmp_job;
 
-       if (!handle) {
-               goto end;
+       if (!queue) {
+               return; /* Destroying a NULL queue is a no-op. */
        }
 
-       if (handle->ust32_consumer >= 0) {
-               ret = close(handle->ust32_consumer);
-               if (ret) {
-                       PERROR("close 32-bit consumer channel rotation pipe");
-               }
-       }
-       if (handle->ust64_consumer >= 0) {
-               ret = close(handle->ust64_consumer);
-               if (ret) {
-                       PERROR("close 64-bit consumer channel rotation pipe");
-               }
-       }
-       if (handle->kernel_consumer >= 0) {
-               ret = close(handle->kernel_consumer);
-               if (ret) {
-                       PERROR("close kernel consumer channel rotation pipe");
-               }
+       lttng_pipe_destroy(queue->event_pipe); /* Done before the queue lock is taken. */
+
+       pthread_mutex_lock(&queue->lock);
+       /*
+        * Empty wait queue: every job still present is logged, unlinked
+        * and freed.
+        *
+        * NOTE(review): rotation_thread_enqueue_job() calls session_get()
+        * for each job it queues and no matching release is visible in
+        * this loop; confirm the reference is dropped elsewhere.
+        */
+       cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
+               log_job_destruction(job);
+               cds_list_del(&job->head);
+               free(job);
        }
+       pthread_mutex_unlock(&queue->lock);
+       pthread_mutex_destroy(&queue->lock);
+       free(queue);
+}
 
-end:
+/*
+ * Destroy the thread data previously created by the init function.
+ *
+ * Only the handle itself is freed; the timer queue and the notification
+ * thread handle it points to are left untouched.
+ */
+void rotation_thread_handle_destroy(
+               struct rotation_thread_handle *handle)
+{
        free(handle);
 }
 
 struct rotation_thread_handle *rotation_thread_handle_create(
-               struct lttng_pipe *ust32_channel_rotate_pipe,
-               struct lttng_pipe *ust64_channel_rotate_pipe,
-               struct lttng_pipe *kernel_channel_rotate_pipe,
-               int thread_quit_pipe,
                struct rotation_thread_timer_queue *rotation_timer_queue,
-               struct notification_thread_handle *notification_thread_handle,
-               sem_t *notification_thread_ready)
+               struct notification_thread_handle *notification_thread_handle)
 {
        struct rotation_thread_handle *handle;
 
@@ -157,46 +177,91 @@ struct rotation_thread_handle *rotation_thread_handle_create(
                goto end;
        }
 
-       if (ust32_channel_rotate_pipe) {
-               handle->ust32_consumer =
-                               lttng_pipe_release_readfd(
-                                       ust32_channel_rotate_pipe);
-               if (handle->ust32_consumer < 0) {
-                       goto error;
+       handle->rotation_timer_queue = rotation_timer_queue;
+       handle->notification_thread_handle = notification_thread_handle;
+
+end:
+       return handle;
+}
+
+/*
+ * Called with the rotation_thread_timer_queue lock held.
+ * Return true if the same timer job already exists in the queue, false if not.
+ *
+ * Two jobs are the same when both their session pointer and their job
+ * type are equal; the search stops at the first match.
+ */
+static
+bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
+               enum rotation_thread_job_type job_type,
+               struct ltt_session *session)
+{
+       bool exists = false;
+       struct rotation_thread_job *job;
+
+       cds_list_for_each_entry(job, &queue->list, head) {
+               if (job->session == session && job->type == job_type) {
+                       exists = true;
+                       goto end;
                }
-       } else {
-               handle->ust32_consumer = -1;
        }
-       if (ust64_channel_rotate_pipe) {
-               handle->ust64_consumer =
-                               lttng_pipe_release_readfd(
-                                       ust64_channel_rotate_pipe);
-               if (handle->ust64_consumer < 0) {
-                       goto error;
-               }
-       } else {
-               handle->ust64_consumer = -1;
+end:
+       return exists;
+}
+
+void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
+               enum rotation_thread_job_type job_type,
+               struct ltt_session *session)
+{
+       int ret;
+       const char * const dummy = "!"; /* One byte of this is written to the wake-up pipe. */
+       struct rotation_thread_job *job = NULL;
+       const char *job_type_str = get_job_type_str(job_type);
+
+       pthread_mutex_lock(&queue->lock); /* Held until the end label; covers the duplicate check, the insertion and the wake-up write. */
+       if (timer_job_exists(queue, job_type, session)) {
+               /*
+                * This timer job is already pending, we don't need to add
+                * it.
+                */
+               goto end;
        }
-       if (kernel_channel_rotate_pipe) {
-               handle->kernel_consumer =
-                               lttng_pipe_release_readfd(
-                                       kernel_channel_rotate_pipe);
-               if (handle->kernel_consumer < 0) {
-                       goto error;
+
+       job = zmalloc(sizeof(struct rotation_thread_job));
+       if (!job) {
+               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
+                               job_type_str, session->name);
+               goto end; /* The job is dropped; nothing is reported to the caller (void return). */
+       }
+       /* No reason for this to fail as the caller must hold a reference. */
+       (void) session_get(session);
+
+       job->session = session;
+       job->type = job_type;
+       cds_list_add_tail(&job->head, &queue->list);
+
+       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
+                       1);
+       if (ret < 0) {
+               /*
+                * We do not want to block in the timer handler, the job has
+                * been enqueued in the list, the wakeup pipe is probably full,
+                * the job will be processed when the rotation_thread catches
+                * up.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                       /*
+                        * Not an error, but would be surprising and indicate
+                        * that the rotation thread can't keep up with the
+                        * current load.
+                        */
+                       DBG("Wake-up pipe of rotation thread job queue is full");
+                       goto end;
                }
-       } else {
-               handle->kernel_consumer = -1;
+               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
+                               job_type_str, session->name);
+               goto end; /* The job stays in the list even though the wake-up write failed. */
        }
-       handle->thread_quit_pipe = thread_quit_pipe;
-       handle->rotation_timer_queue = rotation_timer_queue;
-       handle->notification_thread_handle = notification_thread_handle;
-       handle->notification_thread_ready = notification_thread_ready;
 
 end:
-       return handle;
-error:
-       rotation_thread_handle_destroy(handle);
-       return NULL;
+       pthread_mutex_unlock(&queue->lock);
 }
 
 static
@@ -206,22 +271,12 @@ int init_poll_set(struct lttng_poll_event *poll_set,
        int ret;
 
        /*
-        * Create pollset with size 5:
-        *      - sessiond quit pipe
-        *      - sessiond timer pipe,
-        *      - consumerd (32-bit user space) channel rotate pipe,
-        *      - consumerd (64-bit user space) channel rotate pipe,
-        *      - consumerd (kernel) channel rotate pipe,
+        * Create pollset with size 2:
+        *      - quit pipe,
+        *      - rotation thread timer queue pipe,
         */
-       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
-       if (ret < 0) {
-               goto end;
-       }
-
-       ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
-                       LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
+       ret = sessiond_set_thread_pollset(poll_set, 2);
+       if (ret) {
                goto error;
        }
        ret = lttng_poll_add(poll_set,
@@ -231,28 +286,7 @@ int init_poll_set(struct lttng_poll_event *poll_set,
                ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
                goto error;
        }
-       ret = lttng_poll_add(poll_set, handle->ust32_consumer,
-                       LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
-               goto error;
-       }
-       ret = lttng_poll_add(poll_set, handle->ust64_consumer,
-                       LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
-               goto error;
-       }
-       if (handle->kernel_consumer >= 0) {
-               ret = lttng_poll_add(poll_set, handle->kernel_consumer,
-                               LPOLLIN | LPOLLERR);
-               if (ret < 0) {
-                       ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
-                       goto error;
-               }
-       }
 
-end:
        return ret;
 error:
        lttng_poll_clean(poll_set);
@@ -260,13 +294,9 @@ error:
 }
 
 static
-void fini_thread_state(struct rotation_thread_state *state)
+void fini_thread_state(struct rotation_thread *state)
 {
-       int ret;
-
        lttng_poll_clean(&state->events);
-       ret = cds_lfht_destroy(channel_pending_rotate_ht, NULL);
-       assert(!ret);
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
@@ -274,7 +304,7 @@ void fini_thread_state(struct rotation_thread_state *state)
 
 static
 int init_thread_state(struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+               struct rotation_thread *state)
 {
        int ret;
 
@@ -287,19 +317,6 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
-       channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
-                       1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
-       if (!channel_pending_rotate_ht) {
-               ERR("[rotation-thread] Failed to create channel pending rotation hash table");
-               ret = -1;
-               goto end;
-       }
-
-       /*
-        * We wait until the notification thread is ready to create the
-        * notification channel and add it to the poll_set.
-        */
-       sem_wait(handle->notification_thread_ready);
        rotate_notification_channel = lttng_notification_channel_create(
                        lttng_session_daemon_notification_endpoint);
        if (!rotate_notification_channel) {
@@ -319,158 +336,305 @@ end:
 }
 
 static
-int handle_channel_rotation_pipe(int fd, uint32_t revents,
-               struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+int check_session_rotation_pending_local_on_consumer(
+               const struct ltt_session *session,
+               struct consumer_socket *socket, bool *rotation_completed)
 {
-       int ret = 0;
-       enum lttng_domain_type domain;
-       struct rotation_channel_info *channel_info;
-       struct ltt_session *session = NULL;
-       uint64_t key;
-
-       if (fd == handle->ust32_consumer ||
-                       fd == handle->ust64_consumer) {
-               domain = LTTNG_DOMAIN_UST;
-       } else if (fd == handle->kernel_consumer) {
-               domain = LTTNG_DOMAIN_KERNEL;
+       int ret;
+
+       pthread_mutex_lock(socket->lock); /* Held only around the consumer query below. */
+       DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s",
+                       lttng_consumer_type_str(socket->type),
+                       session->name);
+       ret = consumer_check_rotation_pending_local(socket,
+                       session->id,
+                       session->current_archive_id - 1);
+       pthread_mutex_unlock(socket->lock);
+
+       if (ret == 0) {
+               /* Rotation was completed on this consumer. */
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = true;
+       } else if (ret == 1) {
+               /* Rotation pending on this consumer. */
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = false;
+               ret = 0; /* A pending rotation is not reported as an error. */
        } else {
-               ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
-                               fd);
-               abort();
+               /* Not a fatal error. */
+               ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
+                               session->current_archive_id - 1,
+                               session->name,
+                               lttng_consumer_type_str(socket->type));
+               *rotation_completed = false;
        }
+       return ret; /* 0 when completed or pending; the consumer check's own value otherwise. */
+}
 
-       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-               ret = lttng_poll_del(&state->events, fd);
-               if (ret) {
-                       ERR("[rotation-thread] Failed to remove consumer "
-                                       "rotation pipe from poll set");
+static
+int check_session_rotation_pending_local(struct ltt_session *session)
+{
+       int ret = 0;
+       struct consumer_socket *socket;
+       struct cds_lfht_iter iter;
+       bool rotation_completed = true;
+
+       /*
+        * Check for a local pending rotation on all consumers (32-bit
+        * user space, 64-bit user space, and kernel).
+        *
+        * The walk stops at the first consumer that reports an error or a
+        * rotation that is still pending. session->rotation_pending_local
+        * is only cleared when no consumer reported either.
+        */
+       DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64,
+                       session->name, session->current_archive_id - 1);
+
+       rcu_read_lock();
+       if (!session->ust_session) {
+               goto skip_ust;
+       }
+       cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
+                       &iter, socket, node.node) {
+               ret = check_session_rotation_pending_local_on_consumer(session,
+                               socket, &rotation_completed);
+               if (ret || !rotation_completed) {
+                       goto end;
                }
-               goto end;
        }
 
-       do {
-               ret = read(fd, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret != sizeof(key)) {
-               ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
-                               fd);
-               ret = -1;
-               goto end;
+skip_ust:
+       if (!session->kernel_session) {
+               goto skip_kernel;
        }
+       cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+               ret = check_session_rotation_pending_local_on_consumer(session,
+                               socket, &rotation_completed);
+               if (ret || !rotation_completed) {
+                       goto end;
+               }
+       }
+skip_kernel:
+end:
+       rcu_read_unlock();
 
-       DBG("[rotation-thread] Received notification for chan %" PRIu64
-                       ", domain %d", key, domain);
-
-       channel_info = lookup_channel_pending(key, domain);
-       if (!channel_info) {
-               ERR("[rotation-thread] Failed to find channel_info (key = %"
-                               PRIu64 ")", key);
-               ret = -1;
-               goto end;
+       if (rotation_completed) {
+               DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+                               session->current_archive_id - 1,
+                               session->name);
+               session->rotation_pending_local = false;
        }
-       rcu_read_lock();
-       session_lock_list();
-       session = session_find_by_id(channel_info->session_id);
-       if (!session) {
-               /*
-                * The session may have been destroyed before we had a chance to
-                * perform this action, return gracefully.
-                */
-               DBG("[rotation-thread] Session %" PRIu64 " not found",
-                               channel_info->session_id);
-               ret = 0;
-               goto end_unlock_session_list;
+       if (ret) {
+               ret = session_reset_rotation_state(session,
+                               LTTNG_ROTATION_STATE_ERROR);
+               if (ret) {
+                       ERR("Failed to reset rotation state of session \"%s\"",
+                                       session->name);
+               }
        }
+       return 0; /* NOTE(review): always 0; a consumer error only triggers the rotation state reset above. */
+}
 
-       session_lock(session);
-       if (--session->nr_chan_rotate_pending == 0) {
-               time_t now = time(NULL);
+static
+int check_session_rotation_pending_relay(struct ltt_session *session)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct cds_lfht_iter iter;
+       bool rotation_completed = true;
+       const struct consumer_output *output;
 
-               if (now == (time_t) -1) {
-                       session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
-                       ret = LTTNG_ERR_UNK;
-                       goto end_unlock_session;
-               }
+       /*
+        * Check for a pending rotation on any consumer as we only use
+        * it as a "tunnel" to the relayd.
+        *
+        * The first socket of the user space consumer is used when the
+        * session has a ust_session, the first socket of the kernel
+        * consumer otherwise.
+        *
+        * NOTE(review): kernel_session is dereferenced whenever ust_session
+        * is NULL, so this assumes at least one of the two is set; confirm
+        * against the callers.
+        */
 
-               ret = rename_complete_chunk(session, now);
-               if (ret < 0) {
-                       ERR("Failed to rename completed rotation chunk");
-                       goto end_unlock_session;
-               }
-               session->rotate_pending = false;
-               session->last_chunk_start_ts = session->current_chunk_start_ts;
-               if (session->rotate_pending_relay) {
-                       ret = sessiond_timer_rotate_pending_start(
-                                       session,
-                                       DEFAULT_ROTATE_PENDING_RELAY_TIMER);
-                       if (ret) {
-                               ERR("Failed to enable rotate pending timer");
-                               ret = -1;
-                               goto end_unlock_session;
-                       }
-               } else {
-                       struct lttng_trace_archive_location *location;
-
-                       session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-                       /* Ownership of location is transferred. */
-                       location = session_get_trace_archive_location(session);
-                       ret = notification_thread_command_session_rotation_completed(
-                                       notification_thread_handle,
-                                       session->name,
-                                       session->uid,
-                                       session->gid,
-                                       session->current_archive_id,
-                                       location);
-                       if (ret != LTTNG_OK) {
-                               ERR("Failed to notify notification thread that rotation is complete for session %s",
-                                               session->name);
-                       }
+       rcu_read_lock();
+       if (session->ust_session) {
+               cds_lfht_first(session->ust_session->consumer->socks->ht,
+                               &iter);
+               output = session->ust_session->consumer;
+       } else {
+               cds_lfht_first(session->kernel_session->consumer->socks->ht,
+                               &iter);
+               output = session->kernel_session->consumer;
+       }
+       assert(cds_lfht_iter_get_node(&iter));
 
+       socket = caa_container_of(cds_lfht_iter_get_node(&iter),
+                       typeof(*socket), node.node);
+
+       pthread_mutex_lock(socket->lock);
+       DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer",
+                       session->name, session->current_archive_id - 1,
+                       lttng_consumer_type_str(socket->type));
+       ret = consumer_check_rotation_pending_relay(socket,
+                       output,
+                       session->id,
+                       session->current_archive_id - 1);
+       pthread_mutex_unlock(socket->lock);
+
+       if (ret == 0) {
+               /* Rotation was completed on the relay. */
+               DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed",
+                               session->current_archive_id - 1,
+                               session->name);
+       } else if (ret == 1) {
+               /* Rotation pending on relay. */
+               DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending",
+                               session->current_archive_id - 1,
+                               session->name);
+               rotation_completed = false;
+       } else {
+               /* Not a fatal error. */
+               ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
+                               session->current_archive_id - 1,
+                               session->name);
+               ret = session_reset_rotation_state(session,
+                               LTTNG_ROTATION_STATE_ERROR);
+               if (ret) {
+                       ERR("Failed to reset rotation state of session \"%s\"",
+                                       session->name);
                }
-               DBG("Rotation completed for session %s", session->name);
+               rotation_completed = false;
        }
 
-       ret = 0;
-
-end_unlock_session:
-       channel_rotation_info_destroy(channel_info);
-       session_unlock(session);
-end_unlock_session_list:
-       session_unlock_list();
        rcu_read_unlock();
-end:
-       return ret;
+
+       if (rotation_completed) {
+               DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
+                               session->current_archive_id - 1,
+                               session->name);
+               session->rotation_pending_relay = false;
+       }
+       return 0; /* NOTE(review): always 0; a relay check error only triggers the rotation state reset above. */
 }
 
 /*
- * Process the rotate_pending check, called with session lock held.
+ * Check if the last rotation was completed, called with session lock held.
  */
 static
-int rotate_pending_relay_timer(struct ltt_session *session)
+int check_session_rotation_pending(struct ltt_session *session,
+               struct notification_thread_handle *notification_thread_handle)
 {
        int ret;
+       struct lttng_trace_archive_location *location;
+       time_t now;
+
+       DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
+                       session->name, session->current_archive_id - 1);
 
-       DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
-                       session->id);
-       ret = relay_rotate_pending(session, session->current_archive_id - 1);
+       /*
+        * The rotation-pending check timer of a session is launched in
+        * one-shot mode. If the rotation is incomplete, the rotation
+        * thread will re-enable the pending-check timer.
+        *
+        * The timer thread can't stop the timer itself since it is involved
+        * in the check for the timer's quiescence.
+        */
+       ret = timer_session_rotation_pending_check_stop(session);
+       if (ret) {
+               goto end;
+       }
+
+       if (session->rotation_pending_local) {
+               /* Updates session->rotation_pending_local as needed. */
+               ret = check_session_rotation_pending_local(session);
+               if (ret) {
+                       goto end;
+               }
+
+               /*
+                * No need to check for a pending rotation on the relay
+                * since the rotation is not even completed locally yet.
+                */
+               if (session->rotation_pending_local) {
+                       goto end;
+               }
+       }
+
+       if (session->rotation_pending_relay) {
+               /* Updates session->rotation_pending_relay as needed. */
+               ret = check_session_rotation_pending_relay(session);
+               if (ret) {
+                       goto end;
+               }
+
+               if (session->rotation_pending_relay) {
+                       goto end;
+               }
+       }
+
+       DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for "
+                       "session %s", session->current_archive_id - 1,
+                       session->name);
+
+       /* Rename the completed trace archive's location. */
+       now = time(NULL);
+       if (now == (time_t) -1) {
+               ret = session_reset_rotation_state(session,
+                               LTTNG_ROTATION_STATE_ERROR);
+               if (ret) {
+                       ERR("Failed to reset rotation state of session \"%s\"",
+                                       session->name);
+               }
+               ret = LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       ret = rename_completed_chunk(session, now);
        if (ret < 0) {
-               ERR("[rotation-thread] Check relay rotate pending");
+               ERR("Failed to rename completed rotation chunk");
                goto end;
        }
-       if (ret == 0) {
-               struct lttng_trace_archive_location *location;
+       session->last_chunk_start_ts = session->current_chunk_start_ts;
 
-               DBG("[rotation-thread] Rotation completed on the relay for "
-                               "session %" PRIu64, session->id);
+       /*
+        * Now we can clear the "ONGOING" state in the session. New
+        * rotations can start now.
+        */
+       session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
+
+       /* Ownership of location is transferred. */
+       location = session_get_trace_archive_location(session);
+       ret = notification_thread_command_session_rotation_completed(
+                       notification_thread_handle,
+                       session->name,
+                       session->uid,
+                       session->gid,
+                       session->current_archive_id,
+                       location);
+       if (ret != LTTNG_OK) {
+               ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+                               session->name);
+       }
+
+       if (!session->active) {
                /*
-                * Now we can clear the pending flag in the session. New
-                * rotations can start now.
+                * A stop command was issued during the rotation, it is
+                * up to the rotation completion check to perform the
+                * renaming of the last chunk that was produced.
                 */
-               session->rotate_pending_relay = false;
-               session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
+               ret = notification_thread_command_session_rotation_ongoing(
+                               notification_thread_handle,
+                               session->name,
+                               session->uid,
+                               session->gid,
+                               session->current_archive_id);
+               if (ret != LTTNG_OK) {
+                       ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+                                       session->name);
+               }
+
+               ret = rename_active_chunk(session);
+               if (ret < 0) {
+                       ERR("[rotation-thread] Failed to rename active rotation chunk");
+                       goto end;
+               }
 
-               session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
                /* Ownership of location is transferred. */
                location = session_get_trace_archive_location(session);
                ret = notification_thread_command_session_rotation_completed(
@@ -481,14 +645,18 @@ int rotate_pending_relay_timer(struct ltt_session *session)
                                session->current_archive_id,
                                location);
                if (ret != LTTNG_OK) {
-                       ERR("Failed to notify notification thread that rotation is complete for session %s",
+                       ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
                                        session->name);
                }
-       } else if (ret == 1) {
-               DBG("[rotation-thread] Rotation still pending on the relay for "
-                               "session %" PRIu64, session->id);
-               ret = sessiond_timer_rotate_pending_start(session,
-                               DEFAULT_ROTATE_PENDING_RELAY_TIMER);
+       }
+
+       ret = 0;
+end:
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
+                               session->current_archive_id - 1, session->name);
+               ret = timer_session_rotation_pending_check_start(session,
+                               DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Re-enabling rotate pending timer");
                        ret = -1;
@@ -496,133 +664,102 @@ int rotate_pending_relay_timer(struct ltt_session *session)
                }
        }
 
-       ret = 0;
-
-end:
        return ret;
 }
 
-/*
- * Process the rotate_timer, called with session lock held.
- */
+/* Call with the session and session_list locks held. */
 static
-int rotate_timer(struct ltt_session *session)
+int launch_session_rotation(struct ltt_session *session)
 {
        int ret;
+       struct lttng_rotate_session_return rotation_return;
 
-       /*
-        * Complete _at most_ one scheduled rotation on a stopped session.
-        */
-       if (!session->active && session->rotate_timer_enabled &&
-                       session->rotated_after_last_stop) {
-               ret = 0;
-               goto end;
-       }
+       DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
+                       session->name);
 
-       /* Ignore this timer if a rotation is already in progress. */
-       if (session->rotate_pending || session->rotate_pending_relay) {
-               ret = 0;
-               goto end;
+       ret = cmd_rotate_session(session, &rotation_return);
+       if (ret == LTTNG_OK) {
+               DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
+                               session->name);
+       } else {
+               /* Don't consider errors as fatal. */
+               DBG("[rotation-thread] Scheduled time-based rotation aborted for session %s: %s",
+                               session->name, lttng_strerror(ret));
        }
+       return 0;
+}
 
-       DBG("[rotation-thread] Rotate timer on session %s", session->name);
+static
+int run_job(struct rotation_thread_job *job, struct ltt_session *session,
+               struct notification_thread_handle *notification_thread_handle)
+{
+       int ret;
 
-       ret = cmd_rotate_session(session, NULL);
-       if (ret == -LTTNG_ERR_ROTATION_PENDING) {
-               DBG("Scheduled rotation aborted since a rotation is already in progress");
-               ret = 0;
-               goto end;
-       } else if (ret != LTTNG_OK) {
-               ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret);
-               ret = -1;
-               goto end;
+       switch (job->type) {
+       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+               ret = launch_session_rotation(session);
+               break;
+       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+               ret = check_session_rotation_pending(session,
+                               notification_thread_handle);
+               break;
+       default:
+               abort();
        }
-
-       ret = 0;
-
-end:
        return ret;
 }
 
 static
-int handle_rotate_timer_pipe(uint32_t revents,
-               struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state,
+int handle_job_queue(struct rotation_thread_handle *handle,
+               struct rotation_thread *state,
                struct rotation_thread_timer_queue *queue)
 {
        int ret = 0;
-       int fd = lttng_pipe_get_readfd(queue->event_pipe);
-       struct ltt_session *session;
-       char buf[1];
-
-       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-               ret = lttng_poll_del(&state->events, fd);
-               if (ret) {
-                       ERR("[rotation-thread] Failed to remove consumer "
-                                       "rotate pending pipe from poll set");
-               }
-               goto end;
-       }
-
-       ret = lttng_read(fd, buf, 1);
-       if (ret != 1) {
-               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
-               ret = -1;
-               goto end;
-       }
 
        for (;;) {
-               struct sessiond_rotation_timer *timer_data;
+               struct ltt_session *session;
+               struct rotation_thread_job *job;
 
-               /*
-                * Take the queue lock only to pop elements from the list.
-                */
+               /* Take the queue lock only to pop an element from the list. */
                pthread_mutex_lock(&queue->lock);
                if (cds_list_empty(&queue->list)) {
                        pthread_mutex_unlock(&queue->lock);
                        break;
                }
-               timer_data = cds_list_first_entry(&queue->list,
-                               struct sessiond_rotation_timer, head);
-               cds_list_del(&timer_data->head);
+               job = cds_list_first_entry(&queue->list,
+                               typeof(*job), head);
+               cds_list_del(&job->head);
                pthread_mutex_unlock(&queue->lock);
 
-               /*
-                * session lock to lookup the session ID.
-                */
                session_lock_list();
-               session = session_find_by_id(timer_data->session_id);
+               session = job->session;
                if (!session) {
-                       DBG("[rotation-thread] Session %" PRIu64 " not found",
-                                       timer_data->session_id);
+                       DBG("[rotation-thread] Session \"%s\" not found",
+                                       session->name);
                        /*
-                        * This is a non-fatal error, and we cannot report it to the
-                        * user (timer), so just print the error and continue the
-                        * processing.
+                        * This is a non-fatal error, and we cannot report it to
+                        * the user (timer), so just print the error and
+                        * continue the processing.
+                        *
+                        * While the timer thread will purge pending signals for
+                        * a session on the session's destruction, it is
+                        * possible for a job targeting that session to have
+                        * already been queued before it was destroyed.
                         */
                        session_unlock_list();
-                       free(timer_data);
+                       free(job);
+                       session_put(session);
                        continue;
                }
 
-               /*
-                * Take the session lock and release the session_list lock.
-                */
                session_lock(session);
-               session_unlock_list();
-
-               if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
-                       ret = rotate_pending_relay_timer(session);
-               } else if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
-                       ret = rotate_timer(session);
-               } else {
-                       ERR("Unknown signal in rotate timer %d", timer_data->signal);
-                       ret = -1;
-               }
+               ret = run_job(job, session, handle->notification_thread_handle);
                session_unlock(session);
-               free(timer_data);
+               /* Release reference held by the job. */
+               session_put(session);
+               session_unlock_list();
+               free(job);
                if (ret) {
-                       ERR("Error processing timer");
                        goto end;
                }
        }
@@ -633,8 +770,8 @@ end:
        return ret;
 }
 
-int handle_condition(
-               const struct lttng_condition *condition,
+static
+int handle_condition(const struct lttng_condition *condition,
                const struct lttng_evaluation *evaluation,
                struct notification_thread_handle *notification_thread_handle)
 {
@@ -685,13 +822,12 @@ int handle_condition(
        ret = unsubscribe_session_consumed_size_rotation(session,
                        notification_thread_handle);
        if (ret) {
-               goto end;
+               goto end_unlock;
        }
 
        ret = cmd_rotate_session(session, NULL);
        if (ret == -LTTNG_ERR_ROTATION_PENDING) {
                DBG("Rotate already pending, subscribe to the next threshold value");
-               ret = 0;
        } else if (ret != LTTNG_OK) {
                ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
                                lttng_strerror(ret));
@@ -709,14 +845,15 @@ int handle_condition(
 
 end_unlock:
        session_unlock(session);
+       session_put(session);
 end:
        return ret;
 }
 
 static
-int handle_notification_channel(int fd, uint32_t revents,
+int handle_notification_channel(int fd,
                struct rotation_thread_handle *handle,
-               struct rotation_thread_state *state)
+               struct rotation_thread *state)
 {
        int ret;
        bool notification_pending;
@@ -728,7 +865,7 @@ int handle_notification_channel(int fd, uint32_t revents,
        status = lttng_notification_channel_has_pending_notification(
                        rotate_notification_channel, &notification_pending);
        if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("[rotation-thread ]Error occured while checking for pending notification");
+               ERR("[rotation-thread ]Error occurred while checking for pending notification");
                ret = -1;
                goto end;
        }
@@ -776,7 +913,7 @@ void *thread_rotation(void *data)
 {
        int ret;
        struct rotation_thread_handle *handle = data;
-       struct rotation_thread_state state;
+       struct rotation_thread thread;
 
        DBG("[rotation-thread] Started rotation thread");
 
@@ -791,9 +928,9 @@ void *thread_rotation(void *data)
        health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
        health_code_update();
 
-       ret = init_thread_state(handle, &state);
+       ret = init_thread_state(handle, &thread);
        if (ret) {
-               goto end;
+               goto error;
        }
 
        /* Ready to handle client connections. */
@@ -804,7 +941,7 @@ void *thread_rotation(void *data)
 
                health_poll_entry();
                DBG("[rotation-thread] Entering poll wait");
-               ret = lttng_poll_wait(&state.events, -1);
+               ret = lttng_poll_wait(&thread.events, -1);
                DBG("[rotation-thread] Poll wait returned (%i)", ret);
                health_poll_exit();
                if (ret < 0) {
@@ -820,45 +957,62 @@ void *thread_rotation(void *data)
 
                fd_count = ret;
                for (i = 0; i < fd_count; i++) {
-                       int fd = LTTNG_POLL_GETFD(&state.events, i);
-                       uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
+                       int fd = LTTNG_POLL_GETFD(&thread.events, i);
+                       uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
 
                        DBG("[rotation-thread] Handling fd (%i) activity (%u)",
                                        fd, revents);
 
-                       if (fd == handle->thread_quit_pipe) {
-                               DBG("[rotation-thread] Quit pipe activity");
-                               goto exit;
-                       } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
-                               ret = handle_rotate_timer_pipe(revents,
-                                               handle, &state, handle->rotation_timer_queue);
+                       if (revents & LPOLLERR) {
+                               ERR("[rotation-thread] Polling returned an error on fd %i", fd);
+                               goto error;
+                       }
+
+                       if (fd == rotate_notification_channel->socket) {
+                               ret = handle_notification_channel(fd, handle,
+                                               &thread);
                                if (ret) {
-                                       ERR("[rotation-thread] Failed to handle rotation timer pipe event");
+                                       ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
-                       } else if (fd == handle->ust32_consumer ||
-                                       fd == handle->ust64_consumer ||
-                                       fd == handle->kernel_consumer) {
-                               ret = handle_channel_rotation_pipe(fd,
-                                               revents, handle, &state);
-                               if (ret) {
-                                       ERR("[rotation-thread] Failed to handle channel rotation pipe");
-                                       goto error;
+                       } else {
+                               /* Job queue or quit pipe activity. */
+                               if (fd == lttng_pipe_get_readfd(
+                                               handle->rotation_timer_queue->event_pipe)) {
+                                       char buf;
+
+                                       ret = lttng_read(fd, &buf, 1);
+                                       if (ret != 1) {
+                                               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+                                               ret = -1;
+                                               goto error;
+                                       }
                                }
-                       } else if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, revents,
-                                               handle, &state);
+
+                               /*
+                                * The job queue is also serviced when there is
+                                * activity on the quit pipe, so that it is
+                                * flushed and all references held in the queue
+                                * are released before the thread exits.
+                                */
+                               ret = handle_job_queue(handle, &thread,
+                                               handle->rotation_timer_queue);
                                if (ret) {
-                                       ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
+                                       ERR("[rotation-thread] Failed to handle rotation timer pipe event");
                                        goto error;
                                }
+
+                               if (sessiond_check_thread_quit_pipe(fd, revents)) {
+                                       DBG("[rotation-thread] Quit pipe activity");
+                                       goto exit;
+                               }
                        }
                }
        }
 exit:
 error:
        DBG("[rotation-thread] Exit");
-       fini_thread_state(&state);
+       fini_thread_state(&thread);
        health_unregister(health_sessiond);
        rcu_thread_offline();
        rcu_unregister_thread();
This page took 0.037035 seconds and 4 git commands to generate.