X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=f5bc5c58aa8506f27b8dffc37a02172223222e54;hb=447e8ad9216e708f1698e05dabfdaacc7b3196cf;hp=a0fb2384cd566fe59b58bbca771b3a279f8670c2;hpb=ec6ea7d01adc8a9d1481ba645b282c92ec27208e;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index a0fb2384c..f5bc5c58a 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -292,7 +292,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - pthread_mutex_lock(&channel->timer_lock); /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, @@ -326,7 +325,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: - pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -459,6 +457,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, consumer_stream_destroy(stream, ht); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -541,9 +552,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; assert(stream); @@ -598,6 +609,11 @@ static int add_stream(struct lttng_consumer_stream *stream, return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -1869,7 +1885,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1964,13 +1979,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, end: /* * Nullify the stream reference so it is not used after deletion. The - * consumer data lock MUST be acquired before being able to check for a - * NULL pointer value. + * channel lock MUST be acquired before being able to check for + * a NULL pointer value. */ stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -1986,9 +2000,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -2210,14 +2224,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2431,17 +2437,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; }