Fix: split index and data file rotation logic
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index f111b931c7792c3b3f627977bb7ed62e6999c1a4..1e2e9050fef2d6d77ca78e9767a2fea759ba99be 100644 (file)
@@ -1539,7 +1539,8 @@ end:
  * Return 0 on success, -1 on error.
  */
 static
-int create_rotate_index_file(struct relay_stream *stream)
+int create_rotate_index_file(struct relay_stream *stream,
+               const char *stream_path)
 {
        int ret;
        uint32_t major, minor;
@@ -1551,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream)
        }
        major = stream->trace->session->major;
        minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
+       stream->index_file = lttng_index_file_create(stream_path,
                        stream->channel_name,
                        -1, -1, stream->tracefile_size,
                        tracefile_array_get_file_index_head(stream->tfa),
@@ -1569,10 +1570,12 @@ end:
 }
 
 static
-int do_rotate_stream(struct relay_stream *stream)
+int do_rotate_stream_data(struct relay_stream *stream)
 {
        int ret;
 
+       DBG("Rotating stream %" PRIu64 " data file",
+                       stream->stream_handle);
        /* Perform the stream rotation. */
        ret = utils_rotate_stream_file(stream->path_name,
                        stream->channel_name, stream->tracefile_size,
@@ -1584,19 +1587,17 @@ int do_rotate_stream(struct relay_stream *stream)
                goto end;
        }
        stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = create_rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end;
-               }
-       }
-
-       stream->rotate_at_seq_num = -1ULL;
        stream->pos_after_last_complete_data_index = 0;
+       stream->data_rotated = true;
 
+       if (stream->data_rotated && stream->index_rotated) {
+               /* Rotation completed; reset its state. */
+               DBG("Rotation completed for stream %" PRIu64,
+                               stream->stream_handle);
+               stream->rotate_at_seq_num = -1ULL;
+               stream->data_rotated = false;
+               stream->index_rotated = false;
+       }
 end:
        return ret;
 }
@@ -1608,9 +1609,7 @@ end:
  * connections are separate, the indexes as well as the commands arrive from
  * the control connection and we have no control over the order so we could be
  * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives. We don't need
- * to update the index because its order is guaranteed with the rotation
- * command message.
+ * before the rotation command on the control connection arrives.
  */
 static
 int rotate_truncate_stream(struct relay_stream *stream)
@@ -1709,12 +1708,6 @@ int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
-       ret = create_rotate_index_file(stream);
-       if (ret < 0) {
-               ERR("Rotate stream index file");
-               goto end;
-       }
-
        /*
         * Update the offset and FD of all the eventual indexes created by the
         * data connection before the rotation command arrived.
@@ -1737,34 +1730,98 @@ end:
 }
 
 /*
- * Check if a stream should perform a rotation (for session rotation).
+ * Check if a stream's index file should be rotated (for session rotation).
  * Must be called with the stream lock held.
  *
  * Return 0 on success, a negative value on error.
  */
 static
-int try_rotate_stream(struct relay_stream *stream)
+int try_rotate_stream_index(struct relay_stream *stream)
 {
        int ret = 0;
-       uint64_t trace_seq;
 
-       /* No rotation expected. */
        if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->index_rotated) {
+               /* Rotation of the index has already occurred. */
                goto end;
        }
 
-       trace_seq = min(stream->prev_seq, stream->prev_index_seq);
-       if (stream->prev_seq == -1ULL || stream->prev_index_seq == -1ULL ||
-                       trace_seq < stream->rotate_at_seq_num) {
-               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+       if (stream->prev_index_seq == -1ULL ||
+                       stream->prev_index_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
                                stream->stream_handle,
                                stream->rotate_at_seq_num,
-                               stream->prev_seq,
                                stream->prev_index_seq);
                goto end;
-       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+       } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
                /*
-                * prev_seq is checked here since indexes and rotation
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq,
+                               stream->prev_index_seq);
+               ret = -1;
+               goto end;
+       } else {
+               DBG("Rotating stream %" PRIu64 " index file",
+                               stream->stream_handle);
+               ret = create_rotate_index_file(stream, stream->path_name);
+               stream->index_rotated = true;
+
+               if (stream->data_rotated && stream->index_rotated) {
+                       /* Rotation completed; reset its state. */
+                       DBG("Rotation completed for stream %" PRIu64,
+                                       stream->stream_handle);
+                       stream->rotate_at_seq_num = -1ULL;
+                       stream->data_rotated = false;
+                       stream->index_rotated = false;
+               }
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int try_rotate_stream_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       if (stream->rotate_at_seq_num == -1ULL) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->data_rotated) {
+               /* Rotation of the data file has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_data_seq == -1ULL ||
+                       stream->prev_data_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq);
+               goto end;
+       } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
+               /*
+                * prev_data_seq is checked here since indexes and rotation
                 * commands are serialized with respect to each other.
                 */
                DBG("Rotation after too much data has been written in tracefile "
@@ -1775,24 +1832,20 @@ int try_rotate_stream(struct relay_stream *stream)
                        ERR("Failed to truncate stream");
                        goto end;
                }
-       } else {
-               if (trace_seq != stream->rotate_at_seq_num) {
-                       /*
-                        * Unexpected, protocol error/bug.
-                        * It could mean that we received a rotation position
-                        * that is in the past.
-                        */
-                       ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+       } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
+               /*
+                * Unexpected, protocol error/bug.
+                * It could mean that we received a rotation position
+                * that is in the past.
+                */
+               ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
                                stream->stream_handle,
                                stream->rotate_at_seq_num,
-                               stream->prev_seq,
-                               stream->prev_index_seq);
-                       ret = -1;
-                       goto end;
-               }
-               DBG("Stream %" PRIu64 " ready for rotation",
-                               stream->stream_handle);
-               ret = do_rotate_stream(stream);
+                               stream->prev_data_seq);
+               ret = -1;
+               goto end;
+       } else {
+               ret = do_rotate_stream_data(stream);
        }
 
 end:
