Fix: relayd stream.c: LTTNG_OPTIONAL_GET address confusion
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index f98b465572ec880d4730224da7766fe4ce7fc040..755fb6734072113af804e59f6e1e81089adb6b1f 100644 (file)
@@ -90,7 +90,6 @@ static int stream_create_data_output_file_from_trace_chunk(
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
        ASSERT_LOCKED(stream->lock);
-       assert(stream->trace_chunk);
 
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
                        stream->tracefile_size, stream->tracefile_current_index,
@@ -210,6 +209,16 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        struct stream_fd *previous_stream_fd = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
+       if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
+               ERR("Protocol error encoutered in %s(): stream rotation "
+                       "sequence number is before the current sequence number "
+                       "and the next trace chunk is unset. Honoring this "
+                       "rotation command would result in data loss",
+                               __FUNCTION__);
+               ret = -1;
+               goto end;
+       }
+
        ASSERT_LOCKED(stream->lock);
        /*
         * Acquire a reference to the current trace chunk to ensure
@@ -245,6 +254,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
+       assert(stream->stream_fd);
        /*
         * Seek the current tracefile to the position at which the rotation
         * should have occurred.
@@ -633,7 +643,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -910,6 +922,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away, because otherwise still holding the reference on the
+        * trace chunk could allow a viewer stream (which holds a reference
+        * to the stream) to postpone destroy waiting for the chunk to cease
+        * to exist endlessly until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -921,6 +954,14 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
        int ret = 0;
 
        ASSERT_LOCKED(stream->lock);
+
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
        if (caa_likely(stream->tracefile_size == 0)) {
                /* No size limit set; nothing to check. */
                goto end;
@@ -945,7 +986,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -986,6 +1027,12 @@ int stream_write(struct relay_stream *stream,
        memset(padding_buffer, 0,
                        min(sizeof(padding_buffer), padding_to_write));
 
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
        if (packet) {
                write_ret = lttng_write(stream->stream_fd->fd,
                                packet->data, packet->size);
@@ -1041,6 +1088,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1081,6 +1129,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1122,9 +1171,7 @@ int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size
 
        stream->prev_data_seq = sequence_number;
        ret = try_rotate_stream_data(stream);
-       if (ret < 0) {
-               goto end;
-       }
+
 end:
        return ret;
 }
@@ -1176,6 +1223,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
This page took 0.024415 seconds and 4 git commands to generate.