Make the launch of the application registration thread blocking
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
index 64f958233cdc6ccb22ee671a5d5feb51fd6cae67..8e63e160f980c15f8e60af0ea8f74d88a89e758e 100644 (file)
@@ -44,6 +44,8 @@
 #include "session.h"
 #include "timer.h"
 #include "notification-thread-commands.h"
+#include "utils.h"
+#include "thread.h"
 
 #include <urcu.h>
 #include <urcu/list.h>
@@ -75,7 +77,8 @@ struct rotation_thread_handle {
        struct rotation_thread_timer_queue *rotation_timer_queue;
        /* Access to the notification thread cmd_queue */
        struct notification_thread_handle *notification_thread_handle;
-       sem_t *notification_thread_ready;
+       /* Thread-specific quit pipe. */
+       struct lttng_pipe *quit_pipe;
 };
 
 static
@@ -138,8 +141,6 @@ void log_job_destruction(const struct rotation_thread_job *job)
 void rotation_thread_timer_queue_destroy(
                struct rotation_thread_timer_queue *queue)
 {
-       struct rotation_thread_job *job, *tmp_job;
-
        if (!queue) {
                return;
        }
@@ -147,12 +148,7 @@ void rotation_thread_timer_queue_destroy(
        lttng_pipe_destroy(queue->event_pipe);
 
        pthread_mutex_lock(&queue->lock);
-       /* Empty wait queue. */
-       cds_list_for_each_entry_safe(job, tmp_job, &queue->list, head) {
-               log_job_destruction(job);
-               cds_list_del(&job->head);
-               free(job);
-       }
+       assert(cds_list_empty(&queue->list));
        pthread_mutex_unlock(&queue->lock);
        pthread_mutex_destroy(&queue->lock);
        free(queue);
@@ -164,13 +160,13 @@ void rotation_thread_timer_queue_destroy(
 void rotation_thread_handle_destroy(
                struct rotation_thread_handle *handle)
 {
+       lttng_pipe_destroy(handle->quit_pipe);
        free(handle);
 }
 
 struct rotation_thread_handle *rotation_thread_handle_create(
                struct rotation_thread_timer_queue *rotation_timer_queue,
-               struct notification_thread_handle *notification_thread_handle,
-               sem_t *notification_thread_ready)
+               struct notification_thread_handle *notification_thread_handle)
 {
        struct rotation_thread_handle *handle;
 
@@ -181,10 +177,16 @@ struct rotation_thread_handle *rotation_thread_handle_create(
 
        handle->rotation_timer_queue = rotation_timer_queue;
        handle->notification_thread_handle = notification_thread_handle;
-       handle->notification_thread_ready = notification_thread_ready;
+       handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+       if (!handle->quit_pipe) {
+               goto error;
+       }
 
 end:
        return handle;
+error:
+       rotation_thread_handle_destroy(handle);
+       return NULL;
 }
 
 /*
@@ -274,19 +276,29 @@ int init_poll_set(struct lttng_poll_event *poll_set,
        int ret;
 
        /*
-        * Create pollset with size 2:
-        *      - quit pipe,
+        * Create pollset with size 3:
+        *      - rotation thread quit pipe,
         *      - rotation thread timer queue pipe,
+        *      - notification channel sock,
         */
-       ret = sessiond_set_thread_pollset(poll_set, 2);
-       if (ret) {
+       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
+       if (ret < 0) {
                goto error;
        }
+
+       ret = lttng_poll_add(poll_set,
+                       lttng_pipe_get_readfd(handle->quit_pipe),
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[rotation-thread] Failed to add quit pipe read fd to poll set");
+               goto error;
+       }
+
        ret = lttng_poll_add(poll_set,
                        lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
                        LPOLLIN | LPOLLERR);
        if (ret < 0) {
-               ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+               ERR("[rotation-thread] Failed to add rotate_pending fd to poll set");
                goto error;
        }
 
@@ -320,11 +332,6 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
-       /*
-        * We wait until the notification thread is ready to create the
-        * notification channel and add it to the poll_set.
-        */
-       sem_wait(handle->notification_thread_ready);
        rotate_notification_channel = lttng_notification_channel_create(
                        lttng_session_daemon_notification_endpoint);
        if (!rotate_notification_channel) {
@@ -535,6 +542,19 @@ int check_session_rotation_pending(struct ltt_session *session,
        DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
                        session->name, session->current_archive_id - 1);
 
+       /*
+        * The rotation-pending check timer of a session is launched in
+        * one-shot mode. If the rotation is incomplete, the rotation
+        * thread will re-enable the pending-check timer.
+        *
+        * The timer thread can't stop the timer itself since it is involved
+        * in the check for the timer's quiescence.
+        */
+       ret = timer_session_rotation_pending_check_stop(session);
+       if (ret) {
+               goto end;
+       }
+
        if (session->rotation_pending_local) {
                /* Updates session->rotation_pending_local as needed. */
                ret = check_session_rotation_pending_local(session);
@@ -710,15 +730,6 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                struct rotation_thread_timer_queue *queue)
 {
        int ret = 0;
-       int fd = lttng_pipe_get_readfd(queue->event_pipe);
-       char buf;
-
-       ret = lttng_read(fd, &buf, 1);
-       if (ret != 1) {
-               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
-               ret = -1;
-               goto end;
-       }
 
        for (;;) {
                struct ltt_session *session;
@@ -759,6 +770,7 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                session_lock(session);
                ret = run_job(job, session, handle->notification_thread_handle);
                session_unlock(session);
+               /* Release reference held by the job. */
                session_put(session);
                session_unlock_list();
                free(job);
@@ -917,6 +929,8 @@ void *thread_rotation(void *data)
        int ret;
        struct rotation_thread_handle *handle = data;
        struct rotation_thread thread;
+       const int queue_pipe_fd = lttng_pipe_get_readfd(
+                       handle->rotation_timer_queue->event_pipe);
 
        DBG("[rotation-thread] Started rotation thread");
 
@@ -936,9 +950,6 @@ void *thread_rotation(void *data)
                goto error;
        }
 
-       /* Ready to handle client connections. */
-       sessiond_notify_ready();
-
        while (true) {
                int fd_count, i;
 
@@ -971,23 +982,41 @@ void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (sessiond_check_thread_quit_pipe(fd, revents)) {
-                               DBG("[rotation-thread] Quit pipe activity");
-                               /* TODO flush the queue. */
-                               goto exit;
-                       } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
+                       if (fd == rotate_notification_channel->socket) {
+                               ret = handle_notification_channel(fd, handle,
+                                               &thread);
+                               if (ret) {
+                                       ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
+                                       goto error;
+                               }
+                       } else {
+                               /* Job queue or quit pipe activity. */
+
+                               /*
+                                * The job queue is serviced if there is
+                                * activity on the quit pipe to ensure it is
+                                * flushed and all references held in the queue
+                                * are released.
+                                */
                                ret = handle_job_queue(handle, &thread,
                                                handle->rotation_timer_queue);
                                if (ret) {
                                        ERR("[rotation-thread] Failed to handle rotation timer pipe event");
                                        goto error;
                                }
-                       } else if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, handle,
-                                               &thread);
-                               if (ret) {
-                                       ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
-                                       goto error;
+
+                               if (fd == queue_pipe_fd) {
+                                       char buf;
+
+                                       ret = lttng_read(fd, &buf, 1);
+                                       if (ret != 1) {
+                                               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+                                               ret = -1;
+                                               goto error;
+                                       }
+                               } else {
+                                       DBG("[rotation-thread] Quit pipe activity");
+                                       goto exit;
                                }
                        }
                }
@@ -1002,3 +1031,30 @@ error:
 end:
        return NULL;
 }
+
+static
+bool shutdown_rotation_thread(void *thread_data)
+{
+       struct rotation_thread_handle *handle = thread_data;
+       const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+
+       return notify_thread_pipe(write_fd) == 1;
+}
+
+bool launch_rotation_thread(struct rotation_thread_handle *handle)
+{
+       struct lttng_thread *thread;
+
+       thread = lttng_thread_create("Rotation",
+                       thread_rotation,
+                       shutdown_rotation_thread,
+                       NULL,
+                       handle);
+       if (!thread) {
+               goto error;
+       }
+       lttng_thread_put(thread);
+       return true;
+error:
+       return false;
+}
This page took 0.026285 seconds and 4 git commands to generate.