Accept uid and gid parameters in utils_mkdir()/utils_mkdir_recursive()
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index beb67b2a5654ed54cc5ef36a75408c2d6b667364..562a7fa522017ae4126d2d7b76847fbed9f81dc6 100644 (file)
@@ -326,42 +326,12 @@ error_unlock:
        return ret;
 }
 
-/*
- * Write to writable pipe used to notify a thread.
- */
-static
-int notify_thread_pipe(int wpipe)
-{
-       ssize_t ret;
-
-       ret = lttng_write(wpipe, "!", 1);
-       if (ret < 1) {
-               PERROR("write poll pipe");
-       }
-
-       return (int) ret;
-}
-
-/*
- * Stop all threads by closing the thread quit pipe.
- */
-static
-int stop_threads(void)
+int relayd_live_stop(void)
 {
-       int ret, retval = 0;
-
-       /* Stopping all threads */
-       DBG("Terminating all live threads");
-       ret = notify_thread_pipe(thread_quit_pipe[1]);
-       if (ret < 0) {
-               ERR("write error on thread quit pipe");
-               retval = -1;
-       }
-
-       /* Dispatch thread */
+       /* Stop dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
        futex_nto1_wake(&viewer_conn_queue.futex);
-       return retval;
+       return 0;
 }
 
 /*
@@ -518,6 +488,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -593,8 +568,8 @@ error_sock_control:
        }
        health_unregister(health_relayd);
        DBG("Live viewer listener thread cleanup complete");
-       if (stop_threads()) {
-               ERR("Error stopping live threads");
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
        }
        return NULL;
 }
@@ -672,8 +647,8 @@ error_testpoint:
        }
        health_unregister(health_relayd);
        DBG("Live viewer dispatch thread dying");
-       if (stop_threads()) {
-               ERR("Error stopping live threads");
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
        }
        return NULL;
 }
@@ -737,7 +712,12 @@ int viewer_connect(struct relay_connection *conn)
        reply.major = htobe32(reply.major);
        reply.minor = htobe32(reply.minor);
        if (conn->type == RELAY_VIEWER_COMMAND) {
-               reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
+               /*
+                * Increment outside of htobe64 macro, because can be used more than once
+                * within the macro, and thus the operation may be undefined.
+                */
+               last_relay_viewer_session_id++;
+               reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
        }
 
        health_code_update();
@@ -1883,13 +1863,13 @@ void *thread_worker(void *data)
 {
        int ret, err = -1;
        uint32_t nb_fd;
-       struct relay_connection *conn;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
        struct lttng_ht_iter iter;
        struct lttng_viewer_cmd recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+       struct relay_connection *destroy_conn;
 
        DBG("[thread] Live viewer relay worker started");
 
@@ -1952,6 +1932,11 @@ restart:
 
                        health_code_update();
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -1965,6 +1950,8 @@ restart:
                                        ERR("Relay live pipe error");
                                        goto error;
                                } else if (revents & LPOLLIN) {
+                                       struct relay_connection *conn;
+
                                        ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
                                        if (ret < 0) {
                                                goto error;
@@ -1980,6 +1967,8 @@ restart:
                                        DBG("Connection socket %d added", conn->sock->fd);
                                }
                        } else {
+                               struct relay_connection *conn;
+
                                rcu_read_lock();
                                conn = connection_find_by_sock(relay_connections_ht, pollfd);
                                /* If not found, there is a synchronization issue. */
@@ -2017,10 +2006,11 @@ error:
 
        /* Cleanup reamaining connection object. */
        rcu_read_lock();
-       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+                       destroy_conn,
                        sock_n.node) {
                health_code_update();
-               destroy_connection(relay_connections_ht, conn);
+               destroy_connection(relay_connections_ht, destroy_conn);
        }
        rcu_read_unlock();
 error_poll_create:
@@ -2038,8 +2028,8 @@ error_testpoint:
                ERR("Health error occurred in %s", __func__);
        }
        health_unregister(health_relayd);
-       if (stop_threads()) {
-               ERR("Error stopping live threads");
+       if (lttng_relay_stop_threads()) {
+               ERR("Error stopping threads");
        }
        rcu_unregister_thread();
        return NULL;
@@ -2054,11 +2044,6 @@ static int create_conn_pipe(void)
        return utils_create_pipe_cloexec(live_conn_pipe);
 }
 
-int relayd_live_stop(void)
-{
-       return stop_threads();
-}
-
 int relayd_live_join(void)
 {
        int ret, retval = 0;
@@ -2127,7 +2112,10 @@ int relayd_live_create(struct lttng_uri *uri,
        cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
 
        /* Set up max poll set size */
-       lttng_poll_set_max_size();
+       if (lttng_poll_set_max_size()) {
+               retval = -1;
+               goto exit_init_data;
+       }
 
        /* Setup the dispatcher thread */
        ret = pthread_create(&live_dispatcher_thread, NULL,
@@ -2164,13 +2152,11 @@ int relayd_live_create(struct lttng_uri *uri,
         */
        return retval;
 
+       /*
+        * Join on the live_listener_thread should anything be added after
+        * the live_listener thread's creation.
+        */
 
-       ret = pthread_join(live_listener_thread, &status);
-       if (ret) {
-               errno = ret;
-               PERROR("pthread_join live listener");
-               retval = -1;
-       }
 exit_listener_thread:
 
        ret = pthread_join(live_worker_thread, &status);
This page took 0.02654 seconds and 4 git commands to generate.