Fix: big relayd cleanup and refactor
[lttng-tools.git] / src / bin / lttng-relayd / viewer-stream.c
index 3953d7dd10a4bd18132926286536df87a27ff5f0..a16f331b8f795607389a1efa2c20263ac7609b6f 100644 (file)
@@ -41,11 +41,12 @@ static void deferred_free_viewer_stream(struct rcu_head *head)
 }
 
 struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
-               enum lttng_viewer_seek seek_t)
+               enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
 {
        struct relay_viewer_stream *vstream;
 
        assert(stream);
+       assert(ctf_trace);
 
        vstream = zmalloc(sizeof(*vstream));
        if (!vstream) {
@@ -53,7 +54,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                goto error;
        }
 
-       vstream->session_id = stream->session->id;
+       vstream->session_id = stream->session_id;
        vstream->stream_handle = stream->stream_handle;
        vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
        vstream->channel_name = strndup(stream->channel_name,
@@ -74,11 +75,9 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                goto error;
        }
 
-       vstream->ctf_trace = stream->ctf_trace;
        if (vstream->metadata_flag) {
-               vstream->ctf_trace->viewer_metadata_stream = vstream;
+               ctf_trace->viewer_metadata_stream = vstream;
        }
-       uatomic_inc(&vstream->ctf_trace->refcount);
 
        /* Globally visible after the add unique. */
        lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
@@ -144,14 +143,16 @@ void viewer_stream_delete(struct relay_viewer_stream *stream)
        assert(!ret);
 }
 
-void viewer_stream_destroy(struct relay_viewer_stream *stream)
+void viewer_stream_destroy(struct ctf_trace *ctf_trace,
+               struct relay_viewer_stream *stream)
 {
        int ret;
-       unsigned long ret_ref;
 
        assert(stream);
-       ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1);
-       assert(ret_ref >= 0);
+
+       if (ctf_trace) {
+               ctf_trace_put_ref(ctf_trace);
+       }
 
        if (stream->read_fd >= 0) {
                ret = close(stream->read_fd);
@@ -166,25 +167,6 @@ void viewer_stream_destroy(struct relay_viewer_stream *stream)
                }
        }
 
-       /*
-        * If the only stream left in the HT is the metadata stream,
-        * we need to remove it because we won't detect a EOF for this
-        * stream.
-        */
-       if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) {
-               viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream);
-               viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream);
-               stream->ctf_trace->metadata_stream = NULL;
-               DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id);
-               /*
-                * The streaming-side is already closed and we can't receive a new
-                * stream concurrently at this point (since the session is being
-                * destroyed), so when we detect the refcount equals 0, we are the
-                * only owners of the ctf_trace and we can free it ourself.
-                */
-               free(stream->ctf_trace);
-       }
-
        call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
 }
 
@@ -223,6 +205,13 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream,
        uint64_t tracefile_id;
 
        assert(vstream);
+       assert(stream);
+
+       if (vstream->tracefile_count == 0) {
+               /* Ignore rotation, there is none to do. */
+               ret = 0;
+               goto end;
+       }
 
        tracefile_id = (vstream->tracefile_count_current + 1) %
                vstream->tracefile_count;
@@ -236,19 +225,16 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream,
        }
 
        /*
-        * If the stream on the streaming side still exists, lock to execute
-        * rotation in order to avoid races between a modification on the index
-        * values.
+        * Lock to execute rotation in order to avoid races between a modification
+        * on the index values.
         */
-       if (stream) {
-               pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
-       }
+       pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
 
        /*
         * The writer and the reader are not working in the same tracefile, we can
         * read up to EOF, we don't care about the total_index_received.
         */
-       if (!stream || (stream->tracefile_count_current != tracefile_id)) {
+       if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
                vstream->close_write_flag = 1;
        } else {
                /*
@@ -256,7 +242,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream,
                 * limit our reading to the number of indexes received.
                 */
                vstream->close_write_flag = 0;
-               if (stream) {
+               if (stream->close_flag) {
                        vstream->total_index_received = stream->total_index_received;
                }
        }
@@ -278,9 +264,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream,
        vstream->abort_flag = 0;
        pthread_mutex_unlock(&vstream->overwrite_lock);
 
-       if (stream) {
-               pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
-       }
+       pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
 
        ret = index_open(vstream->path_name, vstream->channel_name,
                        vstream->tracefile_count, vstream->tracefile_count_current);
This page took 0.025616 seconds and 4 git commands to generate.