Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / common / consumer / consumer.cpp
index eb7e6375eed4577736fb303a098ab37a6d7c3ece..a44d397d0b2b8118138c0b7b53cef9674b6becc9 100644 (file)
 #include <common/dynamic-array.hpp>
 #include <common/index/ctf-index.hpp>
 #include <common/index/index.hpp>
+#include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/time.hpp>
 #include <common/trace-chunk-registry.hpp>
 #include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
 #include <common/ust-consumer/ust-consumer.hpp>
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -42,6 +46,7 @@
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <type_traits>
 #include <unistd.h>
 
 lttng_consumer_global_data the_consumer_data;
@@ -104,7 +109,8 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
 
        LTTNG_ASSERT(pipe);
 
-       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+                                                                            pointer. */
 }
 
 static void notify_health_quit_pipe(int *pipe)
@@ -167,19 +173,12 @@ error:
  */
 static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream, *stmp;
-
        LTTNG_ASSERT(channel);
 
        /* Delete streams that might have been left in the stream list. */
-       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
-               /*
-                * Once a stream is added to this list, the buffers were created so we
-                * have a guarantee that this call will succeed. Setting the monitor
-                * mode to 0 so we don't lock nor try to delete the stream from the
-                * global hash table.
-                */
-               stream->monitor = 0;
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                consumer_stream_destroy(stream, nullptr);
        }
 }
@@ -201,16 +200,14 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                return nullptr;
        }
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
-       rcu_read_unlock();
-
        return stream;
 }
 
@@ -218,7 +215,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -229,7 +226,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
                 */
                stream->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -252,7 +248,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
@@ -272,7 +268,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -283,7 +279,6 @@ static void steal_channel_key(uint64_t key)
                 */
                channel->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 static void free_channel_rcu(struct rcu_head *head)
@@ -303,7 +298,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -403,7 +399,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->is_published) {
                int ret;
 
-               rcu_read_lock();
+               const lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -411,7 +407,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                iter.iter.node = &channel->channels_by_session_id_ht_node.node;
                ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
                LTTNG_ASSERT(!ret);
-               rcu_read_unlock();
        }
 
        channel->is_deleted = true;
@@ -427,17 +422,14 @@ end:
  */
 static void cleanup_relayd_ht()
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd;
-
-       rcu_read_lock();
-
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
                consumer_destroy_relayd(relayd);
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.relayd_ht);
 }
 
@@ -451,29 +443,31 @@ static void cleanup_relayd_ht()
 static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                                             enum consumer_endpoint_status status)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       rcu_read_lock();
-
        /* Let's begin with metadata */
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
 
        /* Follow up by the data streams */
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
                }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -580,7 +574,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -613,7 +607,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        the_consumer_data.stream_count++;
        the_consumer_data.need_update = 1;
 
-       rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
@@ -626,7 +619,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
  */
 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
-       int ret = 0;
+       const int ret = 0;
        struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
@@ -634,7 +627,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        ASSERT_RCU_READ_LOCKED();
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                goto end;
        }
@@ -695,7 +688,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
@@ -719,7 +712,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -756,7 +749,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
            stream->net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -773,7 +765,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -796,7 +788,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -808,12 +799,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -1022,9 +1012,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1050,7 +1042,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -1090,7 +1082,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
        if (trace_chunk) {
-               int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+               const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
                if (ret) {
                        goto error;
                }
@@ -1126,11 +1118,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
-       rcu_read_unlock();
        channel->is_published = true;
 
        pthread_mutex_unlock(&channel->timer_lock);
@@ -1158,8 +1149,6 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                             int *nb_inactive_fd)
 {
        int i = 0;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(ht);
@@ -1168,8 +1157,11 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
                /*
                 * Only active streams with an active end point can be added to the
                 * poll set and local stream storage of the thread.
@@ -1187,16 +1179,12 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                        (*nb_inactive_fd)++;
                        continue;
                }
-               /*
-                * This clobbers way too much the debug output. Uncomment that if you
-                * need it for debugging purposes.
-                */
+
                (*pollfd)[i].fd = stream->wait_fd;
                (*pollfd)[i].events = POLLIN | POLLPRI;
                local_stream[i] = stream;
                i++;
        }
