Fix: take index sequence number into account for data pending check
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 12 Oct 2018 22:22:35 +0000 (18:22 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 12 Oct 2018 23:56:28 +0000 (19:56 -0400)
The data pending checks are only performed on the sequence number of
the received data. However, it is expected that the index of the
stream (when applicable) has been written to disk by the time this
check returns that no data is pending.

This patch ensures that the minimum between the data and index
sequence numbers are used to perform this check.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c

index 92f9a5600ac28b7280dcea9f1041b7725b13b748..c84ebaac1c2c66b3d6f282aa6e1d69d161e16f23 100644 (file)
@@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index,
        return relay_index_set_data(index, &index_data);
 }
 
+static bool session_streams_have_index(const struct relay_session *session)
+{
+       return session->minor >= 4 && !session->snapshot;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -1931,6 +1936,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
        struct relay_stream *stream;
        ssize_t send_ret;
        int ret;
+       uint64_t stream_seq;
 
        DBG("Data pending command received");
 
@@ -1958,12 +1964,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&stream->lock);
 
-       DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
-                       " and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, msg.last_net_seq_num);
+       if (session_streams_have_index(session)) {
+               /*
+                * Ensure that both the index and stream data have been
+                * flushed up to the requested point.
+                */
+               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+       } else {
+               stream_seq = stream->prev_seq;
+       }
+       DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+                       ", prev_index_seq %" PRIu64
+                       ", and last_seq %" PRIu64, msg.stream_id,
+                       stream->prev_seq, stream->prev_index_seq,
+                       msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
-       if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
+       if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
                /* Data has in fact been written and is NOT pending */
                ret = 0;
        } else {
@@ -2183,7 +2200,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                }
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
-                       if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+                       uint64_t stream_seq;
+
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_seq;
+                       }
+                       if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
                                DBG("Data is still in flight for stream %" PRIu64,
                                                stream->stream_handle);
@@ -3462,7 +3490,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        }
 
 
-       if (session->minor >= 4 && !session->snapshot) {
+       if (session_streams_have_index(session)) {
                ret = handle_index_data(stream, state->header.net_seq_num,
                                state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
                if (ret < 0) {
This page took 0.027202 seconds and 4 git commands to generate.