Fix: add globally visible flag in stream
[lttng-tools.git] / src / common / consumer-stream.c
index 027d751508cf69b76411beae1626d3c0fd847aac..03bac86870031eb2b63661f178f5cee0601748c6 100644 (file)
@@ -218,22 +218,13 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 }
 
 /*
- * Destroy a stream in no monitor mode.
- *
- * We need a separate function because this can be called inside a destroy
- * channel path which have the consumer data lock acquired. Also, in no monitor
- * mode, the channel refcount is NOT incremented per stream since the ownership
- * of those streams are INSIDE the channel making the lazy destroy channel not
- * possible for a non monitor stream.
- *
- * Furthermore, there is no need to delete the stream from the global hash
- * table so we avoid useless calls.
+ * Destroy and close a already created stream.
  */
-static void destroy_no_monitor(struct lttng_consumer_stream *stream)
+static void destroy_close_stream(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
-       DBG("Consumer stream destroy unmonitored key: %" PRIu64, stream->key);
+       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
 
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
@@ -242,21 +233,24 @@ static void destroy_no_monitor(struct lttng_consumer_stream *stream)
 }
 
 /*
- * Destroy a stream in monitor mode.
+ * Decrement the stream's channel refcount and if down to 0, return the channel
+ * pointer so it can be destroyed by the caller or NULL if not.
  */
-static void destroy_monitor(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+static struct lttng_consumer_channel *unref_channel(
+               struct lttng_consumer_stream *stream)
 {
+       struct lttng_consumer_channel *free_chan = NULL;
+
        assert(stream);
+       assert(stream->chan);
 
-       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
+       /* Update refcount of channel and see if we need to destroy it. */
+       if (!uatomic_sub_return(&stream->chan->refcount, 1)
+                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+               free_chan = stream->chan;
+       }
 
-       /* Remove every reference of the stream in the consumer. */
-       consumer_stream_delete(stream, ht);
-       /* Destroy tracer buffers of the stream. */
-       consumer_stream_destroy_buffers(stream);
-       /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       return free_chan;
 }
 
 /*
@@ -273,35 +267,43 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        assert(stream);
 
        /* Stream is in monitor mode. */
-       if (stream->chan->monitor) {
+       if (stream->monitor) {
                struct lttng_consumer_channel *free_chan = NULL;
 
-               pthread_mutex_lock(&consumer_data.lock);
-               pthread_mutex_lock(&stream->lock);
-
-               destroy_monitor(stream, ht);
-
-               /* Update refcount of channel and see if we need to destroy it. */
-               if (!uatomic_sub_return(&stream->chan->refcount, 1)
-                               && !uatomic_read(&stream->chan->nb_init_stream_left)) {
-                       free_chan = stream->chan;
+               /*
+                * This means that the stream was successfully removed from the streams
+                * list of the channel and sent to the right thread managing this
+                * stream thus being globally visible.
+                */
+               if (stream->globally_visible) {
+                       pthread_mutex_lock(&consumer_data.lock);
+                       pthread_mutex_lock(&stream->lock);
+                       /* Remove every reference of the stream in the consumer. */
+                       consumer_stream_delete(stream, ht);
+
+                       destroy_close_stream(stream);
+
+                       /* Update channel's refcount of the stream. */
+                       free_chan = unref_channel(stream);
+
+                       /* Indicates that the consumer data state MUST be updated after this. */
+                       consumer_data.need_update = 1;
+
+                       pthread_mutex_unlock(&stream->lock);
+                       pthread_mutex_unlock(&consumer_data.lock);
+               } else {
+                       /*
+                        * If the stream is not visible globally, this needs to be done
+                        * outside of the consumer data lock section.
+                        */
+                       free_chan = unref_channel(stream);
                }
 
-               /* Indicates that the consumer data state MUST be updated after this. */
-               consumer_data.need_update = 1;
-
-               pthread_mutex_unlock(&stream->lock);
-               pthread_mutex_unlock(&consumer_data.lock);
-
                if (free_chan) {
                        consumer_del_channel(free_chan);
                }
        } else {
-               /*
-                * No monitor mode the stream's ownership is in its channel thus we
-                * don't have to handle the channel refcount nor the lazy deletion.
-                */
-               destroy_no_monitor(stream);
+               destroy_close_stream(stream);
        }
 
        /* Free stream within a RCU call. */
This page took 0.025036 seconds and 4 git commands to generate.