From a9838785aecf564595c531772f7ca906b658afa0 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 15 Jul 2013 19:45:24 -0400 Subject: [PATCH] consumer: introduce channel lock Reviewed-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- src/common/consumer-metadata-cache.c | 2 ++ src/common/consumer-stream.c | 2 ++ src/common/consumer.c | 11 +++++++++++ src/common/consumer.h | 11 +++++++++++ src/common/ust-consumer/ust-consumer.c | 6 +++++- 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/common/consumer-metadata-cache.c b/src/common/consumer-metadata-cache.c index 0c20e7fc7..c6e6c6b16 100644 --- a/src/common/consumer-metadata-cache.c +++ b/src/common/consumer-metadata-cache.c @@ -204,6 +204,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, cache = channel->metadata_cache; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); if (cache->rb_pushed >= offset) { @@ -224,6 +225,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, } pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 723ec829f..717e0a735 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -280,6 +280,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, */ if (stream->globally_visible) { pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ consumer_stream_delete(stream, ht); @@ -293,6 +294,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, consumer_data.need_update = 1; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); } else { /* diff --git a/src/common/consumer.c b/src/common/consumer.c index 9b544dd11..57e26dcf8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -291,6 +291,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, @@ -324,6 +325,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -547,6 +549,7 @@ static int add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -584,6 +587,7 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; @@ -832,6 +836,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + pthread_mutex_init(&channel->lock, NULL); /* * In monitor mode, the streams associated with the channel will be put in @@ -877,6 +882,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -893,6 +899,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && @@ -1852,6 +1859,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1945,6 +1953,7 @@ end: stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -1972,6 +1981,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* @@ -2017,6 +2027,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } diff --git a/src/common/consumer.h b/src/common/consumer.h index dd26e5183..3a0c6c8d8 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -168,6 +168,16 @@ struct lttng_consumer_channel { * monitor list of the channel. */ unsigned int monitor; + + /* + * Channel lock. + * + * This is nested INSIDE the consumer data lock. + * This is nested OUTSIDE the metadata cache lock. + * This is nested OUTSIDE stream lock. + * This is nested OUTSIDE consumer_relayd_sock_pair lock. + */ + pthread_mutex_t lock; }; /* @@ -247,6 +257,7 @@ struct lttng_consumer_stream { * * This is nested INSIDE the consumer_data lock. * This is nested INSIDE the metadata cache lock. + * This is nested INSIDE the channel lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ pthread_mutex_t lock; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b2434eb5a..abfe613a7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -638,6 +638,7 @@ static int close_metadata(uint64_t chan_key) } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; @@ -658,6 +659,7 @@ static int close_metadata(uint64_t chan_key) } error_unlock: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); error: return ret; @@ -1047,7 +1049,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * and ultimately try to get rid of this global consumer data lock. */ pthread_mutex_lock(&consumer_data.lock); - + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); if (ret < 0) { @@ -1059,10 +1061,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); goto end_free; } pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); while (consumer_metadata_cache_flushed(channel, offset + len)) { -- 2.34.1