-       rcu_read_unlock();
 
        /*
         * Insert the consumer_data_pipe at the end of the array and don't
@@ -1257,11 +1245,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx,
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+                             enum lttcomm_return_code error_code)
 {
        if (ctx->consumer_error_socket > 0) {
+               const std::int32_t comm_code = std::int32_t(error_code);
+
+               static_assert(
+                       sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+                       "Fixed-size communication type too small to accomodate lttcomm_return_code");
                return lttcomm_send_unix_sock(
-                       ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+                       ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
        }
 
        return 0;
@@ -1273,18 +1267,16 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
  */
 void lttng_consumer_cleanup()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       rcu_read_lock();
-
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
                consumer_del_channel(channel);
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.channel_ht);
        lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
 
@@ -1344,8 +1336,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int ret;
-       int outfd = stream->out_fd;
+       const int outfd = stream->out_fd;
 
        /*
         * This does a blocking write-and-wait on any page that belongs to the
@@ -1356,31 +1347,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
        if (orig_offset < stream->max_sb_size) {
                return;
        }
-       lttng_sync_file_range(outfd,
-                             orig_offset - stream->max_sb_size,
-                             stream->max_sb_size,
-                             SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
-                                     SYNC_FILE_RANGE_WAIT_AFTER);
-       /*
-        * Give hints to the kernel about how we access the file:
-        * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-        * we write it.
-        *
-        * We need to call fadvise again after the file grows because the
-        * kernel does not seem to apply fadvise to non-existing parts of the
-        * file.
-        *
-        * Call fadvise _after_ having waited for the page writeback to
-        * complete because the dirty page writeback semantic is not well
-        * defined. So it can be expected to lead to lower throughput in
-        * streaming.
-        */
-       ret = posix_fadvise(
-               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
-       if (ret && ret != -ENOSYS) {
-               errno = ret;
-               PERROR("posix_fadvise on fd %i", outfd);
-       }
+       lttng::io::hint_flush_range_dont_need_sync(
+               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
 }
 
 /*
@@ -1478,22 +1446,20 @@ error:
  */
 static void destroy_data_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
                /*
                 * Ignore return value since we are currently cleaning up so any error
                 * can't be handled.
                 */
                (void) consumer_del_stream(stream, ht);
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1504,22 +1470,20 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
  */
 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
                /*
                 * Ignore return value since we are currently cleaning up so any error
                 * can't be handled.
                 */
                (void) consumer_del_metadata_stream(stream, ht);
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1621,7 +1585,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
@@ -1739,8 +1703,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(
-                       outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+               lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
                stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
@@ -1761,7 +1724,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1780,7 +1742,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
-       int fd = stream->wait_fd;
+       const int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = nullptr;
@@ -1800,7 +1762,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -1897,7 +1859,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
 
                /* Handle stream on the relayd if the output is on the network */
                if (relayd && stream->metadata_flag) {
-                       size_t metadata_payload_size =
+                       const size_t metadata_payload_size =
                                sizeof(struct lttcomm_relayd_metadata_payload);
 
                        /* Update counter to fit the spliced data */
@@ -1940,8 +1902,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
                        /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(
-                               outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+                       lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
                        stream->out_fd_offset += ret_splice;
                }
                stream->output_written += ret_splice;
@@ -1983,7 +1944,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return written;
 }
 
@@ -2154,7 +2114,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
                /* Go for channel deletion! */
                free_channel = true;
        }
-       stream->chan = nullptr;
 
        /*
         * Nullify the stream reference so it is not used after deletion. The
@@ -2162,6 +2121,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
@@ -2204,14 +2164,14 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
        lttng_ht_lookup(ht, &stream->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        LTTNG_ASSERT(!node);
 
        /*
@@ -2238,8 +2198,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         */
        lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
 
-       rcu_read_unlock();
-
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
@@ -2251,13 +2209,12 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
  */
 static void validate_endpoint_status_data_stream()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged data stream");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
                /* Validate delete flag of the stream */
                if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
@@ -2265,7 +2222,6 @@ static void validate_endpoint_status_data_stream()
                /* Delete it right now */
                consumer_del_stream(stream, data_ht);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2273,15 +2229,14 @@ static void validate_endpoint_status_data_stream()
  */
 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged metadata stream");
 
        LTTNG_ASSERT(pollset);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
                /* Validate delete flag of the stream */
                if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
@@ -2295,7 +2250,6 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po
                /* Delete it right now */
                consumer_del_metadata_stream(stream, metadata_ht);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2375,8 +2329,11 @@ void *consumer_thread_metadata_poll(void *data)
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                                   &stream,
-                                                                  sizeof(stream));
-                                       if (pipe_len < sizeof(stream)) {
+                                                                  sizeof(stream)); /* NOLINT sizeof
+                                                                                      used on a
+                                                                                      pointer. */
+                                       if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+                                                                           pointer. */
                                                if (pipe_len < 0) {
                                                        PERROR("read metadata stream");
                                                }
