Fix: various compat poll/epoll issues
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 3a6bebaf7d6d3dc1ab5f7cf4fe76522d6f5c56fd..1ec1eeaa4b14be22fa0a8cde2d708279a50bcf4b 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
 #include <limits.h>
@@ -233,6 +234,11 @@ int set_option(int opt, const char *arg, const char *optname)
                break;
        case 'g':
                tracing_group_name = strdup(arg);
+               if (tracing_group_name == NULL) {
+                       ret = -errno;
+                       PERROR("strdup");
+                       goto end;
+               }
                tracing_group_name_override = 1;
                break;
        case 'h':
@@ -330,7 +336,7 @@ end:
 static
 int set_options(int argc, char **argv)
 {
-       int c, ret = 0, option_index = 0;
+       int c, ret = 0, option_index = 0, retval = 0;
        int orig_optopt = optopt, orig_optind = optind;
        char *default_address, *optstring;
        const char *config_path = NULL;
@@ -338,7 +344,7 @@ int set_options(int argc, char **argv)
        optstring = utils_generate_optstring(long_options,
                        sizeof(long_options) / sizeof(struct option));
        if (!optstring) {
-               ret = -ENOMEM;
+               retval = -ENOMEM;
                goto exit;
        }
 
@@ -347,7 +353,7 @@ int set_options(int argc, char **argv)
        while ((c = getopt_long(argc, argv, optstring, long_options,
                                        &option_index)) != -1) {
                if (c == '?') {
-                       ret = -EINVAL;
+                       retval = -EINVAL;
                        goto exit;
                } else if (c != 'f') {
                        continue;
@@ -364,8 +370,8 @@ int set_options(int argc, char **argv)
        if (ret) {
                if (ret > 0) {
                        ERR("Invalid configuration option at line %i", ret);
-                       ret = -1;
                }
+               retval = -1;
                goto exit;
        }
 
@@ -380,6 +386,7 @@ int set_options(int argc, char **argv)
 
                ret = set_option(c, optarg, long_options[option_index].name);
                if (ret < 0) {
+                       retval = -1;
                        goto exit;
                }
        }
@@ -391,6 +398,7 @@ int set_options(int argc, char **argv)
                        DEFAULT_NETWORK_CONTROL_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
+                       retval = -1;
                        goto exit;
                }
 
@@ -398,6 +406,7 @@ int set_options(int argc, char **argv)
                free(default_address);
                if (ret < 0) {
                        ERR("Invalid control URI specified");
+                       retval = -1;
                        goto exit;
                }
        }
@@ -407,6 +416,7 @@ int set_options(int argc, char **argv)
                        DEFAULT_NETWORK_DATA_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
+                       retval = -1;
                        goto exit;
                }
 
@@ -414,6 +424,7 @@ int set_options(int argc, char **argv)
                free(default_address);
                if (ret < 0) {
                        ERR("Invalid data URI specified");
+                       retval = -1;
                        goto exit;
                }
        }
@@ -423,6 +434,7 @@ int set_options(int argc, char **argv)
                        DEFAULT_NETWORK_VIEWER_PORT);
                if (ret < 0) {
                        PERROR("asprintf default viewer control address");
+                       retval = -1;
                        goto exit;
                }
 
@@ -430,23 +442,32 @@ int set_options(int argc, char **argv)
                free(default_address);
                if (ret < 0) {
                        ERR("Invalid viewer control URI specified");
+                       retval = -1;
                        goto exit;
                }
        }
 
 exit:
        free(optstring);
-       return ret;
+       return retval;
 }
 
 /*
  * Cleanup the daemon
  */
 static
-void cleanup(void)
+void relayd_cleanup(struct relay_local_data *relay_ctx)
 {
        DBG("Cleaning up");
 
+       if (viewer_streams_ht)
+               lttng_ht_destroy(viewer_streams_ht);
+       if (relay_streams_ht)
+               lttng_ht_destroy(relay_streams_ht);
+       if (relay_ctx && relay_ctx->sessions_ht)
+               lttng_ht_destroy(relay_ctx->sessions_ht);
+       free(relay_ctx);
+
        /* free the dynamically allocated opt_output_path */
        free(opt_output_path);
 
@@ -473,41 +494,55 @@ int notify_thread_pipe(int wpipe)
        ret = lttng_write(wpipe, "!", 1);
        if (ret < 1) {
                PERROR("write poll pipe");
+               goto end;
        }
-
+       ret = 0;
+end:
        return ret;
 }
 
