X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=75f2d91931749dd78e72121a7130f0f78a324478;hb=e36fbffbebacb3d15f2b5b190f086bdde65766b1;hp=aeb061330dbc83d911df33fd1e428fb75c22c165;hpb=c510c1bb5ac7978a0967e52857c615924f7a0caa;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index aeb061330..75f2d9193 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1065,6 +1065,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1801,9 +1802,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1820,9 +1822,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -1847,6 +1853,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2010,6 +2018,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2052,6 +2061,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: