Fix: make sure no index is in flight before using inactivity beacons
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 89454b1e34ea30955a50709758550bc37b7bad24..75f2d91931749dd78e72121a7130f0f78a324478 100644 (file)
@@ -1065,6 +1065,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->session_id = session->id;
        stream->index_fd = -1;
        stream->read_index_fd = -1;
+       stream->ctf_stream_id = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
@@ -1204,18 +1205,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        session->stream_count--;
        assert(session->stream_count >= 0);
 
-       /*
-        * Remove the stream from the connection recv list since we are about to
-        * flag it invalid and thus might be freed. This has to be done here since
-        * only the control thread can do actions on that list.
-        *
-        * Note that this stream might NOT be in the list but we have to try to
-        * remove it here else this can race with the stream destruction freeing
-        * the object and the connection destroy doing a use after free when
-        * deleting the remaining nodes in this list.
-        */
-       cds_list_del(&stream->recv_list);
-
        /* Check if we can close it or else the data will do it. */
        try_close_stream(session, stream);
 
@@ -1813,9 +1802,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1832,9 +1822,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = be64toh(index_info.stream_id);
+       }
 
        if (index_created) {
                /*
@@ -1859,6 +1853,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
@@ -2022,6 +2018,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2064,6 +2061,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
This page took 0.02502 seconds and 4 git commands to generate.