-static void notify_health_quit_pipe(int *pipe)
+static
+int notify_health_quit_pipe(int *pipe)
 {
        ssize_t ret;
 
        ret = lttng_write(pipe[1], "4", 1);
        if (ret < 1) {
                PERROR("write relay health quit");
+               goto end;
        }
+       ret = 0;
+end:
+       return ret;
 }
 
 /*
- * Stop all threads by closing the thread quit pipe.
+ * Stop all relayd and relayd-live threads.
  */
-static
-void stop_threads(void)
+int lttng_relay_stop_threads(void)
 {
-       int ret;
+       int retval = 0;
 
        /* Stopping all threads */
        DBG("Terminating all threads");
-       ret = notify_thread_pipe(thread_quit_pipe[1]);
-       if (ret < 0) {
+       if (notify_thread_pipe(thread_quit_pipe[1])) {
                ERR("write error on thread quit pipe");
+               retval = -1;
        }
 
-       notify_health_quit_pipe(health_quit_pipe);
+       if (notify_health_quit_pipe(health_quit_pipe)) {
+               ERR("write error on health quit pipe");
+       }
 
        /* Dispatch thread */
        CMM_STORE_SHARED(dispatch_thread_exit, 1);
        futex_nto1_wake(&relay_conn_queue.futex);
+
+       if (relayd_live_stop()) {
+               ERR("Error stopping live threads");
+               retval = -1;
+       }
+       return retval;
 }
 
 /*
@@ -525,11 +560,15 @@ void sighandler(int sig)
                return;
        case SIGINT:
                DBG("SIGINT caught");
-               stop_threads();
+               if (lttng_relay_stop_threads()) {
+                       ERR("Error stopping threads");
+               }
                break;
        case SIGTERM:
                DBG("SIGTERM caught");
-               stop_threads();
+               if (lttng_relay_stop_threads()) {
+                       ERR("Error stopping threads");
+               }
                break;
        case SIGUSR1:
                CMM_STORE_SHARED(recv_child_signal, 1);
@@ -882,11 +921,12 @@ restart:
                                new_conn->sock = newsock;
 
                                /* Enqueue request for the dispatcher thread. */
-                               cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+                               cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+                                                &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex. Implicit memory barrier with
-                                * the exchange in cds_wfq_enqueue.
+                                * the exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
                        }
@@ -921,7 +961,7 @@ error_sock_control:
        }
        health_unregister(health_relayd);
        DBG("Relay listener thread cleanup complete");
-       stop_threads();
+       lttng_relay_stop_threads();
        return NULL;
 }
 
@@ -933,7 +973,7 @@ void *relay_thread_dispatcher(void *data)
 {
        int err = -1;
        ssize_t ret;
-       struct cds_wfq_node *node;
+       struct cds_wfcq_node *node;
        struct relay_connection *new_conn = NULL;
 
        DBG("[thread] Relay dispatcher started");
@@ -956,7 +996,8 @@ void *relay_thread_dispatcher(void *data)
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+                       node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+                                                        &relay_conn_queue.tail);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the relay command queue");
                                /* Continue thread execution */
@@ -996,7 +1037,7 @@ error_testpoint:
        }
        health_unregister(health_relayd);
        DBG("Dispatch thread dying");
-       stop_threads();
+       lttng_relay_stop_threads();
        return NULL;
 }
 
@@ -1343,7 +1384,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
        stream->close_flag = 1;
        session->stream_count--;
-       assert(session->stream_count >= 0);
 
        /* Check if we can close it or else the data will do it. */
        try_close_stream(session, stream);
@@ -1942,9 +1982,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1961,6 +2002,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
@@ -1991,6 +2033,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
@@ -2154,6 +2198,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2196,6 +2241,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2324,7 +2371,6 @@ int relay_process_data(struct relay_connection *conn)
                                stream->tracefile_size, stream->tracefile_count,
                                relayd_uid, relayd_gid, stream->fd,
                                &(stream->tracefile_count_current), &stream->fd);