@@ -2427,16 +2384,16 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
-                       stream = caa_container_of(node, struct lttng_consumer_stream, node);
+                       stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
 
                        if (revents & (LPOLLIN | LPOLLPRI)) {
                                /* Get the data out of the metadata file descriptor */
@@ -2491,11 +2448,9 @@ void *consumer_thread_metadata_poll(void *data)
                                consumer_del_metadata_stream(stream, metadata_ht);
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
                        /* Release RCU lock for the stream looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -2522,7 +2477,7 @@ error_testpoint:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i, err = -1;
+       int num_rdy, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = nullptr;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
@@ -2555,7 +2510,6 @@ void *consumer_thread_data_poll(void *data)
                health_code_update();
 
                high_prio = 0;
-               num_hup = 0;
 
                /*
                 * the fds set has been updated, we need to update our
@@ -2643,9 +2597,12 @@ void *consumer_thread_data_poll(void *data)
                        ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
-                       pipe_readlen = lttng_pipe_read(
-                               ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < sizeof(new_stream)) {
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+                                                      &new_stream,
+                                                      sizeof(new_stream)); /* NOLINT sizeof used on
+                                                                              a pointer. */
+                       if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+                                                                 */
                                PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
@@ -2768,21 +2725,18 @@ void *consumer_thread_data_poll(void *data)
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        }
                        if (local_stream[i] != nullptr) {
@@ -2825,21 +2779,16 @@ error_testpoint:
  */
 static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
@@ -2871,25 +2820,21 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
 }
 
 static void destroy_channel_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
-       int ret;
-
        if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-               ret = lttng_ht_del(ht, &iter);
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::wait_fd_node),
+                                                &lttng_consumer_channel::wait_fd_node>(*ht->ht)) {
+               const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
                LTTNG_ASSERT(ret != 0);
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -2991,19 +2936,20 @@ void *consumer_thread_channel_poll(void *data)
 
                                        switch (action) {
                                        case CONSUMER_CHANNEL_ADD:
+                                       {
                                                DBG("Adding channel %d to poll set", chan->wait_fd);
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               rcu_read_lock();
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
-                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                // FIXME: Empty flag on a pipe pollset, this might
                                                // hang on FreeBSD.
                                                lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
+                                       }
                                        case CONSUMER_CHANNEL_DEL:
                                        {
                                                /*
@@ -3016,10 +2962,9 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               rcu_read_lock();
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
-                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64
                                                            " not found for del channel",
                                                            key);
@@ -3052,7 +2997,6 @@ void *consumer_thread_channel_poll(void *data)
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
-                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -3086,16 +3030,17 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(channel_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
-                       chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
+                       chan = lttng::utils::container_of(node,
+                                                         &lttng_consumer_channel::wait_fd_node);
 
                        /* Check for error event */
                        if (revents & (LPOLLERR | LPOLLHUP)) {
@@ -3119,12 +3064,10 @@ void *consumer_thread_channel_poll(void *data)
                                }
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -3726,27 +3669,23 @@ error_nosignal:
  */
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd = nullptr;
-
-       ASSERT_RCU_READ_LOCKED();
-
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
                /*
                 * Check by sessiond id which is unique here where the relayd session
                 * id might not be when having multiple relayd.
                 */
                if (relayd->sessiond_session_id == id) {
                        /* Found the relayd. There can be only one per id. */
-                       goto found;
+                       return relayd;
                }
        }
 
        return nullptr;
-
-found:
-       return relayd;
 }
 
 /*
@@ -3758,16 +3697,13 @@ found:
 int consumer_data_pending(uint64_t id)
 {
        int ret;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
+       const auto ht = the_consumer_data.stream_list_ht;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       rcu_read_lock();
-       pthread_mutex_lock(&the_consumer_data.lock);
+       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3782,18 +3718,12 @@ int consumer_data_pending(uint64_t id)
                abort();
        }
 
-       /* Ease our life a bit */
-       ht = the_consumer_data.stream_list_ht;
-
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&id, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &id,
-                                         &iter.iter,
-                                         stream,
-                                         node_session_id.node)
-       {
-               pthread_mutex_lock(&stream->lock);
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_session_id),
+                    &lttng_consumer_stream::node_session_id,
+                    std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -3806,35 +3736,30 @@ int consumer_data_pending(uint64_t id)
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
                                goto data_pending;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
 
        relayd = find_relayd_by_session_id(id);
        if (relayd) {
                unsigned int is_data_inflight = 0;
 
+               const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
                /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
                if (ret < 0) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        /* Communication error thus the relayd so no data pending. */
                        goto data_not_pending;
                }
 
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
-               {
+               for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                            lttng_consumer_stream,
+                            decltype(lttng_consumer_stream::node_session_id),
+                            &lttng_consumer_stream::node_session_id,
+                            std::uint64_t>(
+                            *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                               stream->relayd_stream_id);
@@ -3845,13 +3770,11 @@ int consumer_data_pending(uint64_t id)
                        }
 
                        if (ret == 1) {
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_pending;
                        } else if (ret < 0) {
                                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
                                    relayd->net_seq_idx);
                                lttng_consumer_cleanup_relayd(relayd);
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_not_pending;
                        }
                }
