consumer: introduce channel lock
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 15 Jul 2013 23:45:24 +0000 (19:45 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 16 Jul 2013 18:44:14 +0000 (14:44 -0400)
Reviewed-by: Julien Desfossez <julien.desfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer-metadata-cache.c
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c

index 0c20e7fc7cbba6850a3337fc4ebb788d3e5d5f84..c6e6c6b164a423b81dace0312ee3c71c9cb3626e 100644 (file)
@@ -204,6 +204,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
        cache = channel->metadata_cache;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);
 
        if (cache->rb_pushed >= offset) {
@@ -224,6 +225,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
        }
 
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
index 560ec6cd06a24366daf5879fb779c4aa6a19ff4d..b24f95003c9fcd25a0e93eba6349c51bc72151ca 100644 (file)
@@ -292,6 +292,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -321,6 +322,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -651,6 +653,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -694,6 +697,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -879,6 +883,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
+       pthread_mutex_init(&channel->lock, NULL);
 
        strncpy(channel->pathname, pathname, sizeof(channel->pathname));
        channel->pathname[sizeof(channel->pathname) - 1] = '\0';
@@ -911,6 +916,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -927,6 +933,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
@@ -1886,6 +1893,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1980,6 +1988,7 @@ end:
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -2008,6 +2017,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -2059,6 +2069,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
index a5a758be5ea141ff609051463852be49525d6b1b..8751f234e31605188294c809d9fef593a82d8198 100644 (file)
@@ -157,6 +157,15 @@ struct lttng_consumer_channel {
        /* On-disk circular buffer */
        uint64_t tracefile_size;
        uint64_t tracefile_count;
+       /*
+        * Channel lock.
+        *
+        * This is nested INSIDE the consumer data lock.
+        * This is nested OUTSIDE the metadata cache lock.
+        * This is nested OUTSIDE stream lock.
+        * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+        */
+       pthread_mutex_t lock;
 };
 
 /*
@@ -228,6 +237,7 @@ struct lttng_consumer_stream {
         *
         * This is nested INSIDE the consumer_data lock.
         * This is nested INSIDE the metadata cache lock.
+        * This is nested INSIDE the channel lock.
         * This is nested OUTSIDE consumer_relayd_sock_pair lock.
         */
        pthread_mutex_t lock;
index 1068087225255705a1d2407a7642fef376bcde37..739cebaefe04934573876b0ced133de8b258e338 100644 (file)
@@ -653,6 +653,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        if (cds_lfht_is_node_deleted(&channel->node.node)) {
                goto error_unlock;
@@ -673,6 +674,7 @@ static int close_metadata(uint64_t chan_key)
        }
 
 error_unlock:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 error:
        return ret;
@@ -776,7 +778,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
         * and ultimately try to get rid of this global consumer data lock.
         */
        pthread_mutex_lock(&consumer_data.lock);
-
+       pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);
        ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
        if (ret < 0) {
@@ -788,10 +790,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                pthread_mutex_unlock(&channel->metadata_cache->lock);
+               pthread_mutex_unlock(&channel->lock);
                pthread_mutex_unlock(&consumer_data.lock);
                goto end_free;
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        while (consumer_metadata_cache_flushed(channel, offset + len)) {
This page took 0.030698 seconds and 4 git commands to generate.