-               stream->total_index_received = 0;
                pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
                if (ret < 0) {
                        ERR("Rotating stream output file");
@@ -2419,6 +2465,7 @@ void *relay_thread_worker(void *data)
        struct lttcomm_relayd_hdr recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+       struct relay_index *index;
 
        DBG("[thread] Relay worker started");
 
@@ -2607,6 +2654,7 @@ restart:
 
                                if (revents & LPOLLIN) {
                                        if (conn->type != RELAY_DATA) {
+                                               rcu_read_unlock();
                                                continue;
                                        }
 
@@ -2650,6 +2698,14 @@ error:
        }
        rcu_read_unlock();
 error_poll_create:
+       rcu_read_lock();
+       cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+                       index_n.node) {
+               health_code_update();
+               relay_index_delete(index);
+               relay_index_free_safe(index);
+       }
+       rcu_read_unlock();
        lttng_ht_destroy(indexes_ht);
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
@@ -2668,7 +2724,7 @@ error_testpoint:
        }
        health_unregister(health_relayd);
        rcu_unregister_thread();
-       stop_threads();
+       lttng_relay_stop_threads();
        return NULL;
 }
 
@@ -2690,31 +2746,35 @@ static int create_relay_conn_pipe(void)
  */
 int main(int argc, char **argv)
 {
-       int ret = 0;
+       int ret = 0, retval = 0;
        void *status;
-       struct relay_local_data *relay_ctx;
+       struct relay_local_data *relay_ctx = NULL;
 
        /* Parse arguments */
        progname = argv[0];
-       if ((ret = set_options(argc, argv)) < 0) {
-               goto exit;
+       if (set_options(argc, argv)) {
+               retval = -1;
+               goto exit_options;
        }
 
-       if ((ret = set_signal_handler()) < 0) {
-               goto exit;
+       if (set_signal_handler()) {
+               retval = -1;
+               goto exit_options;
        }
 
        /* Try to create directory if -o, --output is specified. */
        if (opt_output_path) {
                if (*opt_output_path != '/') {
                        ERR("Please specify an absolute path for -o, --output PATH");
-                       goto exit;
+                       retval = -1;
+                       goto exit_options;
                }
 
                ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
                if (ret < 0) {
                        ERR("Unable to create %s", opt_output_path);
-                       goto exit;
+                       retval = -1;
+                       goto exit_options;
                }
        }
 
@@ -2725,7 +2785,8 @@ int main(int argc, char **argv)
                ret = lttng_daemonize(&child_ppid, &recv_child_signal,
                        !opt_background);
                if (ret < 0) {
-                       goto exit;
+                       retval = -1;
+                       goto exit_options;
                }
 
                /*
@@ -2738,9 +2799,19 @@ int main(int argc, char **argv)
                }
        }
 
+
+       /* Initialize thread health monitoring */
+       health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+       if (!health_relayd) {
+               PERROR("health_app_create error");
+               retval = -1;
+               goto exit_health_app_create;
+       }
+
        /* Create thread quit pipe */
-       if ((ret = init_thread_quit_pipe()) < 0) {
-               goto error;
+       if (init_thread_quit_pipe()) {
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* We need those values for the file/dir creation. */
@@ -2751,18 +2822,19 @@ int main(int argc, char **argv)
        if (relayd_uid == 0) {
                if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
                        ERR("Need to be root to use ports < 1024");
-                       ret = -1;
-                       goto exit;
+                       retval = -1;
+                       goto exit_init_data;
                }
        }
 
        /* Setup the thread apps communication pipe. */
-       if ((ret = create_relay_conn_pipe()) < 0) {
-               goto exit;
+       if (create_relay_conn_pipe()) {
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&relay_conn_queue.queue);
+       cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
@@ -2774,134 +2846,139 @@ int main(int argc, char **argv)
        relay_ctx = zmalloc(sizeof(struct relay_local_data));
        if (!relay_ctx) {
                PERROR("relay_ctx");
-               goto exit;
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* tables of sessions indexed by session ID */
        relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!relay_ctx->sessions_ht) {
-               goto exit_relay_ctx_sessions;
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* tables of streams indexed by stream ID */
        relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!relay_streams_ht) {
-               goto exit_relay_ctx_streams;
+               retval = -1;
+               goto exit_init_data;
        }
 
        /* tables of streams indexed by stream ID */
        viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!viewer_streams_ht) {
-               goto exit_relay_ctx_viewer_streams;
-       }
-
-       /* Initialize thread health monitoring */
-       health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
-       if (!health_relayd) {
-               PERROR("health_app_create error");
-               goto exit_health_app_create;
+               retval = -1;
+               goto exit_init_data;
        }
 
        ret = utils_create_pipe(health_quit_pipe);
-       if (ret < 0) {
-               goto error_health_pipe;
+       if (ret) {
+               retval = -1;
+               goto exit_health_quit_pipe;
        }
 
        /* Create thread to manage the client socket */
        ret = pthread_create(&health_thread, NULL,
                        thread_manage_health, (void *) NULL);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create health");
-               goto health_error;
+               retval = -1;
+               goto exit_health_thread;
        }
 
        /* Setup the dispatcher thread */
        ret = pthread_create(&dispatcher_thread, NULL,
                        relay_thread_dispatcher, (void *) NULL);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create dispatcher");
-               goto exit_dispatcher;
+               retval = -1;
+               goto exit_dispatcher_thread;
        }
 
        /* Setup the worker thread */
        ret = pthread_create(&worker_thread, NULL,
                        relay_thread_worker, (void *) relay_ctx);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create worker");
-               goto exit_worker;
+               retval = -1;
+               goto exit_worker_thread;
        }
 
        /* Setup the listener thread */
        ret = pthread_create(&listener_thread, NULL,
                        relay_thread_listener, (void *) NULL);
-       if (ret != 0) {
+       if (ret) {
+               errno = ret;
                PERROR("pthread_create listener");
-               goto exit_listener;
+               retval = -1;
+               goto exit_listener_thread;
        }
 
-       ret = live_start_threads(live_uri, relay_ctx);
-       if (ret != 0) {
+       ret = relayd_live_create(live_uri, relay_ctx);
+       if (ret) {
                ERR("Starting live viewer threads");
+               retval = -1;
                goto exit_live;
        }
 
+       /*
+        * This is where we start awaiting program completion (e.g. through
+        * signal that asks threads to teardown).
+        */
+
+       ret = relayd_live_join();
+       if (ret) {
+               retval = -1;
+       }
 exit_live:
+
        ret = pthread_join(listener_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join");
-               goto error;     /* join error, exit without cleanup */
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_join listener_thread");
+               retval = -1;
        }
 
-exit_listener:
+exit_listener_thread:
        ret = pthread_join(worker_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join");
-               goto error;     /* join error, exit without cleanup */
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_join worker_thread");
+               retval = -1;
        }
 
-exit_worker:
+exit_worker_thread:
        ret = pthread_join(dispatcher_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join");
-               goto error;     /* join error, exit without cleanup */
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_join dispatcher_thread");
+               retval = -1;
        }
+exit_dispatcher_thread:
 
-exit_dispatcher:
        ret = pthread_join(health_thread, &status);
-       if (ret != 0) {
-               PERROR("pthread_join health thread");
-               goto error;     /* join error, exit without cleanup */
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_join health_thread");
+               retval = -1;
        }
+exit_health_thread:
 
-       /*
-        * Stop live threads only after joining other threads.
-        */
-       live_stop_threads();
-
-health_error:
        utils_close_pipe(health_quit_pipe);
+exit_health_quit_pipe:
 
-error_health_pipe:
+exit_init_data:
        health_app_destroy(health_relayd);
-
 exit_health_app_create:
-       lttng_ht_destroy(viewer_streams_ht);
-
-exit_relay_ctx_viewer_streams:
-       lttng_ht_destroy(relay_streams_ht);
-
-exit_relay_ctx_streams:
-       lttng_ht_destroy(relay_ctx->sessions_ht);
-
-exit_relay_ctx_sessions:
-       free(relay_ctx);
+exit_options:
+       relayd_cleanup(relay_ctx);
 
-exit:
-       cleanup();
-       if (!ret) {
+       if (!retval) {
                exit(EXIT_SUCCESS);
+       } else {
+               exit(EXIT_FAILURE);
        }
-
-error:
-       exit(EXIT_FAILURE);
 }
This page took 0.032245 seconds and 4 git commands to generate.