}
status = lttng_trace_chunk_open_file(
- trace_chunk, stream_path, flags, mode, &fd);
+ trace_chunk, stream_path, flags, mode, &fd, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
{
int ret = 0;
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
+ DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+ stream->stream_handle, stream->tracefile_size_current);
if (stream->stream_fd) {
stream_fd_put(stream->stream_fd);
goto end;
}
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->pos_after_last_complete_data_index = 0;
stream->ongoing_rotation.value.data_rotated = true;
struct stream_fd *previous_stream_fd = NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
- if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+ if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
ERR("Protocol error encoutered in %s(): stream rotation "
"sequence number is before the current sequence number "
"and the next trace chunk is unset. Honoring this "
goto end;
}
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+
if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+ stream->prev_data_seq <
+ stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* The next packet that will be written is not part of the next
* chunk yet.
*/
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+ DBG("Stream %" PRIu64 " data not yet ready for rotation "
+ "(rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
", prev_data_seq = %" PRIu64 ")",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
stream->prev_data_seq);
goto end;
- } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
ret = -1;
goto end;
}
- stream->index_file = lttng_index_file_create_from_trace_chunk(
+ status = lttng_index_file_create_from_trace_chunk(
chunk, stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_current_index,
lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor), true);
- if (!stream->index_file) {
+ lttng_to_index_minor(major, minor), true,
+ &stream->index_file);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end;
}
goto end;
}
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+
+ if (!stream->received_packet_seq_num.is_set ||
+ LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+ stream->ongoing_rotation.value.packet_seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation "
+ "(rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
- stream->prev_index_seq);
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
goto end;
} else {
- /* The next index belongs to the new trace chunk; rotate. */
- assert(stream->prev_index_seq + 1 ==
- stream->ongoing_rotation.value.seq_num);
+ /*
+ * The next index belongs to the new trace chunk; rotate.
+ * In overwrite mode, the packet seq num may jump over the
+ * rotation position.
+ */
+ assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+ stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
ret = create_index_file(stream,
stream->ongoing_rotation.value.next_trace_chunk);
stream->ongoing_rotation.value.index_rotated = true;
+ /*
+ * Set the rotation pivot position for the data, now that we have the
+ * net_seq_num matching the packet_seq_num index pivot position.
+ */
+ stream->ongoing_rotation.value.prev_data_net_seq =
+ stream->prev_index_seq;
if (stream->ongoing_rotation.value.data_rotated &&
stream->ongoing_rotation.value.index_rotated) {
/* Rotation completed; reset its state. */
{
int ret = 0;
const struct relay_stream_rotation rotation = {
- .seq_num = rotation_sequence_number,
+ .data_rotated = false,
+ .index_rotated = false,
+ .packet_seq_num = rotation_sequence_number,
+ .prev_data_net_seq = -1ULL,
.next_trace_chunk = next_trace_chunk,
};
}
LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
- DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+ DBG("Setting pending rotation: stream_id = %" PRIu64
+ ", rotate_at_packet_seq_num = %" PRIu64,
stream->stream_handle, rotation_sequence_number);
if (stream->is_metadata) {
/*
stream->ongoing_rotation.value.index_rotated = true;
ret = stream_rotate_data_file(stream);
} else {
- ret = try_rotate_stream_data(stream);
+ ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
- ret = try_rotate_stream_index(stream);
+ ret = try_rotate_stream_data(stream);
if (ret < 0) {
goto end;
}
* Reset current size because we just performed a stream
* rotation.
*/
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
*file_rotated = true;
} else {
}
if (stream->is_metadata) {
- stream->metadata_received += packet ? packet->size : 0;
- stream->metadata_received += padding_len;
+ size_t recv_len;
+
+ recv_len = packet ? packet->size : 0;
+ recv_len += padding_len;
+ stream->metadata_received += recv_len;
+ if (recv_len) {
+ stream->no_new_metadata_notified = false;
+ }
}
DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
ret = relay_index_try_flush(index);
if (ret == 0) {
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ be64toh(index->index_data.packet_seq_num));
*flushed = true;
} else if (ret > 0) {
index->total_size = total_size;
ASSERT_LOCKED(stream->lock);
+ DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
/* Live beacon handling */
if (index_info->packet_size == 0) {
DBG("Received live beacon for stream %" PRIu64,
ret = relay_index_try_flush(index);
if (ret == 0) {
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ index_info->packet_seq_num);
ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
stream->stream_fd = NULL;
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->prev_data_seq = 0;
stream->prev_index_seq = 0;