-/*
- * Append padding to the file pointed by the file descriptor fd.
- */
-static int write_padding_to_file(int fd, uint32_t size)
-{
- ssize_t ret = 0;
- char *zeros;
-
- if (size == 0) {
- goto end;
- }
-
- zeros = zmalloc(size);
- if (zeros == NULL) {
- PERROR("zmalloc zeros for padding");
- ret = -1;
- goto end;
- }
-
- ret = lttng_write(fd, zeros, size);
- if (ret < size) {
- PERROR("write padding to file");
- }
-
- free(zeros);
-
-end:
- return ret;
-}
-
-/*
- * Close the current index file if it is open, and create a new one.
- *
- * Return 0 on success, -1 on error.
- */
-static
-int create_rotate_index_file(struct relay_stream *stream,
- const char *stream_path)
-{
- int ret;
- uint32_t major, minor;
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream_path,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
- if (!stream->index_file) {
- ret = -1;
- goto end;
- }
-
- ret = 0;
-
-end:
- return ret;
-}
-
-static
-int do_rotate_stream_data(struct relay_stream *stream)
-{
- int ret;
-
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
- /* Perform the stream rotation. */
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
- if (ret < 0) {
- ERR("Rotating stream output file");
- goto end;
- }
- stream->tracefile_size_current = 0;
- stream->pos_after_last_complete_data_index = 0;
- stream->data_rotated = true;
-
- if (stream->data_rotated && stream->index_rotated) {
- /* Rotation completed; reset its state. */
- DBG("Rotation completed for stream %" PRIu64,
- stream->stream_handle);
- stream->rotate_at_seq_num = -1ULL;
- stream->data_rotated = false;
- stream->index_rotated = false;
- }
-end:
- return ret;
-}
-
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static
-int rotate_truncate_stream(struct relay_stream *stream)
-{
- int ret, new_fd;
- off_t lseek_ret;
- uint64_t diff, pos = 0;
- char buf[FILE_COPY_BUFFER_SIZE];
-
- assert(!stream->is_metadata);
-
- assert(stream->tracefile_size_current >
- stream->pos_after_last_complete_data_index);
- diff = stream->tracefile_size_current -
- stream->pos_after_last_complete_data_index;
-
- /* Create the new tracefile. */
- new_fd = utils_create_stream_file(stream->path_name,
- stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
- if (new_fd < 0) {
- ERR("Failed to create new stream file at path %s for channel %s",
- stream->path_name, stream->channel_name);
- ret = -1;
- goto end;
- }
-
- /*
- * Rewind the current tracefile to the position at which the rotation
- * should have occured.
- */
- lseek_ret = lseek(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index, SEEK_SET);
- if (lseek_ret < 0) {
- PERROR("seek truncate stream");
- ret = -1;
- goto end;
- }
-
- /* Move data from the old file to the new file. */
- while (pos < diff) {
- uint64_t count, bytes_left;
- ssize_t io_ret;
-
- bytes_left = diff - pos;
- count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
- assert(count <= SIZE_MAX);
-
- io_ret = lttng_read(stream->stream_fd->fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, stream->stream_fd->fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- io_ret = lttng_write(new_fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, new_fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- pos += count;
- }
-
- /* Truncate the file to get rid of the excess data. */
- ret = ftruncate(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index);
- if (ret) {
- PERROR("ftruncate");
- goto end;
- }
-
- ret = close(stream->stream_fd->fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto end;
- }
-
- /*
- * Update the offset and FD of all the eventual indexes created by the
- * data connection before the rotation command arrived.
- */
- ret = relay_index_switch_all_files(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end;
- }
-
- stream->stream_fd->fd = new_fd;
- stream->tracefile_size_current = diff;
- stream->pos_after_last_complete_data_index = 0;
- stream->rotate_at_seq_num = -1ULL;
-
- ret = 0;
-
-end:
- return ret;
-}
-
-/*
- * Check if a stream's index file should be rotated (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_index(struct relay_stream *stream)
-{
- int ret = 0;
-
- if (stream->rotate_at_seq_num == -1ULL) {
- /* No rotation expected. */
- goto end;
- }
-
- if (stream->index_rotated) {
- /* Rotation of the index has already occurred. */
- goto end;
- }
-
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_index_seq);
- goto end;
- } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
- /*
- * Unexpected, protocol error/bug.
- * It could mean that we received a rotation position
- * that is in the past.
- */
- ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq,
- stream->prev_index_seq);
- ret = -1;
- goto end;
- } else {
- DBG("Rotating stream %" PRIu64 " index file",
- stream->stream_handle);
- ret = create_rotate_index_file(stream, stream->path_name);
- stream->index_rotated = true;
-
- if (stream->data_rotated && stream->index_rotated) {
- /* Rotation completed; reset its state. */
- DBG("Rotation completed for stream %" PRIu64,
- stream->stream_handle);
- stream->rotate_at_seq_num = -1ULL;
- stream->data_rotated = false;
- stream->index_rotated = false;
- }
- }
-
-end:
- return ret;
-}
-
-/*
- * Check if a stream's data file (as opposed to index) should be rotated
- * (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_data(struct relay_stream *stream)
-{
- int ret = 0;
-
- if (stream->rotate_at_seq_num == -1ULL) {
- /* No rotation expected. */
- goto end;
- }
-
- if (stream->data_rotated) {
- /* Rotation of the data file has already occurred. */
- goto end;
- }
-
- if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq);
- goto end;
- } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
- /*
- * prev_data_seq is checked here since indexes and rotation
- * commands are serialized with respect to each other.
- */
- DBG("Rotation after too much data has been written in tracefile "
- "for stream %" PRIu64 ", need to truncate before "
- "rotating", stream->stream_handle);
- ret = rotate_truncate_stream(stream);
- if (ret) {
- ERR("Failed to truncate stream");
- goto end;
- }
- } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
- /*
- * Unexpected, protocol error/bug.
- * It could mean that we received a rotation position
- * that is in the past.
- */
- ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq);
- ret = -1;
- goto end;
- } else {
- ret = do_rotate_stream_data(stream);
- }
-
-end:
- return ret;
-}
-