Fix: flush the rotation thread's job queue on exit
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
index f78e8ea25adf53bed9ad917e87338deaac328c95..043993a60484130450dbe3dd75818b72ea9f4b66 100644 (file)
@@ -56,7 +56,7 @@ struct rotation_thread {
 
 struct rotation_thread_job {
        enum rotation_thread_job_type type;
-       uint64_t session_id;
+       struct ltt_session *session;
        /* List member in struct rotation_thread_timer_queue. */
        struct cds_list_head head;
 };
@@ -75,7 +75,6 @@ struct rotation_thread_handle {
        struct rotation_thread_timer_queue *rotation_timer_queue;
        /* Access to the notification thread cmd_queue */
        struct notification_thread_handle *notification_thread_handle;
-       sem_t *notification_thread_ready;
 };
 
 static
@@ -131,8 +130,8 @@ void log_job_destruction(const struct rotation_thread_job *job)
                abort();
        }
 
-       LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session %" PRIu64 " on destruction",
-                       job_type_str, job->session_id);
+       LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session \"%s\" on destruction",
+                       job_type_str, job->session->name);
 }
 
 void rotation_thread_timer_queue_destroy(
@@ -169,8 +168,7 @@ void rotation_thread_handle_destroy(
 
 struct rotation_thread_handle *rotation_thread_handle_create(
                struct rotation_thread_timer_queue *rotation_timer_queue,
-               struct notification_thread_handle *notification_thread_handle,
-               sem_t *notification_thread_ready)
+               struct notification_thread_handle *notification_thread_handle)
 {
        struct rotation_thread_handle *handle;
 
@@ -181,7 +179,6 @@ struct rotation_thread_handle *rotation_thread_handle_create(
 
        handle->rotation_timer_queue = rotation_timer_queue;
        handle->notification_thread_handle = notification_thread_handle;
-       handle->notification_thread_ready = notification_thread_ready;
 
 end:
        return handle;
@@ -193,13 +190,14 @@ end:
  */
 static
 bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
-               enum rotation_thread_job_type job_type, uint64_t session_id)
+               enum rotation_thread_job_type job_type,
+               struct ltt_session *session)
 {
        bool exists = false;
        struct rotation_thread_job *job;
 
        cds_list_for_each_entry(job, &queue->list, head) {
-               if (job->session_id == session_id && job->type == job_type) {
+               if (job->session == session && job->type == job_type) {
                        exists = true;
                        goto end;
                }
@@ -209,7 +207,8 @@ end:
 }
 
 void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
-               enum rotation_thread_job_type job_type, uint64_t session_id)
+               enum rotation_thread_job_type job_type,
+               struct ltt_session *session)
 {
        int ret;
        const char * const dummy = "!";
@@ -217,7 +216,7 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
        const char *job_type_str = get_job_type_str(job_type);
 
        pthread_mutex_lock(&queue->lock);
-       if (timer_job_exists(queue, session_id, job_type)) {
+       if (timer_job_exists(queue, job_type, session)) {
                /*
                 * This timer job is already pending, we don't need to add
                 * it.
@@ -227,12 +226,15 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
 
        job = zmalloc(sizeof(struct rotation_thread_job));
        if (!job) {
-               PERROR("Failed to allocate rotation thread job of type \"%s\" for session id %" PRIu64,
-                               job_type_str, session_id);
+               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
+                               job_type_str, session->name);
                goto end;
        }
+       /* No reason for this to fail as the caller must hold a reference. */
+       (void) session_get(session);
+
+       job->session = session;
        job->type = job_type;
-       job->session_id = session_id;
        cds_list_add_tail(&job->head, &queue->list);
 
        ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
@@ -253,8 +255,8 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
                        DBG("Wake-up pipe of rotation thread job queue is full");
                        goto end;
                }
-               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session id %" PRIu64,
-                               job_type_str, session_id);
+               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
+                               job_type_str, session->name);
                goto end;
        }
 
@@ -315,11 +317,6 @@ int init_thread_state(struct rotation_thread_handle *handle,
                goto end;
        }
 
