Fix: relayd: put chunk reference when closing stream
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 1b4e38ac8fc2b0c9d209b39ac3a559fe9e284f98..9d753bd0a7037c042c6218487d0bda0a4d95a0b4 100644 (file)
@@ -644,7 +644,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -921,6 +923,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away, because otherwise still holding the reference on the
+        * trace chunk could allow a viewer stream (which holds a reference
+        * to the stream) to postpone destroy waiting for the chunk to cease
+        * to exist endlessly until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -956,7 +979,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -1052,6 +1075,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1092,6 +1116,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1185,6 +1210,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
This page took 0.02429 seconds and 4 git commands to generate.