Fix: make sure no index is in flight before using inactivity beacons
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 3a6bebaf7d6d3dc1ab5f7cf4fe76522d6f5c56fd..fa5a7db86c393640c15fe9e8b3bf5f652da98111 100644 (file)
@@ -882,11 +882,12 @@ restart:
                                new_conn->sock = newsock;
 
                                /* Enqueue request for the dispatcher thread. */
-                               cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+                               cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+                                                &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex. Implicit memory barrier with
-                                * the exchange in cds_wfq_enqueue.
+                                * the exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
                        }
@@ -933,7 +934,7 @@ void *relay_thread_dispatcher(void *data)
 {
        int err = -1;
        ssize_t ret;
-       struct cds_wfq_node *node;
+       struct cds_wfcq_node *node;
        struct relay_connection *new_conn = NULL;
 
        DBG("[thread] Relay dispatcher started");
@@ -956,7 +957,8 @@ void *relay_thread_dispatcher(void *data)
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+                       node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+                                                        &relay_conn_queue.tail);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the relay command queue");
                                /* Continue thread execution */
@@ -1942,9 +1944,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1961,6 +1964,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
@@ -1991,6 +1995,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
@@ -2154,6 +2160,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2196,6 +2203,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2762,7 +2771,7 @@ int main(int argc, char **argv)
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&relay_conn_queue.queue);
+       cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
This page took 0.024051 seconds and 4 git commands to generate.