Fix: undefined operation on last_relay_viewer_session_id
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 8c716dbde9852e1de1b9a04261fb59c22035e767..ed52012407be36ef76a04d1c494a028d148bf6cd 100644 (file)
@@ -17,6 +17,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
 #include <limits.h>
@@ -94,7 +95,7 @@ static uint64_t last_relay_viewer_session_id;
  * Cleanup the daemon
  */
 static
-void cleanup(void)
+void cleanup_relayd_live(void)
 {
        DBG("Cleaning up");
 
@@ -325,40 +326,12 @@ error_unlock:
        return ret;
 }
 
-/*
- * Write to writable pipe used to notify a thread.
- */
-static
-int notify_thread_pipe(int wpipe)
+int relayd_live_stop(void)
 {
-       ssize_t ret;
-
-       ret = lttng_write(wpipe, "!", 1);
-       if (ret < 1) {
-               PERROR("write poll pipe");
-       }
-
-       return (int) ret;
-}
-
-/*
- * Stop all threads by closing the thread quit pipe.
- */
-static
-void stop_threads(void)
-{
-       int ret;
-
-       /* Stopping all threads */
-       DBG("Terminating all live threads");
-       ret = notify_thread_pipe(thread_quit_pipe[1]);
-       if (ret < 0) {
-               ERR("write error on thread quit pipe");
-       }
-
-       /* Dispatch thread */
+       /* Stop dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
        futex_nto1_wake(&viewer_conn_queue.futex);
+       return 0;
 }
 
 /*
@@ -515,6 +488,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -558,11 +536,12 @@ restart:
                                new_conn->sock = newsock;
 
                                /* Enqueue request for the dispatcher thread. */
-                               cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
+                               cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
+                                                &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex. Implicit memory barrier with
-                                * the exchange in cds_wfq_enqueue.
+                                * the exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&viewer_conn_queue.futex);
                        }
@@ -589,7 +568,9 @@ error_sock_control:
        }
        health_unregister(health_relayd);
        DBG("Live viewer listener thread cleanup complete");
-       stop_threads();
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
+       }
        return NULL;
 }
 
@@ -601,7 +582,7 @@ void *thread_dispatcher(void *data)
 {
        int err = -1;
        ssize_t ret;
-       struct cds_wfq_node *node;
+       struct cds_wfcq_node *node;
        struct relay_connection *conn = NULL;
 
        DBG("[thread] Live viewer relay dispatcher started");
@@ -624,7 +605,8 @@ void *thread_dispatcher(void *data)
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
+                       node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
+                                                        &viewer_conn_queue.tail);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the live-viewer "
                                                "relay command queue");
@@ -665,7 +647,9 @@ error_testpoint:
        }
        health_unregister(health_relayd);
        DBG("Live viewer dispatch thread dying");
-       stop_threads();
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
+       }
        return NULL;
 }
 
@@ -728,7 +712,12 @@ int viewer_connect(struct relay_connection *conn)
        reply.major = htobe32(reply.major);
        reply.minor = htobe32(reply.minor);
        if (conn->type == RELAY_VIEWER_COMMAND) {
-               reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
+               /*
+                * Increment outside of htobe64 macro, because can be used more than once
+                * within the macro, and thus the operation may be undefined.
+                */
+               last_relay_viewer_session_id++;
+               reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
        }
 
        health_code_update();
@@ -801,14 +790,9 @@ int viewer_list_sessions(struct relay_connection *conn)
        }
        health_code_update();
 
-       rcu_read_unlock();
        ret = 0;
-       goto end;
-
 end_unlock:
        rcu_read_unlock();
-
-end:
        return ret;
 }
 