@@ -3859,7 +3782,6 @@ int consumer_data_pending(uint64_t id)
                /* Send end command for data pending. */
                ret = relayd_end_data_pending(
                        &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
                            relayd->net_seq_idx);
@@ -3879,14 +3801,10 @@ int consumer_data_pending(uint64_t id)
 
 data_not_pending:
        /* Data is available to be read by a viewer. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 1;
 }
 
@@ -3991,9 +3909,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                  uint64_t relayd_id)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
        struct lttng_dynamic_array stream_rotation_positions;
        uint64_t next_chunk_id, stream_count = 0;
        enum lttng_trace_chunk_status chunk_status;
@@ -4002,7 +3918,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        bool rotating_to_new_chunk = true;
        /* Array of `struct lttng_consumer_stream *` */
        struct lttng_dynamic_pointer_array streams_packet_to_open;
-       size_t stream_idx;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -4013,24 +3928,23 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       rcu_read_lock();
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
 
-       pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
        chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
-               goto end_unlock_channel;
+               goto end;
        }
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
@@ -4038,7 +3952,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                /*
                 * Lock stream because we are about to change its state.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (stream->trace_chunk == stream->chan->trace_chunk) {
                        rotating_to_new_chunk = false;
@@ -4092,7 +4006,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                        ret = sample_stream_positions(
                                                stream, &produced_pos, &consumed_pos);
                                        if (ret) {
-                                               goto end_unlock_stream;
+                                               goto end;
                                        }
 
                                        /*
@@ -4141,26 +4055,26 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                    stream->key);
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
 
                ret = lttng_consumer_take_snapshot(stream);
                if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
                        ERR("Failed to sample snapshot position during channel rotation");
-                       goto end_unlock_stream;
+                       goto end;
                }
                if (!ret) {
                        ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
                        if (ret < 0) {
                                ERR("Failed to sample produced position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
 
                        ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
                        if (ret < 0) {
                                ERR("Failed to sample consumed position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
                /*
@@ -4200,7 +4114,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                            stream->key);
                        ret = -1;
-                       goto end_unlock_stream;
+                       goto end;
                }
                stream->rotate_position = stream->last_sequence_number + 1 +
                        ((produced_pos - consumed_pos) / stream->max_sb_size);
@@ -4223,7 +4137,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                              &position);
                        if (ret) {
                                ERR("Failed to allocate stream rotation position");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                        stream_count++;
                }
@@ -4285,20 +4199,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret) {
                                PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
-       stream = nullptr;
 
        if (!is_local_trace) {
                relayd = consumer_find_relayd(relayd_id);
                if (!relayd) {
                        ERR("Failed to find relayd %" PRIu64, relayd_id);
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                }
 
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -4312,21 +4223,19 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
                            relayd->net_seq_idx);
                        lttng_consumer_cleanup_relayd(relayd);
-                       goto end_unlock_channel;
+                       goto end;
                }
        }
 
-       for (stream_idx = 0;
+       for (std::size_t stream_idx = 0;
             stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
             stream_idx++) {
                enum consumer_stream_open_packet_status status;
-
-               stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+               auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
                        &streams_packet_to_open, stream_idx);
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                status = consumer_stream_open_packet(stream);
-               pthread_mutex_unlock(&stream->lock);
                switch (status) {
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                        DBG("Opened a packet after a rotation: stream id = %" PRIu64
@@ -4350,22 +4259,14 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                        /* Logged by callee. */
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                default:
                        abort();
                }
        }
 
