Cleanup: spaghetti function return path
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 93b08fcaa4a32cddfb4deef66682e8a1b8ae6135..f7c24b896ee24e54db183928992b9c4f88d494a1 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <getopt.h>
 #include <grp.h>
 #include <limits.h>
@@ -233,6 +234,11 @@ int set_option(int opt, const char *arg, const char *optname)
                break;
        case 'g':
                tracing_group_name = strdup(arg);
+               if (tracing_group_name == NULL) {
+                       ret = -errno;
+                       PERROR("strdup");
+                       goto end;
+               }
                tracing_group_name_override = 1;
                break;
        case 'h':
@@ -251,7 +257,10 @@ int set_option(int opt, const char *arg, const char *optname)
                if (arg) {
                        lttng_opt_verbose = config_parse_value(arg);
                } else {
-                       lttng_opt_verbose += 1;
+                       /* Only 3 level of verbosity (-vvv). */
+                       if (lttng_opt_verbose < 3) {
+                               lttng_opt_verbose += 1;
+                       }
                }
                break;
        default:
@@ -879,11 +888,12 @@ restart:
                                new_conn->sock = newsock;
 
                                /* Enqueue request for the dispatcher thread. */
-                               cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+                               cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+                                                &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex. Implicit memory barrier with
-                                * the exchange in cds_wfq_enqueue.
+                                * the exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
                        }
@@ -930,7 +940,7 @@ void *relay_thread_dispatcher(void *data)
 {
        int err = -1;
        ssize_t ret;
-       struct cds_wfq_node *node;
+       struct cds_wfcq_node *node;
        struct relay_connection *new_conn = NULL;
 
        DBG("[thread] Relay dispatcher started");
@@ -953,7 +963,8 @@ void *relay_thread_dispatcher(void *data)
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+                       node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+                                                        &relay_conn_queue.tail);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the relay command queue");
                                /* Continue thread execution */
@@ -1202,6 +1213,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->session_id = session->id;
        stream->index_fd = -1;
        stream->read_index_fd = -1;
+       stream->ctf_stream_id = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
@@ -1339,7 +1351,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
        stream->close_flag = 1;
        session->stream_count--;
-       assert(session->stream_count >= 0);
 
        /* Check if we can close it or else the data will do it. */
        try_close_stream(session, stream);
@@ -1938,9 +1949,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1957,9 +1969,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = be64toh(index_info.stream_id);
+       }
 
        if (index_created) {
                /*
@@ -1984,6 +2000,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
@@ -2147,6 +2165,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2189,6 +2208,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2412,6 +2433,7 @@ void *relay_thread_worker(void *data)
        struct lttcomm_relayd_hdr recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+       struct relay_index *index;
 
        DBG("[thread] Relay worker started");
 
@@ -2600,6 +2622,7 @@ restart:
 
                                if (revents & LPOLLIN) {
                                        if (conn->type != RELAY_DATA) {
+                                               rcu_read_unlock();
                                                continue;
                                        }
 
@@ -2643,6 +2666,14 @@ error:
        }
        rcu_read_unlock();
 error_poll_create:
+       rcu_read_lock();
+       cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+                       index_n.node) {
+               health_code_update();
+               relay_index_delete(index);
+               relay_index_free_safe(index);
+       }
+       rcu_read_unlock();
        lttng_ht_destroy(indexes_ht);
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
@@ -2755,7 +2786,7 @@ int main(int argc, char **argv)
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&relay_conn_queue.queue);
+       cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
This page took 0.025589 seconds and 4 git commands to generate.