Fix: take index seq number into account for rotation pending check
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index a04303e4f5814a47a1b7bce3eec973179f6d3866..f111b931c7792c3b3f627977bb7ed62e6999c1a4 100644 (file)
@@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index,
        return relay_index_set_data(index, &index_data);
 }
 
+static bool session_streams_have_index(const struct relay_session *session)
+{
+       return session->minor >= 4 && !session->snapshot;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -1198,6 +1203,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
+       struct relay_stream_chunk_id stream_chunk_id = { 0 };
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1237,7 +1243,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-                       channel_name, tracefile_size, tracefile_count);
+               channel_name, tracefile_size, tracefile_count,
+               &stream_chunk_id);
        path_name = NULL;
        channel_name = NULL;
 
@@ -1739,18 +1746,27 @@ static
 int try_rotate_stream(struct relay_stream *stream)
 {
        int ret = 0;
+       uint64_t trace_seq;
 
        /* No rotation expected. */
        if (stream->rotate_at_seq_num == -1ULL) {
                goto end;
        }
 
-       if (stream->prev_seq < stream->rotate_at_seq_num ||
-                       stream->prev_seq == -1ULL) {
-               DBG("Stream %" PRIu64 " no yet ready for rotation",
-                               stream->stream_handle);
+       trace_seq = min(stream->prev_seq, stream->prev_index_seq);
+       if (stream->prev_seq == -1ULL || stream->prev_index_seq == -1ULL ||
+                       trace_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_seq,
+                               stream->prev_index_seq);
                goto end;
        } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+               /*
+                * prev_seq is checked here since indexes and rotation
+                * commands are serialized with respect to each other.
+                */
                DBG("Rotation after too much data has been written in tracefile "
                                "for stream %" PRIu64 ", need to truncate before "
                                "rotating", stream->stream_handle);
@@ -1760,7 +1776,20 @@ int try_rotate_stream(struct relay_stream *stream)
                        goto end;
                }
        } else {
-               /* stream->prev_seq == stream->rotate_at_seq_num */
+               if (trace_seq != stream->rotate_at_seq_num) {
+                       /*
+                        * Unexpected, protocol error/bug.
+                        * It could mean that we received a rotation position
+                        * that is in the past.
+                        */
+                       ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_seq,
+                               stream->prev_index_seq);
+                       ret = -1;
+                       goto end;
+               }
                DBG("Stream %" PRIu64 " ready for rotation",
                                stream->stream_handle);
                ret = do_rotate_stream(stream);
@@ -1929,6 +1958,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_stream *stream;
        ssize_t send_ret;
        int ret;
+       uint64_t stream_seq;
 
        DBG("Data pending command received");
 
@@ -1956,12 +1986,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&stream->lock);
 
-       DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
-                       " and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, msg.last_net_seq_num);
+       if (session_streams_have_index(session)) {
+               /*
+                * Ensure that both the index and stream data have been
+                * flushed up to the requested point.
+                */
+               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+       } else {
+               stream_seq = stream->prev_seq;
+       }
+       DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+                       ", prev_index_seq %" PRIu64
+                       ", and last_seq %" PRIu64, msg.stream_id,
+                       stream->prev_seq, stream->prev_index_seq,
+                       msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
-       if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+       if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
                /* Data has in fact been written and is NOT pending */
                ret = 0;
        } else {
@@ -2181,7 +2222,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                }
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
-                       if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+                       uint64_t stream_seq;
+
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_seq;
+                       }
+                       if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
                                DBG("Data is still in flight for stream %" PRIu64,
                                                stream->stream_handle);
@@ -2312,12 +2364,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
+               stream->prev_index_seq = index_info.net_seq_num;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
        } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
                ERR("relay_index_try_flush error %d", ret);
-               relay_index_put(index);
                ret = -1;
        }
 
@@ -2483,7 +2541,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       stream->chunk_id = stream_info.new_chunk_id;
+       assert(stream->current_chunk_id.is_set);
+       stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
                /*
@@ -2817,7 +2876,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        chunk_id = be64toh(msg.chunk_id);
 
-       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+       DBG("Evaluating rotate pending for session \"%s\" and  chunk id %" PRIu64,
+                       session->session_name, chunk_id);
 
        /*
         * Iterate over all the streams in the session and check if they are
@@ -2839,7 +2899,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        rotate_pending = true;
                        DBG("Stream %" PRIu64 " is still rotating",
                                        stream->stream_handle);
-               } else if (stream->chunk_id < chunk_id) {
+               } else if (stream->current_chunk_id.value < chunk_id) {
                        /*
                         * Stream closed on the consumer but still active on the
                         * relay.
@@ -3213,9 +3273,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                /* No flush. */
                ret = 0;
        } else {
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
 end:
@@ -3448,7 +3512,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        }
 
 
-       if (session->minor >= 4 && !session->snapshot) {
+       if (session_streams_have_index(session)) {
                ret = handle_index_data(stream, state->header.net_seq_num,
                                state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
                if (ret < 0) {
@@ -3469,6 +3533,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
+               stream->prev_index_seq = state->header.net_seq_num;
        }
 
        stream->prev_seq = state->header.net_seq_num;
This page took 0.026522 seconds and 4 git commands to generate.