@@ -1320,7 +1304,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
        pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        if (ret < 0) {
-               goto end;
+               goto end_unlock;
        } else if (ret == 1) {
                /*
                 * This means the viewer index data structure has been populated by the
@@ -1948,6 +1932,11 @@ restart:
 
                        health_code_update();
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -2034,7 +2023,9 @@ error_testpoint:
                ERR("Health error occurred in %s", __func__);
        }
        health_unregister(health_relayd);
-       stop_threads();
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
+       }
        rcu_unregister_thread();
        return NULL;
 }
@@ -2045,55 +2036,54 @@ error_testpoint:
  */
 static int create_conn_pipe(void)
 {
-       int ret;
-
-       ret = utils_create_pipe_cloexec(live_conn_pipe);
-
-       return ret;
+       return utils_create_pipe_cloexec(live_conn_pipe);
 }
 
-void live_stop_threads(void)
+int relayd_live_join(void)
 {
-       int ret;
+       int ret, retval = 0;
        void *status;
 
-       stop_threads();
-
        ret = pthread_join(live_listener_thread, &status);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_join live listener");
-               goto error;     /* join error, exit without cleanup */
+               retval = -1;
        }
 
        ret = pthread_join(live_worker_thread, &status);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_join live worker");
-               goto error;     /* join error, exit without cleanup */
+               retval = -1;
        }
 
        ret = pthread_join(live_dispatcher_thread, &status);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_join live dispatcher");
-               goto error;     /* join error, exit without cleanup */
+               retval = -1;
        }
 
-       cleanup();
+       cleanup_relayd_live();
 
-error:
-       return;
+       return retval;
 }
 
 /*
  * main
  */
-int live_start_threads(struct lttng_uri *uri,
+int relayd_live_create(struct lttng_uri *uri,
                struct relay_local_data *relay_ctx)
 {
-       int ret = 0;
+       int ret = 0, retval = 0;
        void *status;
        int is_root;
 
-       assert(uri);
+       if (!uri) {
+               retval = -1;
+               goto exit_init_data;
+       }
        live_uri = uri;
 
        /* Check if daemon is UID = 0 */
@@ -2102,74 +2092,86 @@ int live_start_threads(struct lttng_uri *uri,
        if (!is_root) {
                if (live_uri->port < 1024) {
                        ERR("Need to be root to use ports < 1024");
-                       ret = -1;
-                       goto exit;
+                       retval = -1;
+                       goto exit_init_data;
                }
        }
 
        /* Setup the thread apps communication pipe. */
-       if ((ret = create_conn_pipe()) < 0) {
-               goto exit;
+       if (create_conn_pipe()) {
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&viewer_conn_queue.queue);
+       cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
 
        /* Set up max poll set size */
-       lttng_poll_set_max_size();
+       if (lttng_poll_set_max_size()) {
+               retval = -1;
+               goto exit_init_data;
+       }
 
        /* Setup the dispatcher thread */
        ret = pthread_create(&live_dispatcher_thread, NULL,
                        thread_dispatcher, (void *) NULL);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create viewer dispatcher");
-               goto exit_dispatcher;
+               retval = -1;
+               goto exit_dispatcher_thread;
        }
 
        /* Setup the worker thread */
        ret = pthread_create(&live_worker_thread, NULL,
                        thread_worker, relay_ctx);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create viewer worker");
-               goto exit_worker;
+               retval = -1;
+               goto exit_worker_thread;
        }
 
        /* Setup the listener thread */
        ret = pthread_create(&live_listener_thread, NULL,
                        thread_listener, (void *) NULL);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create viewer listener");
-               goto exit_listener;
+               retval = -1;
+               goto exit_listener_thread;
        }
 
-       ret = 0;
-       goto end;
+       /*
+        * All OK, started all threads.
+        */
+       return retval;
 
-exit_listener:
-       ret = pthread_join(live_listener_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join live listener");
-               goto error;     /* join error, exit without cleanup */
-       }
+       /*
+        * Join on the live_listener_thread should anything be added after
+        * the live_listener thread's creation.
+        */
+
+exit_listener_thread:
 
-exit_worker:
        ret = pthread_join(live_worker_thread, &status);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_join live worker");
-               goto error;     /* join error, exit without cleanup */
+               retval = -1;
        }
+exit_worker_thread:
 
-exit_dispatcher:
        ret = pthread_join(live_dispatcher_thread, &status);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_join live dispatcher");
-               goto error;     /* join error, exit without cleanup */
+               retval = -1;
        }
+exit_dispatcher_thread:
 
-exit:
-       cleanup();
+exit_init_data:
+       cleanup_relayd_live();
 
-end:
-error:
-       return ret;
+       return retval;
 }
This page took 0.027691 seconds and 4 git commands to generate.