X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=4b657f33249f7a4f777b7b3ff784b379b15f5878;hp=6e99a7f52d0d24ac1473db3f1c2bf12880a8df78;hb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;hpb=75d83e506938a3e673ad19bb11838ee88e977e72 diff --git a/src/common/consumer.c b/src/common/consumer.c index 6e99a7f52..4b657f332 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -457,6 +457,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, consumer_stream_destroy(stream, ht); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_stream_destroy(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -539,9 +552,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; assert(stream); @@ -551,6 +564,7 @@ static int add_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -588,12 +602,18 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -838,6 +858,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_count = tracefile_count; channel->monitor = monitor; pthread_mutex_init(&channel->lock, NULL); + pthread_mutex_init(&channel->timer_lock, NULL); /* * In monitor mode, the streams associated with the channel will be put in @@ -884,6 +905,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); + pthread_mutex_lock(&channel->timer_lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -900,6 +922,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -1956,8 +1979,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, end: /* * Nullify the stream reference so it is not used after deletion. The - * consumer data lock MUST be acquired before being able to check for a - * NULL pointer value. + * channel lock MUST be acquired before being able to check for + * a NULL pointer value. */ stream->chan->metadata_stream = NULL; @@ -1977,9 +2000,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -1991,6 +2014,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); /* @@ -2037,6 +2061,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); + pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -2182,7 +2207,7 @@ restart: pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < 0) { - ERR("read metadata stream, ret: %ld", pipe_len); + ERR("read metadata stream, ret: %zd", pipe_len); /* * Continue here to handle the rest of the streams. */ @@ -2199,14 +2224,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2405,7 +2422,7 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - ERR("Consumer data pipe ret %ld", pipe_readlen); + ERR("Consumer data pipe ret %zd", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2420,17 +2437,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; }