+
+ ASSERT_LOCKED(stream->lock);
+ /*
+ * Acquire a reference to the current trace chunk to ensure
+ * it is not reclaimed when `stream_rotate_data_file` is called.
+ * Failing to do so would violate the contract of the trace
+ * chunk API as an active file descriptor would outlive the
+ * trace chunk.
+ */
+ acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
+ LTTNG_ASSERT(acquired_reference);
+ previous_chunk = stream->trace_chunk;
+
+ /*
+ * Steal the stream's reference to its stream_fd. A new
+ * stream_fd will be created when the rotation completes and
+ * the orinal stream_fd will be used to copy the "extra" data
+ * to the new file.
+ */
+ LTTNG_ASSERT(stream->file);
+ previous_stream_file = stream->file;
+ stream->file = NULL;
+
+ LTTNG_ASSERT(!stream->is_metadata);
+ LTTNG_ASSERT(stream->tracefile_size_current >
+ stream->pos_after_last_complete_data_index);
+ misplaced_data_size = stream->tracefile_size_current -
+ stream->pos_after_last_complete_data_index;
+ copy_bytes_left = misplaced_data_size;
+ previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
+
+ ret = stream_rotate_data_file(stream);
+ if (ret) {
+ goto end;
+ }
+
+ LTTNG_ASSERT(stream->file);
+ /*
+ * Seek the current tracefile to the position at which the rotation
+ * should have occurred.
+ */
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
+ if (lseek_ret < 0) {
+ PERROR("Failed to seek to offset %" PRIu64
+ " while copying extra data received before a stream rotation",
+ (uint64_t) previous_stream_copy_origin);
+ ret = -1;
+ goto end;
+ }
+
+ /* Move data from the old file to the new file. */
+ while (copy_bytes_left) {
+ ssize_t io_ret;
+ char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
+ const off_t copy_size_this_pass = min_t(
+ off_t, copy_bytes_left, sizeof(copy_buffer));
+
+ io_ret = fs_handle_read(previous_stream_file, copy_buffer,
+ copy_size_this_pass);
+ if (io_ret < (ssize_t) copy_size_this_pass) {
+ if (io_ret == -1) {
+ PERROR("Failed to read %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ } else {
+ ERR("Failed to read %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ io_ret = fs_handle_write(
+ stream->file, copy_buffer, copy_size_this_pass);
+ if (io_ret < (ssize_t) copy_size_this_pass) {
+ if (io_ret == -1) {
+ PERROR("Failed to write %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ } else {
+ ERR("Failed to write %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ }
+ ret = -1;
+ goto end;
+ }
+ copy_bytes_left -= copy_size_this_pass;
+ }
+
+ /* Truncate the file to get rid of the excess data. */
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
+ if (ret) {
+ PERROR("Failed to truncate current stream file to offset %" PRIu64,
+ previous_stream_copy_origin);
+ goto end;
+ }
+
+ /*
+ * Update the offset and FD of all the eventual indexes created by the
+ * data connection before the rotation command arrived.
+ */
+ ret = relay_index_switch_all_files(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+
+ stream->tracefile_size_current = misplaced_data_size;
+ /* Index and data contents are back in sync. */
+ stream->pos_after_last_complete_data_index = 0;
+ ret = 0;
+end:
+ lttng_trace_chunk_put(previous_chunk);
+ return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_data(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ if (caa_likely(!stream->ongoing_rotation.is_set)) {
+ /* No rotation expected. */
+ goto end;
+ }
+
+ if (stream->ongoing_rotation.value.data_rotated) {
+ /* Rotation of the data file has already occurred. */
+ goto end;
+ }
+
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+
+ if (stream->prev_data_seq == -1ULL ||
+ stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+ stream->prev_data_seq <
+ stream->ongoing_rotation.value.prev_data_net_seq) {
+ /*
+ * The next packet that will be written is not part of the next
+ * chunk yet.
+ */
+ DBG("Stream %" PRIu64 " data not yet ready for rotation "
+ "(rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+ goto end;
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
+ /*
+ * prev_data_seq is checked here since indexes and rotation
+ * commands are serialized with respect to each other.
+ */
+ DBG("Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating", stream->stream_handle);
+ ret = rotate_truncate_stream(stream);
+ if (ret) {
+ ERR("Failed to truncate stream");
+ goto end;
+ }
+ } else {
+ ret = stream_rotate_data_file(stream);
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Close the current index file if it is open, and create a new one.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static int create_index_file(struct relay_stream *stream,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret;
+ uint32_t major, minor;
+ char *index_subpath = NULL;
+ enum lttng_trace_chunk_status status;
+
+ ASSERT_LOCKED(stream->lock);
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+
+ if (!chunk) {
+ ret = 0;
+ goto end;
+ }
+ ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
+ DEFAULT_INDEX_DIR);
+ if (ret < 0) {
+ goto end;
+ }
+
+ status = lttng_trace_chunk_create_subdirectory(chunk,
+ index_subpath);
+ free(index_subpath);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+ status = lttng_index_file_create_from_trace_chunk(
+ chunk, stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_current_index,
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor), true,
+ &stream->index_file);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream's index file should be rotated (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_index(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ if (!stream->ongoing_rotation.is_set) {
+ /* No rotation expected. */
+ goto end;
+ }
+
+ if (stream->ongoing_rotation.value.index_rotated) {
+ /* Rotation of the index has already occurred. */
+ goto end;
+ }
+
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+
+ if (!stream->received_packet_seq_num.is_set ||
+ LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+ stream->ongoing_rotation.value.packet_seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation "
+ "(rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+ goto end;
+ } else {
+ /*
+ * The next index belongs to the new trace chunk; rotate.
+ * In overwrite mode, the packet seq num may jump over the
+ * rotation position.
+ */
+ LTTNG_ASSERT(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+ stream->ongoing_rotation.value.packet_seq_num);
+ DBG("Rotating stream %" PRIu64 " index file",
+ stream->stream_handle);
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ stream->ongoing_rotation.value.index_rotated = true;
+
+ /*
+ * Set the rotation pivot position for the data, now that we have the
+ * net_seq_num matching the packet_seq_num index pivot position.
+ */
+ stream->ongoing_rotation.value.prev_data_net_seq =
+ stream->prev_index_seq;
+ if (stream->ongoing_rotation.value.data_rotated &&
+ stream->ongoing_rotation.value.index_rotated) {
+ /* Rotation completed; reset its state. */
+ DBG("Rotation completed for stream %" PRIu64,
+ stream->stream_handle);
+ stream_complete_rotation(stream);
+ }
+ }
+