Fix: relayd stream.c: LTTNG_OPTIONAL_GET address confusion
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 043d19612ce0489efffad2b1f99fc1c8be52e98d..755fb6734072113af804e59f6e1e81089adb6b1f 100644 (file)
@@ -77,132 +77,6 @@ static void stream_complete_rotation(struct relay_stream *stream)
        stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
 }
 
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static int rotate_truncate_stream(struct relay_stream *stream)
-{
-       int ret, new_fd;
-       off_t lseek_ret;
-       uint64_t diff, pos = 0;
-       char buf[FILE_IO_STACK_BUFFER_SIZE];
-
-       assert(!stream->is_metadata);
-
-       assert(stream->tracefile_size_current >
-                       stream->pos_after_last_complete_data_index);
-       diff = stream->tracefile_size_current -
-                       stream->pos_after_last_complete_data_index;
-
-       /* Create the new tracefile. */
-       new_fd = utils_create_stream_file(stream->path_name,
-                       stream->channel_name,
-                       stream->tracefile_size, stream->tracefile_count,
-                       /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
-       if (new_fd < 0) {
-               ERR("Failed to create new stream file at path %s for channel %s",
-                               stream->path_name, stream->channel_name);
-               ret = -1;
-               goto end;
-       }
-
-       /*
-        * Rewind the current tracefile to the position at which the rotation
-        * should have occurred.
-        */
-       lseek_ret = lseek(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index, SEEK_SET);
-       if (lseek_ret < 0) {
-               PERROR("seek truncate stream");
-               ret = -1;
-               goto end;
-       }
-
-       /* Move data from the old file to the new file. */
-       while (pos < diff) {
-               uint64_t count, bytes_left;
-               ssize_t io_ret;
-
-               bytes_left = diff - pos;
-               count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
-               assert(count <= SIZE_MAX);
-
-               io_ret = lttng_read(stream->stream_fd->fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, stream->stream_fd->fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               io_ret = lttng_write(new_fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, new_fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               pos += count;
-       }
-
-       /* Truncate the file to get rid of the excess data. */
-       ret = ftruncate(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index);
-       if (ret) {
-               PERROR("ftruncate");
-               goto end;
-       }
-
-       ret = close(stream->stream_fd->fd);
-       if (ret < 0) {
-               PERROR("Closing tracefile");
-               goto end;
-       }
-
-       /*
-        * Update the offset and FD of all the eventual indexes created by the
-        * data connection before the rotation command arrived.
-        */
-       ret = relay_index_switch_all_files(stream);
-       if (ret < 0) {
-               ERR("Failed to rotate index file");
-               goto end;
-       }
-
-       stream->stream_fd->fd = new_fd;
-       stream->tracefile_size_current = diff;
-       stream->pos_after_last_complete_data_index = 0;
-       stream_complete_rotation(stream);
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
 static int stream_create_data_output_file_from_trace_chunk(
                struct relay_stream *stream,
                struct lttng_trace_chunk *trace_chunk,
@@ -216,7 +90,6 @@ static int stream_create_data_output_file_from_trace_chunk(
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
        ASSERT_LOCKED(stream->lock);
-       assert(stream->trace_chunk);
 
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
                        stream->tracefile_size, stream->tracefile_current_index,
@@ -318,6 +191,161 @@ end:
        return ret;
 }
 
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives.
+ */
+static int rotate_truncate_stream(struct relay_stream *stream)
+{
+       int ret;
+       off_t lseek_ret, previous_stream_copy_origin;
+       uint64_t copy_bytes_left, misplaced_data_size;
+       bool acquired_reference;
+       struct stream_fd *previous_stream_fd = NULL;
+       struct lttng_trace_chunk *previous_chunk = NULL;
+
+       if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
+               ERR("Protocol error encoutered in %s(): stream rotation "
+                       "sequence number is before the current sequence number "
+                       "and the next trace chunk is unset. Honoring this "
+                       "rotation command would result in data loss",
+                               __FUNCTION__);
+               ret = -1;
+               goto end;
+       }
+
+       ASSERT_LOCKED(stream->lock);
+       /*
+        * Acquire a reference to the current trace chunk to ensure
+        * it is not reclaimed when `stream_rotate_data_file` is called.
+        * Failing to do so would violate the contract of the trace
+        * chunk API as an active file descriptor would outlive the
+        * trace chunk.
+        */
+       acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
+       assert(acquired_reference);
+       previous_chunk = stream->trace_chunk;
+
+       /*
+        * Steal the stream's reference to its stream_fd. A new
+        * stream_fd will be created when the rotation completes and
+        * the original stream_fd will be used to copy the "extra" data
+        * to the new file.
+        */
+       assert(stream->stream_fd);
+       previous_stream_fd = stream->stream_fd;
+       stream->stream_fd = NULL;
+
+       assert(!stream->is_metadata);
+       assert(stream->tracefile_size_current >
+                       stream->pos_after_last_complete_data_index);
+       misplaced_data_size = stream->tracefile_size_current -
+                             stream->pos_after_last_complete_data_index;
+       copy_bytes_left = misplaced_data_size;
+       previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
+
+       ret = stream_rotate_data_file(stream);
+       if (ret) {
+               goto end;
+       }
+
+       assert(stream->stream_fd);
+       /*
+        * Seek the previous tracefile to the position at which the rotation
+        * should have occurred.
+        */
+       lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
+                       SEEK_SET);
+       if (lseek_ret < 0) {
+               PERROR("Failed to seek to offset %" PRIu64
+                      " while copying extra data received before a stream rotation",
+                               (uint64_t) previous_stream_copy_origin);
+               ret = -1;
+               goto end;
+       }
+
+       /* Move data from the old file to the new file. */
+       while (copy_bytes_left) {
+               ssize_t io_ret;
+               char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
+               const off_t copy_size_this_pass = min_t(
+                               off_t, copy_bytes_left, sizeof(copy_buffer));
+
+               io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+                               copy_size_this_pass);
+               if (io_ret < (ssize_t) copy_size_this_pass) {
+                       if (io_ret == -1) {
+                               PERROR("Failed to read %" PRIu64
+                                      " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               previous_stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       } else {
+                               ERR("Failed to read %" PRIu64
+                                   " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               previous_stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
+                               copy_size_this_pass);
+               if (io_ret < (ssize_t) copy_size_this_pass) {
+                       if (io_ret == -1) {
+                               PERROR("Failed to write %" PRIu64
+                                      " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               stream->stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       } else {
+                               ERR("Failed to write %" PRIu64
+                                   " bytes from fd %i in %s(), returned %zi",
+                                               copy_size_this_pass,
+                                               stream->stream_fd->fd,
+                                               __FUNCTION__, io_ret);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+               copy_bytes_left -= copy_size_this_pass;
+       }
+
+       /* Truncate the file to get rid of the excess data. */
+       ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+       if (ret) {
+               PERROR("Failed to truncate current stream file to offset %" PRIu64,
+                               previous_stream_copy_origin);
+               goto end;
+       }
+
+       /*
+        * Update the offset and FD of all the eventual indexes created by the
+        * data connection before the rotation command arrived.
+        */
+       ret = relay_index_switch_all_files(stream);
+       if (ret < 0) {
+               ERR("Failed to rotate index file");
+               goto end;
+       }
+
+       stream->tracefile_size_current = misplaced_data_size;
+       /* Index and data contents are back in sync. */
+       stream->pos_after_last_complete_data_index = 0;
+       ret = 0;
+end:
+       lttng_trace_chunk_put(previous_chunk);
+       stream_fd_put(previous_stream_fd);
+       return ret;
+}
+
 /*
  * Check if a stream's data file (as opposed to index) should be rotated
  * (for session rotation).
@@ -615,7 +643,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -892,6 +922,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away. Otherwise, a viewer stream (which holds a reference to
+        * the stream) would keep the trace chunk alive, and a destroy
+        * waiting for the chunk to cease to exist would be postponed
+        * endlessly, until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -903,6 +954,14 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
        int ret = 0;
 
        ASSERT_LOCKED(stream->lock);
+
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
        if (caa_likely(stream->tracefile_size == 0)) {
                /* No size limit set; nothing to check. */
                goto end;
@@ -922,12 +981,12 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                }
                DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
                                ", current_file_size = %" PRIu64
-                               ", packet_size = %" PRIu64 ", current_file_index = %" PRIu64
+                               ", packet_size = %zu, current_file_index = %" PRIu64
                                " new_file_index = %" PRIu64,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -968,6 +1027,12 @@ int stream_write(struct relay_stream *stream,
        memset(padding_buffer, 0,
                        min(sizeof(padding_buffer), padding_to_write));
 
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
        if (packet) {
                write_ret = lttng_write(stream->stream_fd->fd,
                                packet->data, packet->size);
@@ -997,13 +1062,14 @@ int stream_write(struct relay_stream *stream,
        }
 
        if (stream->is_metadata) {
-               stream->metadata_received += packet->size + padding_len;
+               stream->metadata_received += packet ? packet->size : 0;
+               stream->metadata_received += padding_len;
        }
 
-       DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64,
+       DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
                        stream->is_metadata ? "metadata " : "",
                        stream->stream_handle,
-                       packet ? packet->size : 0, padding_len);
+                       packet ? packet->size : (size_t) 0, padding_len);
 end:
        return ret;
 }
@@ -1022,6 +1088,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1062,6 +1129,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1103,9 +1171,7 @@ int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size
 
        stream->prev_data_seq = sequence_number;
        ret = try_rotate_stream_data(stream);
-       if (ret < 0) {
-               goto end;
-       }
+
 end:
        return ret;
 }
@@ -1157,6 +1223,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
This page took 0.02766 seconds and 4 git commands to generate.