Fix: add globally visible flag in stream
authorDavid Goulet <dgoulet@efficios.com>
Thu, 4 Jul 2013 19:35:21 +0000 (15:35 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 5 Jul 2013 18:26:02 +0000 (14:26 -0400)
This flag is added so we can destroy it correctly with the
consumer-stream API.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer-stream.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 027d751508cf69b76411beae1626d3c0fd847aac..03bac86870031eb2b63661f178f5cee0601748c6 100644 (file)
@@ -218,22 +218,13 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 }
 
 /*
 }
 
 /*
- * Destroy a stream in no monitor mode.
- *
- * We need a separate function because this can be called inside a destroy
- * channel path which have the consumer data lock acquired. Also, in no monitor
- * mode, the channel refcount is NOT incremented per stream since the ownership
- * of those streams are INSIDE the channel making the lazy destroy channel not
- * possible for a non monitor stream.
- *
- * Furthermore, there is no need to delete the stream from the global hash
- * table so we avoid useless calls.
+ * Destroy and close a already created stream.
  */
  */
-static void destroy_no_monitor(struct lttng_consumer_stream *stream)
+static void destroy_close_stream(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
 {
        assert(stream);
 
-       DBG("Consumer stream destroy unmonitored key: %" PRIu64, stream->key);
+       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
 
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
 
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
@@ -242,21 +233,24 @@ static void destroy_no_monitor(struct lttng_consumer_stream *stream)
 }
 
 /*
 }
 
 /*
- * Destroy a stream in monitor mode.
+ * Decrement the stream's channel refcount and if down to 0, return the channel
+ * pointer so it can be destroyed by the caller or NULL if not.
  */
  */
-static void destroy_monitor(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+static struct lttng_consumer_channel *unref_channel(
+               struct lttng_consumer_stream *stream)
 {
 {
+       struct lttng_consumer_channel *free_chan = NULL;
+
        assert(stream);
        assert(stream);
+       assert(stream->chan);
 
 
-       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
+       /* Update refcount of channel and see if we need to destroy it. */
+       if (!uatomic_sub_return(&stream->chan->refcount, 1)
+                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+               free_chan = stream->chan;
+       }
 
 
-       /* Remove every reference of the stream in the consumer. */
-       consumer_stream_delete(stream, ht);
-       /* Destroy tracer buffers of the stream. */
-       consumer_stream_destroy_buffers(stream);
-       /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       return free_chan;
 }
 
 /*
 }
 
 /*
@@ -273,35 +267,43 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        assert(stream);
 
        /* Stream is in monitor mode. */
        assert(stream);
 
        /* Stream is in monitor mode. */
-       if (stream->chan->monitor) {
+       if (stream->monitor) {
                struct lttng_consumer_channel *free_chan = NULL;
 
                struct lttng_consumer_channel *free_chan = NULL;
 
-               pthread_mutex_lock(&consumer_data.lock);
-               pthread_mutex_lock(&stream->lock);
-
-               destroy_monitor(stream, ht);
-
-               /* Update refcount of channel and see if we need to destroy it. */
-               if (!uatomic_sub_return(&stream->chan->refcount, 1)
-                               && !uatomic_read(&stream->chan->nb_init_stream_left)) {
-                       free_chan = stream->chan;
+               /*
+                * This means that the stream was successfully removed from the streams
+                * list of the channel and sent to the right thread managing this
+                * stream thus being globally visible.
+                */
+               if (stream->globally_visible) {
+                       pthread_mutex_lock(&consumer_data.lock);
+                       pthread_mutex_lock(&stream->lock);
+                       /* Remove every reference of the stream in the consumer. */
+                       consumer_stream_delete(stream, ht);
+
+                       destroy_close_stream(stream);
+
+                       /* Update channel's refcount of the stream. */
+                       free_chan = unref_channel(stream);
+
+                       /* Indicates that the consumer data state MUST be updated after this. */
+                       consumer_data.need_update = 1;
+
+                       pthread_mutex_unlock(&stream->lock);
+                       pthread_mutex_unlock(&consumer_data.lock);
+               } else {
+                       /*
+                        * If the stream is not visible globally, this needs to be done
+                        * outside of the consumer data lock section.
+                        */
+                       free_chan = unref_channel(stream);
                }
 
                }
 
-               /* Indicates that the consumer data state MUST be updated after this. */
-               consumer_data.need_update = 1;
-
-               pthread_mutex_unlock(&stream->lock);
-               pthread_mutex_unlock(&consumer_data.lock);
-
                if (free_chan) {
                        consumer_del_channel(free_chan);
                }
        } else {
                if (free_chan) {
                        consumer_del_channel(free_chan);
                }
        } else {
-               /*
-                * No monitor mode the stream's ownership is in its channel thus we
-                * don't have to handle the channel refcount nor the lazy deletion.
-                */
-               destroy_no_monitor(stream);
+               destroy_close_stream(stream);
        }
 
        /* Free stream within a RCU call. */
        }
 
        /* Free stream within a RCU call. */
index 30a95b48f4fc7d12e307e3752ff9f2b4d2bc19cd..3aafb519380b334099d584b83ca17d9b201ed6e1 100644 (file)
@@ -465,7 +465,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
-               enum consumer_channel_type type)
+               enum consumer_channel_type type,
+               unsigned int monitor)
 {
        int ret;
        struct lttng_consumer_stream *stream;
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -487,6 +488,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
+       stream->monitor = monitor;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
index 4cf6e910d5ef44ad2ba1967b897458632aca9e10..a64bcad3bba01975a97609793379baab651dee97 100644 (file)
@@ -259,6 +259,18 @@ struct lttng_consumer_stream {
        /* On-disk circular buffer */
        uint64_t tracefile_size_current;
        uint64_t tracefile_count_current;
        /* On-disk circular buffer */
        uint64_t tracefile_size_current;
        uint64_t tracefile_count_current;
+       /*
+        * Monitor or not the streams of this channel meaning this indicates if the
+        * streams should be sent to the data/metadata thread or added to the no
+        * monitor list of the channel.
+        */
+       unsigned int monitor;
+       /*
+        * Indicate if the stream is globally visible meaning that it has been
+        * added to the multiple hash tables. If *not* set, NO lock should be
+        * acquired in the destroy path.
+        */
+       unsigned int globally_visible;
 };
 
 /*
 };
 
 /*
@@ -478,7 +490,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
-               enum consumer_channel_type type);
+               enum consumer_channel_type type,
+               unsigned int monitor);
 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
index 77132c9ae135e2fa407e02df9e67475254ba1eac..bdf0eff7c4a2b56a72bdda1f5bd06ea35e213a98 100644 (file)
@@ -559,7 +559,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
-                               channel->type);
+                               channel->type,
+                               channel->monitor);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
index ff4ba25b9730f0a2896ef798e8869ca7c09aa1cc..3330ff522cdeea40fd47b1cc8dd60f72aedf7f37 100644 (file)
@@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                        channel->session_id,
                        cpu,
                        &alloc_ret,
                        channel->session_id,
                        cpu,
                        &alloc_ret,
-                       channel->type);
+                       channel->type,
+                       channel->monitor);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -559,6 +560,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
 
                /* Remove node from the channel stream list. */
                cds_list_del(&stream->send_node);
 
                /* Remove node from the channel stream list. */
                cds_list_del(&stream->send_node);
+
+               /*
+                * From this point on, the stream's ownership has been moved away from
+                * the channel and becomes globally visible.
+                */
+               stream->globally_visible = 1;
        }
 
 error:
        }
 
 error:
This page took 0.029102 seconds and 4 git commands to generate.