Fix: take index sequence number into account for data pending check
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 92f9a5600ac28b7280dcea9f1041b7725b13b748..c84ebaac1c2c66b3d6f282aa6e1d69d161e16f23 100644 (file)
@@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index,
        return relay_index_set_data(index, &index_data);
 }
 
+static bool session_streams_have_index(const struct relay_session *session)
+{
+       return session->minor >= 4 && !session->snapshot;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -1931,6 +1936,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_stream *stream;
        ssize_t send_ret;
        int ret;
+       uint64_t stream_seq;
 
        DBG("Data pending command received");
 
@@ -1958,12 +1964,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&stream->lock);
 
-       DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
-                       " and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, msg.last_net_seq_num);
+       if (session_streams_have_index(session)) {
+               /*
+                * Ensure that both the index and stream data have been
+                * flushed up to the requested point.
+                */
+               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+       } else {
+               stream_seq = stream->prev_seq;
+       }
+       DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+                       ", prev_index_seq %" PRIu64
+                       ", and last_seq %" PRIu64, msg.stream_id,
+                       stream->prev_seq, stream->prev_index_seq,
+                       msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
-       if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+       if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
                /* Data has in fact been written and is NOT pending */
                ret = 0;
        } else {
@@ -2183,7 +2200,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                }
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
-                       if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+                       uint64_t stream_seq;
+
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_seq;
+                       }
+                       if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
                                DBG("Data is still in flight for stream %" PRIu64,
                                                stream->stream_handle);
@@ -3462,7 +3490,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        }
 
 
-       if (session->minor >= 4 && !session->snapshot) {
+       if (session_streams_have_index(session)) {
                ret = handle_index_data(stream, state->header.net_seq_num,
                                state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
                if (ret < 0) {
This page took 0.024596 seconds and 4 git commands to generate.