Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
[lttng-tools.git] / src / common / consumer / consumer.cpp
index 4d64b0ea2be2b7f531f5fc03c1eef4573d2eefc8..27b34a39f249071867e49e9df7fb438517bb6ee8 100644 (file)
@@ -29,6 +29,7 @@
 #include <common/time.hpp>
 #include <common/trace-chunk-registry.hpp>
 #include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
 #include <common/ust-consumer/ust-consumer.hpp>
 #include <common/utils.hpp>
 
@@ -202,7 +203,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                return nullptr;
        }
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
@@ -210,8 +211,6 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
-       rcu_read_unlock();
-
        return stream;
 }
 
@@ -219,7 +218,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -230,7 +229,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
                 */
                stream->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -273,7 +271,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -284,7 +282,6 @@ static void steal_channel_key(uint64_t key)
                 */
                channel->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 static void free_channel_rcu(struct rcu_head *head)
@@ -404,7 +401,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->is_published) {
                int ret;
 
-               rcu_read_lock();
+               lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -412,7 +409,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                iter.iter.node = &channel->channels_by_session_id_ht_node.node;
                ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
                LTTNG_ASSERT(!ret);
-               rcu_read_unlock();
        }
 
        channel->is_deleted = true;
@@ -431,14 +427,15 @@ static void cleanup_relayd_ht()
        struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
-               consumer_destroy_relayd(relayd);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+                       consumer_destroy_relayd(relayd);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.relayd_ht);
 }
 
@@ -457,7 +454,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Let's begin with metadata */
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
@@ -474,7 +471,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
                }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -581,7 +577,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -614,7 +610,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        the_consumer_data.stream_count++;
        the_consumer_data.need_update = 1;
 
-       rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
@@ -720,7 +715,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -757,7 +752,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
            stream->net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -774,7 +768,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -797,7 +791,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -809,12 +802,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -1127,11 +1119,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
-       rcu_read_unlock();
        channel->is_published = true;
 
        pthread_mutex_unlock(&channel->timer_lock);
@@ -1169,35 +1160,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Only active streams with an active end point can be added to the
-                * poll set and local stream storage of the thread.
-                *
-                * There is a potential race here for endpoint_status to be updated
-                * just after the check. However, this is OK since the stream(s) will
-                * be deleted once the thread is notified that the end point state has
-                * changed where this function will be called back again.
-                *
-                * We track the number of inactive FDs because they still need to be
-                * closed by the polling thread after a wakeup on the data_pipe or
-                * metadata_pipe.
-                */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
-                       (*nb_inactive_fd)++;
-                       continue;
+
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Only active streams with an active end point can be added to the
+                        * poll set and local stream storage of the thread.
+                        *
+                        * There is a potential race here for endpoint_status to be updated
+                        * just after the check. However, this is OK since the stream(s) will
+                        * be deleted once the thread is notified that the end point state has
+                        * changed where this function will be called back again.
+                        *
+                        * We track the number of inactive FDs because they still need to be
+                        * closed by the polling thread after a wakeup on the data_pipe or
+                        * metadata_pipe.
+                        */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                               (*nb_inactive_fd)++;
+                               continue;
+                       }
+
+                       (*pollfd)[i].fd = stream->wait_fd;
+                       (*pollfd)[i].events = POLLIN | POLLPRI;
+                       local_stream[i] = stream;
+                       i++;
                }
-               /*
-                * This clobbers way too much the debug output. Uncomment that if you
-                * need it for debugging purposes.
-                */
-               (*pollfd)[i].fd = stream->wait_fd;
-               (*pollfd)[i].events = POLLIN | POLLPRI;
-               local_stream[i] = stream;
-               i++;
        }
-       rcu_read_unlock();
 
        /*
         * Insert the consumer_data_pipe at the end of the array and don't
@@ -1278,14 +1268,15 @@ void lttng_consumer_cleanup()
        struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               consumer_del_channel(channel);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       consumer_del_channel(channel);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.channel_ht);
        lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
 
@@ -1486,15 +1477,16 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1512,15 +1504,16 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_metadata_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1622,7 +1615,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
@@ -1762,7 +1755,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1801,7 +1793,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -1984,7 +1976,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return written;
 }
 
@@ -2205,7 +2196,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
@@ -2239,8 +2230,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         */
        lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
 
-       rcu_read_unlock();
-
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
@@ -2257,16 +2246,18 @@ static void validate_endpoint_status_data_stream()
 
        DBG("Consumer delete flagged data stream");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /* Delete it right now */
