Introduce channel timer lock
[lttng-tools.git] / src / common / consumer-stream.c
index 24f1b8a42d14c580c013f3f38bac9b9576f3c89a..02887fcc457aaa3deb435d94bfc3ccb04fa2496b 100644 (file)
@@ -19,6 +19,7 @@
 
 #define _GNU_SOURCE
 #include <assert.h>
+#include <inttypes.h>
 #include <sys/mman.h>
 #include <unistd.h>
 
@@ -57,8 +58,10 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
        assert(stream);
        assert(relayd);
 
-       uatomic_dec(&relayd->refcount);
-       assert(uatomic_read(&relayd->refcount) >= 0);
+       if (stream->sent_to_relayd) {
+               uatomic_dec(&relayd->refcount);
+               assert(uatomic_read(&relayd->refcount) >= 0);
+       }
 
        /* Closing streams requires to lock the control socket. */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -80,6 +83,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
                        uatomic_read(&relayd->destroy_flag)) {
                consumer_destroy_relayd(relayd);
        }
+       stream->net_seq_idx = (uint64_t) -1ULL;
+       stream->sent_to_relayd = 0;
 }
 
 /*
@@ -110,11 +115,11 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                        if (ret) {
                                PERROR("close");
                        }
+                       stream->wait_fd = -1;
                }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_del_stream(stream);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -127,6 +132,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                if (ret) {
                        PERROR("close");
                }
+               stream->out_fd = -1;
        }
 
        /* Check and cleanup relayd if needed. */
@@ -151,6 +157,8 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
        struct lttng_ht_iter iter;
 
        assert(stream);
+       /* Should NEVER be called not in monitor mode. */
+       assert(stream->chan->monitor);
 
        rcu_read_lock();
 
@@ -193,29 +201,51 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 }
 
 /*
- * Destroy a stream completely. This will delete, close and free the stream.
- * Once return, the stream is NO longer usable. Its channel may get destroyed
- * if conditions are met.
- *
- * This MUST be called WITHOUT the consumer data and stream lock acquired.
+ * Destroy the stream's buffers of the tracer.
  */
-void consumer_stream_destroy(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 {
-       struct lttng_consumer_channel *free_chan = NULL;
-
        assert(stream);
 
-       DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd);
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_del_stream(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+}
 
-       pthread_mutex_lock(&consumer_data.lock);
-       pthread_mutex_lock(&stream->lock);
+/*
+ * Destroy and close a already created stream.
+ */
+static void destroy_close_stream(struct lttng_consumer_stream *stream)
+{
+       assert(stream);
 
-       /* Remove every reference of the stream in the consumer. */
-       consumer_stream_delete(stream, ht);
+       DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
 
+       /* Destroy tracer buffers of the stream. */
+       consumer_stream_destroy_buffers(stream);
        /* Close down everything including the relayd if one. */
        consumer_stream_close(stream);
+}
+
+/*
+ * Decrement the stream's channel refcount and if down to 0, return the channel
+ * pointer so it can be destroyed by the caller or NULL if not.
+ */
+static struct lttng_consumer_channel *unref_channel(
+               struct lttng_consumer_stream *stream)
+{
+       struct lttng_consumer_channel *free_chan = NULL;
+
+       assert(stream);
+       assert(stream->chan);
 
        /* Update refcount of channel and see if we need to destroy it. */
        if (!uatomic_sub_return(&stream->chan->refcount, 1)
@@ -223,14 +253,64 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                free_chan = stream->chan;
        }
 
-       /* Indicates that the consumer data state MUST be updated after this. */
-       consumer_data.need_update = 1;
+       return free_chan;
+}
+
+/*
+ * Destroy a stream completely. This will delete, close and free the stream.
+ * Once return, the stream is NO longer usable. Its channel may get destroyed
+ * if conditions are met for a monitored stream.
+ *
+ * This MUST be called WITHOUT the consumer data and stream lock acquired if
+ * the stream is in _monitor_ mode else it does not matter.
+ */
+void consumer_stream_destroy(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
+{
+       assert(stream);
+
+       /* Stream is in monitor mode. */
+       if (stream->monitor) {
+               struct lttng_consumer_channel *free_chan = NULL;
 
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&consumer_data.lock);
+               /*
+                * This means that the stream was successfully removed from the streams
+                * list of the channel and sent to the right thread managing this
+                * stream thus being globally visible.
+                */
+               if (stream->globally_visible) {
+                       pthread_mutex_lock(&consumer_data.lock);
+                       pthread_mutex_lock(&stream->chan->lock);
+                       pthread_mutex_lock(&stream->chan->timer_lock);
+                       pthread_mutex_lock(&stream->lock);
+                       /* Remove every reference of the stream in the consumer. */
+                       consumer_stream_delete(stream, ht);
+
+                       destroy_close_stream(stream);
+
+                       /* Update channel's refcount of the stream. */
+                       free_chan = unref_channel(stream);
+
+                       /* Indicates that the consumer data state MUST be updated after this. */
+                       consumer_data.need_update = 1;
+
+                       pthread_mutex_unlock(&stream->lock);
+                       pthread_mutex_unlock(&stream->chan->timer_lock);
+                       pthread_mutex_unlock(&stream->chan->lock);
+                       pthread_mutex_unlock(&consumer_data.lock);
+               } else {
+                       /*
+                        * If the stream is not visible globally, this needs to be done
+                        * outside of the consumer data lock section.
+                        */
+                       free_chan = unref_channel(stream);
+               }
 
-       if (free_chan) {
-               consumer_del_channel(free_chan);
+               if (free_chan) {
+                       consumer_del_channel(free_chan);
+               }
+       } else {
+               destroy_close_stream(stream);
        }
 
        /* Free stream within a RCU call. */
This page took 0.025775 seconds and 4 git commands to generate.