X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=a3b8016db46d157dc44f478553011f489b60c591;hp=d82a3412d7b8578be8f5b39abf0a9fd949ff07bc;hb=8bdee6e2bac74a577147046126628ff3b1b34930;hpb=cd2ef1ef1d54ced9e4d0d03b865bb7fc6a905f80 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index d82a3412d..a3b8016db 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -250,7 +251,10 @@ int set_option(int opt, const char *arg, const char *optname) if (arg) { lttng_opt_verbose = config_parse_value(arg); } else { - lttng_opt_verbose += 1; + /* Only 3 level of verbosity (-vvv). */ + if (lttng_opt_verbose < 3) { + lttng_opt_verbose += 1; + } } break; default: @@ -878,11 +882,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); } @@ -929,7 +934,7 @@ void *relay_thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *new_conn = NULL; DBG("[thread] Relay dispatcher started"); @@ -952,7 +957,8 @@ void *relay_thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head, + &relay_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the relay command queue"); /* Continue thread execution */ @@ -1201,6 +1207,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1340,18 +1347,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, session->stream_count--; assert(session->stream_count >= 0); - /* - * Remove the stream from the connection recv list since we are about to - * flag it invalid and thus might be freed. This has to be done here since - * only the control thread can do actions on that list. - * - * Note that this stream might NOT be in the list but we have to try to - * remove it here else this can race with the stream destruction freeing - * the object and the connection destroy doing a use after free when - * deleting the remaining nodes in this list. - */ - cds_list_del(&stream->recv_list); - /* Check if we can close it or else the data will do it. */ try_close_stream(session, stream); @@ -1971,6 +1966,9 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -2766,7 +2764,7 @@ int main(int argc, char **argv) } /* Init relay command queue. */ - cds_wfq_init(&relay_conn_queue.queue); + cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); /* Set up max poll set size */ lttng_poll_set_max_size();