@@ -1863,7 +1916,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
-       ret = try_rotate_stream(metadata_stream);
+       ret = try_rotate_stream_data(metadata_stream);
        if (ret < 0) {
                goto end_put;
        }
@@ -1991,14 +2044,14 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                 * Ensure that both the index and stream data have been
                 * flushed up to the requested point.
                 */
-               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
        } else {
-               stream_seq = stream->prev_seq;
+               stream_seq = stream->prev_data_seq;
        }
-       DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+       DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
                        ", prev_index_seq %" PRIu64
                        ", and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, stream->prev_index_seq,
+                       stream->prev_data_seq, stream->prev_index_seq,
                        msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
@@ -2229,9 +2282,9 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                                 * Ensure that both the index and stream data have been
                                 * flushed up to the requested point.
                                 */
-                               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+                               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
                        } else {
-                               stream_seq = stream->prev_seq;
+                               stream_seq = stream->prev_data_seq;
                        }
                        if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
@@ -2365,6 +2418,11 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info.net_seq_num;
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_put;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2526,7 +2584,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
         * Update the trace path (just the folder, the stream name does not
         * change).
         */
-       free(stream->path_name);
+       free(stream->prev_path_name);
+       stream->prev_path_name = stream->path_name;
        stream->path_name = create_output_path(new_path_view.data);
        if (!stream->path_name) {
                ERR("Failed to create a new output path");
@@ -2545,18 +2604,28 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
        stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
+               /*
+                * Metadata streams have no index; consider its rotation
+                * complete.
+                */
+               stream->index_rotated = true;
                /*
                 * The metadata stream is sent only over the control connection
                 * so we know we have all the data to perform the stream
                 * rotation.
                 */
-               ret = do_rotate_stream(stream);
+               ret = do_rotate_stream_data(stream);
        } else {
                stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
-               ret = try_rotate_stream(stream);
-       }
-       if (ret < 0) {
-               goto end_stream_unlock;
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
 end_stream_unlock:
@@ -2650,6 +2719,7 @@ static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path);
        ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
        if (ret < 0) {
                ERR("relay creating output directory");
@@ -2773,6 +2843,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"",
+                       old_path_view.data, new_path_view.data);
        complete_old_path = create_output_path(old_path_view.data);
        if (!complete_old_path) {
                ERR("Failed to build old output path in rotate_rename command");
@@ -2786,6 +2858,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+       DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"",
+                       complete_old_path, complete_new_path);
 
        ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
                        -1, -1);
@@ -3245,7 +3319,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        }
 
        if (rotate_index || !stream->index_file) {
-               ret = create_rotate_index_file(stream);
+               const char *stream_path;
+
+               /*
+                * The data connection creates the stream's first index file.
+                *
+                * This can happen _after_ a ROTATE_STREAM command. In
+                * other words, the data of the first packet of this stream
+                * can be received after a ROTATE_STREAM command.
+                *
+                * The ROTATE_STREAM command changes the stream's path_name
+                * to point to the "next" chunk. If a rotation is pending for
+                * this stream, as indicated by "rotate_at_seq_num != -1ULL",
+                * it means that we are still receiving data that belongs in the
+                * stream's former path.
+                *
+                * In this very specific case, we must ensure that the index
+                * file is created in the streams's former path,
+                * "prev_path_name".
+                *
+                * All other rotations beyond the first one are not affected
+                * by this problem since the actual rotation operation creates
+                * the new chunk's index file.
+                */
+               stream_path = stream->rotate_at_seq_num == -1ULL ?
+                               stream->path_name:
+                               stream->prev_path_name;
+
+               ret = create_rotate_index_file(stream, stream_path);
                if (ret < 0) {
                        ERR("Failed to rotate index");
                        /* Put self-ref for this index due to error. */
@@ -3527,16 +3628,20 @@ static enum relay_connection_status relay_process_data_receive_payload(
        stream->tracefile_size_current += state->header.data_size +
                        state->header.padding_size;
 
-       if (stream->prev_seq == -1ULL) {
+       if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
                stream->prev_index_seq = state->header.net_seq_num;
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end_stream_unlock;
+               }
        }
 
-       stream->prev_seq = state->header.net_seq_num;
+       stream->prev_data_seq = state->header.net_seq_num;
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
@@ -3546,7 +3651,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        connection_reset_protocol_state(conn);
        state = NULL;
 
-       ret = try_rotate_stream(stream);
+       ret = try_rotate_stream_data(stream);
        if (ret < 0) {
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
This page took 0.02832 seconds and 4 git commands to generate.