+                       consumer_del_stream(stream, data_ht);
                }
-               /* Delete it right now */
-               consumer_del_stream(stream, data_ht);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2281,22 +2272,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po
 
        LTTNG_ASSERT(pollset);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
-               }
-               /*
-                * Remove from pollset so the metadata thread can continue without
-                * blocking on a deleted stream.
-                */
-               lttng_poll_del(pollset, stream->wait_fd);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /*
+                        * Remove from pollset so the metadata thread can continue without
+                        * blocking on a deleted stream.
+                        */
+                       lttng_poll_del(pollset, stream->wait_fd);
 
-               /* Delete it right now */
-               consumer_del_metadata_stream(stream, metadata_ht);
+                       /* Delete it right now */
+                       consumer_del_metadata_stream(stream, metadata_ht);
+               }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2431,7 +2423,7 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -2495,11 +2487,9 @@ void *consumer_thread_metadata_poll(void *data)
                                consumer_del_metadata_stream(stream, metadata_ht);
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
                        /* Release RCU lock for the stream looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -2838,7 +2828,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -2878,7 +2868,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
 }
 
 static void destroy_channel_ht(struct lttng_ht *ht)
@@ -2891,12 +2880,14 @@ static void destroy_channel_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-               ret = lttng_ht_del(ht, &iter);
-               LTTNG_ASSERT(ret != 0);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+                       ret = lttng_ht_del(ht, &iter);
+                       LTTNG_ASSERT(ret != 0);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -2998,19 +2989,20 @@ void *consumer_thread_channel_poll(void *data)
 
                                        switch (action) {
                                        case CONSUMER_CHANNEL_ADD:
+                                       {
                                                DBG("Adding channel %d to poll set", chan->wait_fd);
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
-                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                // FIXME: Empty flag on a pipe pollset, this might
                                                // hang on FreeBSD.
                                                lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
+                                       }
                                        case CONSUMER_CHANNEL_DEL:
                                        {
                                                /*
@@ -3023,10 +3015,9 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
-                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64
                                                            " not found for del channel",
                                                            key);
@@ -3059,7 +3050,6 @@ void *consumer_thread_channel_poll(void *data)
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
-                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -3093,7 +3083,7 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -3126,12 +3116,10 @@ void *consumer_thread_channel_poll(void *data)
                                }
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -3773,7 +3761,7 @@ int consumer_data_pending(uint64_t id)
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
@@ -3887,13 +3875,11 @@ int consumer_data_pending(uint64_t id)
 data_not_pending:
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 1;
 }
 
@@ -4020,7 +4006,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
@@ -4372,7 +4358,6 @@ end_unlock_stream:
 end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
-       rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
@@ -4459,7 +4444,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
        int ret;
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
@@ -4471,13 +4456,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
                pthread_mutex_unlock(&stream->lock);
        }
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return 0;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4673,7 +4656,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
 
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
@@ -4708,7 +4691,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
        ret = 0;
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4837,46 +4819,50 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                goto error;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(
-               the_consumer_data.channels_by_session_id_ht->ht,
-               the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
-               the_consumer_data.channels_by_session_id_ht->match_fct,
-               &session_id,
-               &iter.iter,
-               channel,
-               channels_by_session_id_ht_node.node)
        {
-               ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
-               if (ret) {
-                       /*
-                        * Roll-back the creation of this chunk.
-                        *
-                        * This is important since the session daemon will
-                        * assume that the creation of this chunk failed and
-                        * will never ask for it to be closed, resulting
-                        * in a leak and an inconsistent state for some
-                        * channels.
-                        */
-                       enum lttcomm_return_code close_ret;
-                       char path[LTTNG_PATH_MAX];
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry_duplicate(
+                       the_consumer_data.channels_by_session_id_ht->ht,
+                       the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+                                                                             lttng_ht_seed),
+                       the_consumer_data.channels_by_session_id_ht->match_fct,
+                       &session_id,
+                       &iter.iter,
+                       channel,
+                       channels_by_session_id_ht_node.node)
+               {
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+                       if (ret) {
+                               /*
+                                * Roll-back the creation of this chunk.
+                                *
+                                * This is important since the session daemon will
+                                * assume that the creation of this chunk failed and
+                                * will never ask for it to be closed, resulting
+                                * in a leak and an inconsistent state for some
+                                * channels.
+                                */
+                               enum lttcomm_return_code close_ret;
+                               char path[LTTNG_PATH_MAX];
+
+                               DBG("Failed to set new trace chunk on existing channels, rolling back");
+                               close_ret =
+                                       lttng_consumer_close_trace_chunk(relayd_id,
+                                                                        session_id,
+                                                                        chunk_id,
+                                                                        chunk_creation_timestamp,
+                                                                        nullptr,
+                                                                        path);
+                               if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                                       ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+                                           ", chunk_id = %" PRIu64,
+                                           session_id,
+                                           chunk_id);
+                               }
 