-       pthread_mutex_unlock(&channel->lock);
        ret = 0;
-       goto end;
-
-end_unlock_stream:
-       pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
-       pthread_mutex_unlock(&channel->lock);
 end:
-       rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
@@ -4449,29 +4350,22 @@ error:
 
 static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
 {
-       int ret;
-       struct lttng_consumer_stream *stream;
+       const lttng::urcu::read_lock_guard read_lock;
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
 
-       rcu_read_lock();
-       pthread_mutex_lock(&channel->lock);
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
-               pthread_mutex_lock(&stream->lock);
-               ret = consumer_clear_stream(stream);
+
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
+               const auto ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-               pthread_mutex_unlock(&stream->lock);
        }
-       pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
-       return 0;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
-       return ret;
+       return 0;
 }
 
 /*
@@ -4660,39 +4554,31 @@ error:
 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
-
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
-               pthread_mutex_lock(&stream->chan->lock);
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (!stream->rotate_ready) {
-                       pthread_mutex_unlock(&stream->lock);
-                       pthread_mutex_unlock(&stream->chan->lock);
                        continue;
                }
-               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
 
+               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
                ret = lttng_consumer_rotate_stream(stream);
-               pthread_mutex_unlock(&stream->lock);
-               pthread_mutex_unlock(&stream->chan->lock);
                if (ret) {
                        goto end;
                }
@@ -4701,7 +4587,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
        ret = 0;
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4742,8 +4627,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
        char creation_timestamp_buffer[ISO8601_STR_LEN];
        const char *relayd_id_str = "(none)";
        const char *creation_timestamp_str;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -4830,16 +4713,15 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                goto error;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(
-               the_consumer_data.channels_by_session_id_ht->ht,
-               the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
-               the_consumer_data.channels_by_session_id_ht->match_fct,
-               &session_id,
-               &iter.iter,
-               channel,
-               channels_by_session_id_ht_node.node)
-       {
+       for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_channel,
+                    decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+                    &lttng_consumer_channel::channels_by_session_id_ht_node,
+                    std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+                                   &session_id,
+                                   the_consumer_data.channels_by_session_id_ht->hash_fct(
+                                           &session_id, lttng_ht_seed),
+                                   the_consumer_data.channels_by_session_id_ht->match_fct)) {
                ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
                if (ret) {
                        /*
@@ -4907,7 +4789,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 error:
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
@@ -4928,8 +4809,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const char *close_command_name = "none";
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;
 
        if (relayd_id) {
@@ -4983,8 +4862,11 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
                int ret;
 
                /*
@@ -5026,7 +4908,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 end:
        /*
         * Release the reference returned by the "find" operation and
@@ -5048,6 +4929,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
+       const lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -5080,7 +4962,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
                goto end;
        }
 
-       rcu_read_lock();
        relayd = consumer_find_relayd(*relayd_id);
        if (!relayd) {
                ERR("Failed to find relayd %" PRIu64, *relayd_id);
@@ -5102,50 +4983,38 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
 
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret_code;
 }
 
 static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
        int ret;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
+                       continue;
                }
+
                ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-       next:
-               pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
-       return LTTCOMM_CONSUMERD_SUCCESS;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
-       return ret;
+       return LTTCOMM_CONSUMERD_SUCCESS;
 }
 
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
@@ -5176,22 +5045,22 @@ end:
 
 enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream;
        enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
 
        if (channel->metadata_stream) {
                ERR("Open channel packets command attempted on a metadata channel");
-               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
-               goto end;
+               return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
        }
 
-       rcu_read_lock();
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       const lttng::urcu::read_lock_guard read_lock;
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                enum consumer_stream_open_packet_status status;
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
+                       continue;
                }
 
                status = consumer_stream_open_packet(stream);
@@ -5223,24 +5092,13 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum
                            channel->key,
                            channel->name,
                            channel->session_id);
-                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                       goto error_unlock;
+                       return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
                default:
                        abort();
                }
-
-       next:
-               pthread_mutex_unlock(&stream->lock);
        }
 
-end_rcu_unlock:
-       rcu_read_unlock();
-end:
        return ret;
-
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       goto end_rcu_unlock;
 }
 
 void lttng_consumer_sigbus_handle(void *addr)
This page took 0.047765 seconds and 4 git commands to generate.