Fix: protect consumer_find_channel with rcu locking
[lttng-tools.git] / src / common / consumer.c
index dbec177b7328468b278d0205598e04c574609b53..51f861e23695c642a56a943aa272fb24c89c7a5d 100644 (file)
@@ -127,6 +127,12 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht)
        rcu_read_unlock();
 }
 
+/*
+ * Return a channel object for the given key.
+ *
+ * RCU read side lock MUST be acquired before calling this function and
+ * protects the channel ptr.
+ */
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
        struct lttng_ht_iter iter;
@@ -138,8 +144,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                return NULL;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -147,8 +151,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
 
-       rcu_read_unlock();
-
        return channel;
 }
 
@@ -348,8 +350,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
-       pthread_mutex_lock(&stream->lock);
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -441,8 +443,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
 end:
        consumer_data.need_update = 1;
-       pthread_mutex_unlock(&consumer_data.lock);
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -475,6 +477,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                goto end;
        }
 
+       rcu_read_lock();
+
        /*
         * Get stream's channel reference. Needed when adding the stream to the
         * global hash table.
@@ -531,9 +535,12 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                        stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
                        (unsigned long long) stream->mmap_len, stream->out_fd,
                        stream->net_seq_idx, stream->session_id);
+
+       rcu_read_unlock();
        return stream;
 
 error:
+       rcu_read_unlock();
        free(stream);
 end:
        return NULL;
@@ -554,6 +561,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
@@ -593,6 +601,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        consumer_data.need_update = 1;
 
        rcu_read_unlock();
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -1278,8 +1287,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
-       pthread_mutex_lock(&stream->lock);
-
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1404,7 +1411,6 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
-       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1445,8 +1451,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
-       pthread_mutex_lock(&stream->lock);
-
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1614,7 +1618,6 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
-       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1760,9 +1763,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
+       pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
 
-       pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                if (stream->mmap_base != NULL) {
@@ -1852,8 +1855,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
 end:
-       pthread_mutex_unlock(&consumer_data.lock);
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -1872,6 +1875,8 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
 
        assert(stream);
        assert(ht);
@@ -1879,6 +1884,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
@@ -1886,6 +1892,15 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
         */
 
        rcu_read_lock();
+
+       /*
+        * Lookup the stream just to make sure it does not exist in our internal
+        * state. This should NEVER happen.
+        */
+       lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       assert(!node);
+
        /* Find relayd and, if one is found, increment refcount. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -1906,9 +1921,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_streams);
        }
 
-       /* Steal stream identifier to avoid having streams with the same key */
-       consumer_steal_stream_key(stream->key, ht);
-
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
        /*
@@ -1920,6 +1932,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2547,17 +2560,27 @@ end:
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
+       ssize_t ret;
+
+       pthread_mutex_lock(&stream->lock);
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+               break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
-               return -ENOSYS;
+               ret = -ENOSYS;
+               break;
        }
+
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
 }
 
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
This page took 0.0261 seconds and 4 git commands to generate.