X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=c84ebaac1c2c66b3d6f282aa6e1d69d161e16f23;hp=5a3bcb43fbc8aea671e4cc7b0a3c15451522181e;hb=298a25cae73bda8d8ad6fa986f7d0b822645559b;hpb=ce6877c939c5530e919011f7e34fe14f3d1fc090 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 5a3bcb43f..c84ebaac1 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index, return relay_index_set_data(index, &index_data); } +static bool session_streams_have_index(const struct relay_session *session) +{ + return session->minor >= 4 && !session->snapshot; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -1931,6 +1936,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_stream *stream; ssize_t send_ret; int ret; + uint64_t stream_seq; DBG("Data pending command received"); @@ -1958,12 +1964,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&stream->lock); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 - " and last_seq %" PRIu64, msg.stream_id, - stream->prev_seq, msg.last_net_seq_num); + if (session_streams_have_index(session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_seq; + } + DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64 + ", prev_index_seq %" PRIu64 + ", and last_seq %" PRIu64, msg.stream_id, + stream->prev_seq, stream->prev_index_seq, + msg.last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) { + if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -2183,7 +2200,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { - if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_seq; + } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, stream->stream_handle); @@ -2314,12 +2342,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -3217,9 +3251,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: @@ -3452,7 +3490,7 @@ static enum relay_connection_status relay_process_data_receive_payload( } - if (session->minor >= 4 && !session->snapshot) { + if (session_streams_have_index(session)) { ret = handle_index_data(stream, state->header.net_seq_num, state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size); if (ret < 0) { @@ -3473,6 +3511,7 @@ static enum relay_connection_status relay_process_data_receive_payload( if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; } stream->prev_seq = state->header.net_seq_num;