relayd: fix: rotate_truncate_stream() assumes non-null next chunk
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 5af870d36fc4af2bcd708d461ad7bcdb13ea7447..06e82b29198a082476c34b5d215cc3a083ad14bf 100644 (file)
@@ -77,132 +77,6 @@ static void stream_complete_rotation(struct relay_stream *stream)
        stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
 }
 
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static int rotate_truncate_stream(struct relay_stream *stream)
-{
-       int ret, new_fd;
-       off_t lseek_ret;
-       uint64_t diff, pos = 0;
-       char buf[FILE_IO_STACK_BUFFER_SIZE];
-
-       assert(!stream->is_metadata);
-
-       assert(stream->tracefile_size_current >
-                       stream->pos_after_last_complete_data_index);
-       diff = stream->tracefile_size_current -
-                       stream->pos_after_last_complete_data_index;
-
-       /* Create the new tracefile. */
-       new_fd = utils_create_stream_file(stream->path_name,
-                       stream->channel_name,
-                       stream->tracefile_size, stream->tracefile_count,
-                       /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
-       if (new_fd < 0) {
-               ERR("Failed to create new stream file at path %s for channel %s",
-                               stream->path_name, stream->channel_name);
-               ret = -1;
-               goto end;
-       }
-
-       /*
-        * Rewind the current tracefile to the position at which the rotation
-        * should have occurred.
-        */
-       lseek_ret = lseek(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index, SEEK_SET);
-       if (lseek_ret < 0) {
-               PERROR("seek truncate stream");
-               ret = -1;
-               goto end;
-       }
-
-       /* Move data from the old file to the new file. */
-       while (pos < diff) {
-               uint64_t count, bytes_left;
-               ssize_t io_ret;
-
-               bytes_left = diff - pos;
-               count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
-               assert(count <= SIZE_MAX);
-
-               io_ret = lttng_read(stream->stream_fd->fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, stream->stream_fd->fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               io_ret = lttng_write(new_fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, new_fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               pos += count;
-       }
-
-       /* Truncate the file to get rid of the excess data. */
-       ret = ftruncate(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index);
-       if (ret) {
-               PERROR("ftruncate");
-               goto end;
-       }
-
-       ret = close(stream->stream_fd->fd);
-       if (ret < 0) {
-               PERROR("Closing tracefile");
-               goto end;
-       }
-
-       /*
-        * Update the offset and FD of all the eventual indexes created by the
-        * data connection before the rotation command arrived.
-        */
-       ret = relay_index_switch_all_files(stream);
-       if (ret < 0) {
-               ERR("Failed to rotate index file");
-               goto end;
-       }
-
-       stream->stream_fd->fd = new_fd;
-       stream->tracefile_size_current = diff;
-       stream->pos_after_last_complete_data_index = 0;
-       stream_complete_rotation(stream);
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
 static int stream_create_data_output_file_from_trace_chunk(
                struct relay_stream *stream,
                struct lttng_trace_chunk *trace_chunk,
@@ -318,6 +192,161 @@ end:
        return ret;
 }
 
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives.
+ */
+static int rotate_truncate_stream(struct relay_stream *stream)
+{
+       int ret;
+       off_t lseek_ret, previous_stream_copy_origin;
+       uint64_t copy_bytes_left, misplaced_data_size;
+       bool acquired_reference;
+       struct stream_fd *previous_stream_fd = NULL;
+       struct lttng_trace_chunk *previous_chunk = NULL;
+
+       if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+               ERR("Protocol error encoutered in %s(): stream rotation "
+                       "sequence number is before the current sequence number "
+                       "and the next trace chunk is unset. Honoring this "
+                       "rotation command would result in data loss",
+                               __FUNCTION__);
+               ret = -1;
+               goto end;
+       }
+
+       ASSERT_LOCKED(stream->lock);
+       /*
+        * Acquire a reference to the current trace chunk to ensure
+        * it is not reclaimed when `stream_rotate_data_file` is called.
+        * Failing to do so would violate the contract of the trace
+        * chunk API as an active file descriptor would outlive the
+        * trace chunk.
+        */
+       acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
+       assert(acquired_reference);
+       previous_chunk = stream->trace_chunk;
+
+       /*
+        * Steal the stream's reference to its stream_fd. A new
+        * stream_fd will be created when the rotation completes and
+        * the orinal stream_fd will be used to copy the "extra" data
+        * to the new file.
+        */
+       assert(stream->stream_fd);
+       previous_stream_fd = stream->stream_fd;
+       stream->stream_fd = NULL;
+
+       assert(!stream->is_metadata);
+       assert(stream->tracefile_size_current >
+                       stream->pos_after_last_complete_data_index);
+       misplaced_data_size = stream->tracefile_size_current -
+                             stream->pos_after_last_complete_data_index;
+       copy_bytes_left = misplaced_data_size;
+       previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
+
+       ret = stream_rotate_data_file(stream);
+       if (ret) {
+               goto end;
+       }
+
+       assert(stream->stream_fd);
+       /*
+        * Seek the current tracefile to the position at which the rotation
+        * should have occurred.
+        */
+       lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
+                       SEEK_SET);
+       if (lseek_ret < 0) {
+               PERROR("Failed to seek to offset %" PRIu64
+                      " while copying extra data received before a stream rotation",
+                               (uint64_t) previous_stream_copy_origin);
+               ret = -1;
+               goto end;
+       }
+
+       /* Move data from the old file to the new file. */
+       while (copy_bytes_left) {
+               ssize_t io_ret;
+               char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
+               const off_t copy_size_this_pass = min_t(
+                               off_t, copy_bytes_left, sizeof(copy_buffer));
+
+               io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+                               copy_size_this_pass);
+               if (io_ret < (ssize_t) copy_size_this_pass) {
+                       if (io_ret == -1) {
+                               PERROR("Failed to read %" PRIu64
+                                      " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               previous_stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       } else {
+                               ERR("Failed to read %" PRIu64
+                                   " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               previous_stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
+                               copy_size_this_pass);
+               if (io_ret < (ssize_t) copy_size_this_pass) {
+                       if (io_ret == -1) {
+                               PERROR("Failed to write %" PRIu64
+                                      " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               stream->stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       } else {
+                               ERR("Failed to write %" PRIu64
+                                   " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               stream->stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+               copy_bytes_left -= copy_size_this_pass;
+       }
+
+       /* Truncate the file to get rid of the excess data. */
+       ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+       if (ret) {
+               PERROR("Failed to truncate current stream file to offset %" PRIu64,
+                               previous_stream_copy_origin);
+               goto end;
+       }
+
+       /*
+        * Update the offset and FD of all the eventual indexes created by the
+        * data connection before the rotation command arrived.
+        */
+       ret = relay_index_switch_all_files(stream);
+       if (ret < 0) {
+               ERR("Failed to rotate index file");
+               goto end;
+       }
+
+       stream->tracefile_size_current = misplaced_data_size;
+       /* Index and data contents are back in sync. */
+       stream->pos_after_last_complete_data_index = 0;
+       ret = 0;
+end:
+       lttng_trace_chunk_put(previous_chunk);
+       stream_fd_put(previous_stream_fd);
+       return ret;
+}
+
 /*
  * Check if a stream's data file (as opposed to index) should be rotated
  * (for session rotation).
@@ -997,13 +1026,14 @@ int stream_write(struct relay_stream *stream,
        }
 
        if (stream->is_metadata) {
-               stream->metadata_received += packet->size + padding_len;
+               stream->metadata_received += packet ? packet->size : 0;
+               stream->metadata_received += padding_len;
        }
 
-       DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64,
+       DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
                        stream->is_metadata ? "metadata " : "",
                        stream->stream_handle,
-                       packet ? packet->size : 0, padding_len);
+                       packet ? packet->size : (size_t) 0, padding_len);
 end:
        return ret;
 }
This page took 0.025961 seconds and 4 git commands to generate.