-                       DBG("Failed to set new trace chunk on existing channels, rolling back");
-                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
-                                                                    session_id,
-                                                                    chunk_id,
-                                                                    chunk_creation_timestamp,
-                                                                    nullptr,
-                                                                    path);
-                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
-                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
-                                   ", chunk_id = %" PRIu64,
-                                   session_id,
-                                   chunk_id);
+                               ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                               break;
                        }
-
-                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
-                       break;
                }
        }
 
@@ -4914,7 +4900,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 error:
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
@@ -4990,30 +4975,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               int ret;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       int ret;
 
-               /*
-                * Only change the channel's chunk to NULL if it still
-                * references the chunk being closed. The channel may
-                * reference a newer channel in the case of a session
-                * rotation. When a session rotation occurs, the "next"
-                * chunk is created before the "current" chunk is closed.
-                */
-               if (channel->trace_chunk != chunk) {
-                       continue;
-               }
-               ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
-               if (ret) {
                        /*
-                        * Attempt to close the chunk on as many channels as
-                        * possible.
+                        * Only change the channel's chunk to NULL if it still
+                        * references the chunk being closed. The channel may
+                        * reference a newer channel in the case of a session
+                        * rotation. When a session rotation occurs, the "next"
+                        * chunk is created before the "current" chunk is closed.
                         */
-                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       if (channel->trace_chunk != chunk) {
+                               continue;
+                       }
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+                       if (ret) {
+                               /*
+                                * Attempt to close the chunk on as many channels as
+                                * possible.
+                                */
+                               ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       }
                }
        }
-
        if (relayd_id) {
                int ret;
                struct consumer_relayd_sock_pair *relayd;
@@ -5033,7 +5020,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 end:
        /*
         * Release the reference returned by the "find" operation and
@@ -5055,6 +5041,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
+       lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -5087,7 +5074,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
                goto end;
        }
 
-       rcu_read_lock();
        relayd = consumer_find_relayd(*relayd_id);
        if (!relayd) {
                ERR("Failed to find relayd %" PRIu64, *relayd_id);
@@ -5109,7 +5095,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
 
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret_code;
 }
@@ -5123,7 +5108,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -5146,12 +5131,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
        return LTTCOMM_CONSUMERD_SUCCESS;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -5192,56 +5175,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum
                goto end;
        }
 
-       rcu_read_lock();
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
-               enum consumer_stream_open_packet_status status;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+                       enum consumer_stream_open_packet_status status;
 
-               pthread_mutex_lock(&stream->lock);
-               if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
-               }
+                       pthread_mutex_lock(&stream->lock);
+                       if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                               goto next;
+                       }
 
-               status = consumer_stream_open_packet(stream);
-               switch (status) {
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk = true;
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                       /*
-                        * Only unexpected internal errors can lead to this
-                        * failing. Report an unknown error.
-                        */
-                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel id = %" PRIu64 ", channel name = %s"
-                           ", session id = %" PRIu64,
-                           stream->key,
-                           channel->key,
-                           channel->name,
-                           channel->session_id);
-                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                       goto error_unlock;
-               default:
-                       abort();
-               }
+                       status = consumer_stream_open_packet(stream);
+                       switch (status) {
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                               DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               stream->opened_packet_in_current_trace_chunk = true;
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                               DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                               /*
+                                * Only unexpected internal errors can lead to this
+                                * failing. Report an unknown error.
+                                */
+                               ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel id = %" PRIu64 ", channel name = %s"
+                                   ", session id = %" PRIu64,
+                                   stream->key,
+                                   channel->key,
+                                   channel->name,
+                                   channel->session_id);
+                               ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                               goto error_unlock;
+                       default:
+                               abort();
+                       }
 
-       next:
-               pthread_mutex_unlock(&stream->lock);
+               next:
+                       pthread_mutex_unlock(&stream->lock);
+               }
        }
-
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret;
 
This page took 0.037346 seconds and 4 git commands to generate.