-       /*
-        * We wait until the notification thread is ready to create the
-        * notification channel and add it to the poll_set.
-        */
-       sem_wait(handle->notification_thread_ready);
        rotate_notification_channel = lttng_notification_channel_create(
                        lttng_session_daemon_notification_endpoint);
        if (!rotate_notification_channel) {
@@ -530,6 +527,19 @@ int check_session_rotation_pending(struct ltt_session *session,
        DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
                        session->name, session->current_archive_id - 1);
 
+       /*
+        * The rotation-pending check timer of a session is launched in
+        * one-shot mode. If the rotation is incomplete, the rotation
+        * thread will re-enable the pending-check timer.
+        *
+        * The timer thread can't stop the timer itself since it is involved
+        * in the check for the timer's quiescence.
+        */
+       ret = timer_session_rotation_pending_check_stop(session);
+       if (ret) {
+               goto end;
+       }
+
        if (session->rotation_pending_local) {
                /* Updates session->rotation_pending_local as needed. */
                ret = check_session_rotation_pending_local(session);
@@ -705,18 +715,9 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                struct rotation_thread_timer_queue *queue)
 {
        int ret = 0;
-       int fd = lttng_pipe_get_readfd(queue->event_pipe);
-       struct ltt_session *session;
-       char buf;
-
-       ret = lttng_read(fd, &buf, 1);
-       if (ret != 1) {
-               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
-               ret = -1;
-               goto end;
-       }
 
        for (;;) {
+               struct ltt_session *session;
                struct rotation_thread_job *job;
 
                /* Take the queue lock only to pop an element from the list. */
@@ -731,10 +732,10 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                pthread_mutex_unlock(&queue->lock);
 
                session_lock_list();
-               session = session_find_by_id(job->session_id);
+               session = job->session;
                if (!session) {
-                       DBG("[rotation-thread] Session %" PRIu64 " not found",
-                                       job->session_id);
+                       DBG("[rotation-thread] Session \"%s\" not found",
+                                       session->name);
                        /*
                         * This is a non-fatal error, and we cannot report it to
                         * the user (timer), so just print the error and
@@ -747,12 +748,15 @@ int handle_job_queue(struct rotation_thread_handle *handle,
                         */
                        session_unlock_list();
                        free(job);
+                       session_put(session);
                        continue;
                }
 
                session_lock(session);
                ret = run_job(job, session, handle->notification_thread_handle);
                session_unlock(session);
+               /* Release reference held by the job. */
+               session_put(session);
                session_unlock_list();
                free(job);
                if (ret) {
@@ -841,6 +845,7 @@ int handle_condition(const struct lttng_condition *condition,
 
 end_unlock:
        session_unlock(session);
+       session_put(session);
 end:
        return ret;
 }
@@ -963,23 +968,43 @@ void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (sessiond_check_thread_quit_pipe(fd, revents)) {
-                               DBG("[rotation-thread] Quit pipe activity");
-                               /* TODO flush the queue. */
-                               goto exit;
-                       } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
+                       if (fd == rotate_notification_channel->socket) {
+                               ret = handle_notification_channel(fd, handle,
+                                               &thread);
+                               if (ret) {
+                                       ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
+                                       goto error;
+                               }
+                       } else {
+                               /* Job queue or quit pipe activity. */
+                               if (fd == lttng_pipe_get_readfd(
+                                               handle->rotation_timer_queue->event_pipe)) {
+                                       char buf;
+
+                                       ret = lttng_read(fd, &buf, 1);
+                                       if (ret != 1) {
+                                               ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
+                                               ret = -1;
+                                               goto error;
+                                       }
+                               }
+
+                               /*
+                                * The job queue is serviced if there is
+                                * activity on the quit pipe to ensure it is
+                                * flushed and all references held in the queue
+                                * are released.
+                                */
                                ret = handle_job_queue(handle, &thread,
                                                handle->rotation_timer_queue);
                                if (ret) {
                                        ERR("[rotation-thread] Failed to handle rotation timer pipe event");
                                        goto error;
                                }
-                       } else if (fd == rotate_notification_channel->socket) {
-                               ret = handle_notification_channel(fd, handle,
-                                               &thread);
-                               if (ret) {
-                                       ERR("[rotation-thread] Error occurred while handling activity on notification channel socket");
-                                       goto error;
+
+                               if (sessiond_check_thread_quit_pipe(fd, revents)) {
+                                       DBG("[rotation-thread] Quit pipe activity");
+                                       goto exit;
                                }
                        }
                }
This page took 0.026876 seconds and 4 git commands to generate.