Fix: relayd: put chunk reference when closing stream
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index f98b465572ec880d4730224da7766fe4ce7fc040..9d753bd0a7037c042c6218487d0bda0a4d95a0b4 100644 (file)
@@ -210,6 +210,16 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        struct stream_fd *previous_stream_fd = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
+       if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+               ERR("Protocol error encoutered in %s(): stream rotation "
+                       "sequence number is before the current sequence number "
+                       "and the next trace chunk is unset. Honoring this "
+                       "rotation command would result in data loss",
+                               __FUNCTION__);
+               ret = -1;
+               goto end;
+       }
+
        ASSERT_LOCKED(stream->lock);
        /*
         * Acquire a reference to the current trace chunk to ensure
@@ -245,6 +255,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
+       assert(stream->stream_fd);
        /*
         * Seek the current tracefile to the position at which the rotation
         * should have occurred.
@@ -633,7 +644,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -910,6 +923,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away, because otherwise still holding the reference on the
+        * trace chunk could allow a viewer stream (which holds a reference
+        * to the stream) to postpone destroy waiting for the chunk to cease
+        * to exist endlessly until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -945,7 +979,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -1041,6 +1075,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1081,6 +1116,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1122,9 +1158,7 @@ int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size
 
        stream->prev_data_seq = sequence_number;
        ret = try_rotate_stream_data(stream);
-       if (ret < 0) {
-               goto end;
-       }
+
 end:
        return ret;
 }
@@ -1176,6 +1210,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
This page took 0.02458 seconds and 4 git commands to generate.