X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=fa5a7db86c393640c15fe9e8b3bf5f652da98111;hp=3a6bebaf7d6d3dc1ab5f7cf4fe76522d6f5c56fd;hb=6e7241fe92e7b5db872d164c3e1a0f75ce5a1463;hpb=528f2ffaebbc88b3fd541fa404b567b878aa5255 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 3a6bebaf7..fa5a7db86 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -882,11 +882,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); } @@ -933,7 +934,7 @@ void *relay_thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *new_conn = NULL; DBG("[thread] Relay dispatcher started"); @@ -956,7 +957,8 @@ void *relay_thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head, + &relay_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the relay command queue"); /* Continue thread execution */ @@ -1942,9 +1944,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1961,6 +1964,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); @@ -1991,6 +1995,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2154,6 +2160,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2196,6 +2203,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: @@ -2762,7 +2771,7 @@ int main(int argc, char **argv) } /* Init relay command queue. */ - cds_wfq_init(&relay_conn_queue.queue); + cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); /* Set up max poll set size */ lttng_poll_set_max_size();