consumer: explicitly set endpoint status to active at init
[lttng-tools.git] / src / common / consumer.c
index bcede1ef5d8f3857515baeaf709ad9b78294a684..94a0cc3efb1d557da732ed9a201848b0ee4dc3e4 100644 (file)
@@ -291,6 +291,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
@@ -324,6 +325,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -489,6 +491,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
        stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -547,6 +550,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -584,6 +588,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -832,6 +837,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       pthread_mutex_init(&channel->lock, NULL);
 
        /*
         * In monitor mode, the streams associated with the channel will be put in
@@ -877,6 +883,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -893,6 +900,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
@@ -1852,6 +1860,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1945,6 +1954,7 @@ end:
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -1972,6 +1982,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -2017,6 +2028,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -3457,3 +3469,23 @@ int consumer_send_status_channel(int sock,
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size)
+{
+       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+               /* Offset from the produced position to get the latest buffers. */
+               return produced_pos - max_stream_size;
+       }
+
+       return consumed_pos;
+}
This page took 0.024329 seconds and 4 git commands to generate.