Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / common / consumer / consumer.cpp
index b059eaf52c58397166b564988136dbd59b1b0036..1da243601cd7ff3c72fc8febb5d6c11f2f150c4b 100644 (file)
@@ -20,6 +20,7 @@
 #include <common/dynamic-array.hpp>
 #include <common/index/ctf-index.hpp>
 #include <common/index/index.hpp>
+#include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/time.hpp>
 #include <common/trace-chunk-registry.hpp>
 #include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
 #include <common/ust-consumer/ust-consumer.hpp>
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -42,6 +45,7 @@
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <type_traits>
 #include <unistd.h>
 
 lttng_consumer_global_data the_consumer_data;
@@ -79,7 +83,7 @@ int data_consumption_paused;
  */
 int consumer_quit;
 
-static const char *get_consumer_domain(void)
+static const char *get_consumer_domain()
 {
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -100,11 +104,12 @@ static const char *get_consumer_domain(void)
  */
 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
 {
-       struct lttng_consumer_stream *null_stream = NULL;
+       struct lttng_consumer_stream *null_stream = nullptr;
 
        LTTNG_ASSERT(pipe);
 
-       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+                                                                            pointer. */
 }
 
 static void notify_health_quit_pipe(int *pipe)
@@ -138,7 +143,7 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
 
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
 {
-       notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+       notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
 }
 
 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
@@ -173,14 +178,7 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
-               /*
-                * Once a stream is added to this list, the buffers were created so we
-                * have a guarantee that this call will succeed. Setting the monitor
-                * mode to 0 so we don't lock nor try to delete the stream from the
-                * global hash table.
-                */
-               stream->monitor = 0;
-               consumer_stream_destroy(stream, NULL);
+               consumer_stream_destroy(stream, nullptr);
        }
 }
 
@@ -192,25 +190,23 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
-       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_consumer_stream *stream = nullptr;
 
        LTTNG_ASSERT(ht);
 
        /* -1ULL keys are lookup failures */
        if (key == (uint64_t) -1ULL) {
-               return NULL;
+               return nullptr;
        }
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
+       if (node != nullptr) {
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
-       rcu_read_unlock();
-
        return stream;
 }
 
@@ -218,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -229,7 +225,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
                 */
                stream->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -242,18 +237,18 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
-       struct lttng_consumer_channel *channel = NULL;
+       struct lttng_consumer_channel *channel = nullptr;
 
        ASSERT_RCU_READ_LOCKED();
 
        /* -1ULL keys are lookup failures */
        if (key == (uint64_t) -1ULL) {
-               return NULL;
+               return nullptr;
        }
 
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
+       if (node != nullptr) {
                channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
 
@@ -272,7 +267,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -283,7 +278,6 @@ static void steal_channel_key(uint64_t key)
                 */
                channel->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 static void free_channel_rcu(struct rcu_head *head)
@@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -338,7 +333,7 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        int ret;
        struct lttng_ht_iter iter;
 
-       if (relayd == NULL) {
+       if (relayd == nullptr) {
                return;
        }
 
@@ -398,12 +393,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        }
 
        lttng_trace_chunk_put(channel->trace_chunk);
-       channel->trace_chunk = NULL;
+       channel->trace_chunk = nullptr;
 
        if (channel->is_published) {
                int ret;
 
-               rcu_read_lock();
+               lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -411,7 +406,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                iter.iter.node = &channel->channels_by_session_id_ht_node.node;
                ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
                LTTNG_ASSERT(!ret);
-               rcu_read_unlock();
        }
 
        channel->is_deleted = true;
@@ -425,19 +419,20 @@ end:
  * Iterate over the relayd hash table and destroy each element. Finally,
  * destroy the whole hash table.
  */
-static void cleanup_relayd_ht(void)
+static void cleanup_relayd_ht()
 {
        struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
-               consumer_destroy_relayd(relayd);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+                       consumer_destroy_relayd(relayd);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.relayd_ht);
 }
 
@@ -456,12 +451,14 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Let's begin with metadata */
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -473,7 +470,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
                }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -580,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -613,7 +609,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        the_consumer_data.stream_count++;
        the_consumer_data.need_update = 1;
 
-       rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
@@ -635,7 +630,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
+       if (node != nullptr) {
                goto end;
        }
        lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
@@ -649,7 +644,7 @@ end:
  */
 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
 {
-       struct consumer_relayd_sock_pair *obj = NULL;
+       struct consumer_relayd_sock_pair *obj = nullptr;
 
        /* net sequence index of -1 is a failure */
        if (net_seq_idx == (uint64_t) -1ULL) {
@@ -657,7 +652,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint
        }
 
        obj = zmalloc<consumer_relayd_sock_pair>();
-       if (obj == NULL) {
+       if (obj == nullptr) {
                PERROR("zmalloc relayd sock");
                goto error;
        }
@@ -668,7 +663,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint
        obj->control_sock.sock.fd = -1;
        obj->data_sock.sock.fd = -1;
        lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
-       pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+       pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
 
 error:
        return obj;
@@ -685,7 +680,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -696,7 +691,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
+       if (node != nullptr) {
                relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
 
@@ -719,9 +714,9 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
+       if (relayd != nullptr) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_add_stream(&relayd->control_sock,
@@ -756,7 +751,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
            stream->net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -773,9 +767,9 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
-       if (relayd != NULL) {
+       if (relayd != nullptr) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_streams_sent(&relayd->control_sock);
@@ -796,7 +790,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -808,12 +801,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -1010,8 +1002,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                                                         const char *root_shm_path,
                                                         const char *shm_path)
 {
-       struct lttng_consumer_channel *channel = NULL;
-       struct lttng_trace_chunk *trace_chunk = NULL;
+       struct lttng_consumer_channel *channel = nullptr;
+       struct lttng_trace_chunk *trace_chunk = nullptr;
 
        if (chunk_id) {
                trace_chunk = lttng_trace_chunk_registry_find_chunk(
@@ -1022,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == NULL) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1050,8 +1044,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
-               channel = NULL;
+               delete channel;
+               channel = nullptr;
                goto end;
        }
 
@@ -1103,7 +1097,7 @@ end:
        return channel;
 error:
        consumer_del_channel(channel);
-       channel = NULL;
+       channel = nullptr;
        goto end;
 }
 
@@ -1126,11 +1120,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
-       rcu_read_unlock();
        channel->is_published = true;
 
        pthread_mutex_unlock(&channel->timer_lock);
@@ -1168,35 +1161,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Only active streams with an active end point can be added to the
-                * poll set and local stream storage of the thread.
-                *
-                * There is a potential race here for endpoint_status to be updated
-                * just after the check. However, this is OK since the stream(s) will
-                * be deleted once the thread is notified that the end point state has
-                * changed where this function will be called back again.
-                *
-                * We track the number of inactive FDs because they still need to be
-                * closed by the polling thread after a wakeup on the data_pipe or
-                * metadata_pipe.
-                */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
-                       (*nb_inactive_fd)++;
-                       continue;
+
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Only active streams with an active end point can be added to the
+                        * poll set and local stream storage of the thread.
+                        *
+                        * There is a potential race here for endpoint_status to be updated
+                        * just after the check. However, this is OK since the stream(s) will
+                        * be deleted once the thread is notified that the end point state has
+                        * changed where this function will be called back again.
+                        *
+                        * We track the number of inactive FDs because they still need to be
+                        * closed by the polling thread after a wakeup on the data_pipe or
+                        * metadata_pipe.
+                        */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                               (*nb_inactive_fd)++;
+                               continue;
+                       }
+
+                       (*pollfd)[i].fd = stream->wait_fd;
+                       (*pollfd)[i].events = POLLIN | POLLPRI;
+                       local_stream[i] = stream;
+                       i++;
                }
-               /*
-                * This clobbers way too much the debug output. Uncomment that if you
-                * need it for debugging purposes.
-                */
-               (*pollfd)[i].fd = stream->wait_fd;
-               (*pollfd)[i].events = POLLIN | POLLPRI;
-               local_stream[i] = stream;
-               i++;
        }
-       rcu_read_unlock();
 
        /*
         * Insert the consumer_data_pipe at the end of the array and don't
@@ -1257,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx,
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+                             enum lttcomm_return_code error_code)
 {
        if (ctx->consumer_error_socket > 0) {
+               const std::int32_t comm_code = std::int32_t(error_code);
+
+               static_assert(
+                       sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+                       "Fixed-size communication type too small to accomodate lttcomm_return_code");
                return lttcomm_send_unix_sock(
-                       ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+                       ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
        }
 
        return 0;
@@ -1271,20 +1269,21 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
  * Close all the tracefiles and stream fds and MUST be called when all
  * instances are destroyed i.e. when all threads were joined and are ended.
  */
-void lttng_consumer_cleanup(void)
+void lttng_consumer_cleanup()
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               consumer_del_channel(channel);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       consumer_del_channel(channel);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.channel_ht);
        lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
 
@@ -1344,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1356,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
        if (orig_offset < stream->max_sb_size) {
                return;
        }
-       lttng_sync_file_range(outfd,
-                             orig_offset - stream->max_sb_size,
-                             stream->max_sb_size,
-                             SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
-                                     SYNC_FILE_RANGE_WAIT_AFTER);
-       /*
-        * Give hints to the kernel about how we access the file:
-        * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-        * we write it.
-        *
-        * We need to call fadvise again after the file grows because the
-        * kernel does not seem to apply fadvise to non-existing parts of the
-        * file.
-        *
-        * Call fadvise _after_ having waited for the page writeback to
-        * complete because the dirty page writeback semantic is not well
-        * defined. So it can be expected to lead to lower throughput in
-        * streaming.
-        */
-       ret = posix_fadvise(
-               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
-       if (ret && ret != -ENOSYS) {
-               errno = ret;
-               PERROR("posix_fadvise on fd %i", outfd);
-       }
+       lttng::io::hint_flush_range_dont_need_sync(
+               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
 }
 
 /*
@@ -1414,14 +1389,14 @@ lttng_consumer_create(enum lttng_consumer_type type,
        the_consumer_data.type = type;
 
        ctx = zmalloc<lttng_consumer_local_data>();
-       if (ctx == NULL) {
+       if (ctx == nullptr) {
                PERROR("allocating context");
                goto error;
        }
 
        ctx->consumer_error_socket = -1;
        ctx->consumer_metadata_socket = -1;
-       pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
+       pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
@@ -1470,7 +1445,7 @@ error_wakeup_pipe:
 error_poll_pipe:
        free(ctx);
 error:
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -1481,19 +1456,20 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
-       if (ht == NULL) {
+       if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1507,19 +1483,20 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht)
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
-       if (ht == NULL) {
+       if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_metadata_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1615,19 +1592,19 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
        unsigned int relayd_hang_up = 0;
        const size_t subbuf_content_size = buffer->size - padding;
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
-               if (relayd == NULL) {
+               if (relayd == nullptr) {
                        ret = -EPIPE;
                        goto end;
                }
@@ -1739,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(
-                       outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+               lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
                stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
@@ -1761,7 +1737,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1783,7 +1758,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
        int *splice_pipe;
        unsigned int relayd_hang_up = 0;
 
@@ -1800,12 +1775,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
-               if (relayd == NULL) {
+               if (relayd == nullptr) {
                        written = -ret;
                        goto end;
                }
@@ -1886,7 +1861,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
                    fd,
                    splice_pipe[1]);
                ret_splice = splice(
-                       fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE);
+                       fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe, ret %zd", ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
@@ -1912,9 +1887,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
 
                /* Splice data out */
                ret_splice = splice(splice_pipe[0],
-                                   NULL,
+                                   nullptr,
                                    outfd,
-                                   NULL,
+                                   nullptr,
                                    ret_splice,
                                    SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
@@ -1940,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
                        /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(
-                               outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+                       lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
                        stream->out_fd_offset += ret_splice;
                }
                stream->output_written += ret_splice;
@@ -1983,7 +1957,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return written;
 }
 
@@ -2083,7 +2056,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
-static void lttng_consumer_close_all_metadata(void)
+static void lttng_consumer_close_all_metadata()
 {
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -2114,7 +2087,7 @@ static void lttng_consumer_close_all_metadata(void)
  */
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
 {
-       struct lttng_consumer_channel *channel = NULL;
+       struct lttng_consumer_channel *channel = nullptr;
        bool free_channel = false;
 
        LTTNG_ASSERT(stream);
@@ -2154,14 +2127,15 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
                /* Go for channel deletion! */
                free_channel = true;
        }
-       stream->chan = NULL;
+       stream->chan = nullptr;
 
        /*
         * Nullify the stream reference so it is not used after deletion. The
         * channel lock MUST be acquired before being able to check for a NULL
         * pointer value.
         */
-       channel->metadata_stream = NULL;
+       channel->metadata_stream = nullptr;
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
@@ -2175,7 +2149,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
        }
 
        lttng_trace_chunk_put(stream->trace_chunk);
-       stream->trace_chunk = NULL;
+       stream->trace_chunk = nullptr;
        consumer_stream_free(stream);
 }
 
@@ -2204,7 +2178,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
@@ -2238,8 +2212,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         */
        lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
 
-       rcu_read_unlock();
-
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
@@ -2249,23 +2221,25 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 /*
  * Delete data stream that are flagged for deletion (endpoint_status).
  */
-static void validate_endpoint_status_data_stream(void)
+static void validate_endpoint_status_data_stream()
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
        DBG("Consumer delete flagged data stream");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /* Delete it right now */
+                       consumer_del_stream(stream, data_ht);
                }
-               /* Delete it right now */
-               consumer_del_stream(stream, data_ht);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2280,22 +2254,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po
 
        LTTNG_ASSERT(pollset);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
-               }
-               /*
-                * Remove from pollset so the metadata thread can continue without
-                * blocking on a deleted stream.
-                */
-               lttng_poll_del(pollset, stream->wait_fd);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /*
+                        * Remove from pollset so the metadata thread can continue without
+                        * blocking on a deleted stream.
+                        */
+                       lttng_poll_del(pollset, stream->wait_fd);
 
-               /* Delete it right now */
-               consumer_del_metadata_stream(stream, metadata_ht);
+                       /* Delete it right now */
+                       consumer_del_metadata_stream(stream, metadata_ht);
+               }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2306,7 +2281,7 @@ void *consumer_thread_metadata_poll(void *data)
 {
        int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
-       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_consumer_stream *stream = nullptr;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
        struct lttng_poll_event events;
@@ -2340,7 +2315,7 @@ void *consumer_thread_metadata_poll(void *data)
        /* Main loop */
        DBG("Metadata main loop started");
 
-       while (1) {
+       while (true) {
        restart:
                health_code_update();
                health_poll_entry();
@@ -2375,8 +2350,11 @@ void *consumer_thread_metadata_poll(void *data)
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                                   &stream,
-                                                                  sizeof(stream));
-                                       if (pipe_len < sizeof(stream)) {
+                                                                  sizeof(stream)); /* NOLINT sizeof
+                                                                                      used on a
+                                                                                      pointer. */
+                                       if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+                                                                           pointer. */
                                                if (pipe_len < 0) {
                                                        PERROR("read metadata stream");
                                                }
@@ -2393,7 +2371,7 @@ void *consumer_thread_metadata_poll(void *data)
                                        }
 
                                        /* A NULL stream means that the state has changed. */
-                                       if (stream == NULL) {
+                                       if (stream == nullptr) {
                                                /* Check for deleted streams. */
                                                validate_endpoint_status_metadata_stream(&events);
                                                goto restart;
@@ -2427,7 +2405,7 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -2491,11 +2469,9 @@ void *consumer_thread_metadata_poll(void *data)
                                consumer_del_metadata_stream(stream, metadata_ht);
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
                        /* Release RCU lock for the stream looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -2513,7 +2489,7 @@ error_testpoint:
        }
        health_unregister(health_consumerd);
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -2522,10 +2498,10 @@ error_testpoint:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i, err = -1;
-       struct pollfd *pollfd = NULL;
+       int num_rdy, high_prio, ret, i, err = -1;
+       struct pollfd *pollfd = nullptr;
        /* local view of the streams */
-       struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
+       struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
        /* 2 for the consumer_data_pipe and wake up pipe */
@@ -2546,16 +2522,15 @@ void *consumer_thread_data_poll(void *data)
        health_code_update();
 
        local_stream = zmalloc<lttng_consumer_stream *>();
-       if (local_stream == NULL) {
+       if (local_stream == nullptr) {
                PERROR("local_stream malloc");
                goto end;
        }
 
-       while (1) {
+       while (true) {
                health_code_update();
 
                high_prio = 0;
-               num_hup = 0;
 
                /*
                 * the fds set has been updated, we need to update our
@@ -2564,15 +2539,15 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_lock(&the_consumer_data.lock);
                if (the_consumer_data.need_update) {
                        free(pollfd);
-                       pollfd = NULL;
+                       pollfd = nullptr;
 
                        free(local_stream);
-                       local_stream = NULL;
+                       local_stream = nullptr;
 
                        /* Allocate for all fds */
                        pollfd =
                                calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
-                       if (pollfd == NULL) {
+                       if (pollfd == nullptr) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&the_consumer_data.lock);
                                goto end;
@@ -2580,7 +2555,7 @@ void *consumer_thread_data_poll(void *data)
 
                        local_stream = calloc<lttng_consumer_stream *>(
                                the_consumer_data.stream_count + nb_pipes_fd);
-                       if (local_stream == NULL) {
+                       if (local_stream == nullptr) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&the_consumer_data.lock);
                                goto end;
@@ -2643,9 +2618,12 @@ void *consumer_thread_data_poll(void *data)
                        ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
-                       pipe_readlen = lttng_pipe_read(
-                               ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < sizeof(new_stream)) {
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+                                                      &new_stream,
+                                                      sizeof(new_stream)); /* NOLINT sizeof used on
+                                                                              a pointer. */
+                       if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+                                                                 */
                                PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
@@ -2656,7 +2634,7 @@ void *consumer_thread_data_poll(void *data)
                         * the sessiond poll thread changed the consumer_quit state and is
                         * waking us up to test it.
                         */
-                       if (new_stream == NULL) {
+                       if (new_stream == nullptr) {
                                validate_endpoint_status_data_stream();
                                continue;
                        }
@@ -2683,7 +2661,7 @@ void *consumer_thread_data_poll(void *data)
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
 
-                       if (local_stream[i] == NULL) {
+                       if (local_stream[i] == nullptr) {
                                continue;
                        }
                        if (pollfd[i].revents & POLLPRI) {
@@ -2694,7 +2672,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
-                                       local_stream[i] = NULL;
+                                       local_stream[i] = nullptr;
                                } else if (len > 0) {
                                        local_stream[i]->has_data_left_to_be_read_before_teardown =
                                                1;
@@ -2714,7 +2692,7 @@ void *consumer_thread_data_poll(void *data)
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
 
-                       if (local_stream[i] == NULL) {
+                       if (local_stream[i] == nullptr) {
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
@@ -2725,7 +2703,7 @@ void *consumer_thread_data_poll(void *data)
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
                                        consumer_del_stream(local_stream[i], data_ht);
-                                       local_stream[i] = NULL;
+                                       local_stream[i] = nullptr;
                                } else if (len > 0) {
                                        local_stream[i]->has_data_left_to_be_read_before_teardown =
                                                1;
@@ -2737,7 +2715,7 @@ void *consumer_thread_data_poll(void *data)
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
 
-                       if (local_stream[i] == NULL) {
+                       if (local_stream[i] == nullptr) {
                                continue;
                        }
                        if (!local_stream[i]->hangup_flush_done &&
@@ -2767,25 +2745,22 @@ void *consumer_thread_data_poll(void *data)
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
-                                       local_stream[i] = NULL;
-                                       num_hup++;
+                                       local_stream[i] = nullptr;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
-                                       local_stream[i] = NULL;
-                                       num_hup++;
+                                       local_stream[i] = nullptr;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
-                                       local_stream[i] = NULL;
-                                       num_hup++;
+                                       local_stream[i] = nullptr;
                                }
                        }
-                       if (local_stream[i] != NULL) {
+                       if (local_stream[i] != nullptr) {
                                local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
                        }
                }
@@ -2815,7 +2790,7 @@ error_testpoint:
        health_unregister(health_consumerd);
 
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -2831,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -2871,7 +2846,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
 }
 
 static void destroy_channel_ht(struct lttng_ht *ht)
@@ -2880,16 +2854,18 @@ static void destroy_channel_ht(struct lttng_ht *ht)
        struct lttng_consumer_channel *channel;
        int ret;
 
-       if (ht == NULL) {
+       if (ht == nullptr) {
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-               ret = lttng_ht_del(ht, &iter);
-               LTTNG_ASSERT(ret != 0);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+                       ret = lttng_ht_del(ht, &iter);
+                       LTTNG_ASSERT(ret != 0);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -2905,7 +2881,7 @@ void *consumer_thread_channel_poll(void *data)
 {
        int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
-       struct lttng_consumer_channel *chan = NULL;
+       struct lttng_consumer_channel *chan = nullptr;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
        struct lttng_poll_event events;
@@ -2945,7 +2921,7 @@ void *consumer_thread_channel_poll(void *data)
        /* Main loop */
        DBG("Channel main loop started");
 
-       while (1) {
+       while (true) {
        restart:
                health_code_update();
                DBG("Channel poll wait");
@@ -2991,19 +2967,20 @@ void *consumer_thread_channel_poll(void *data)
 
                                        switch (action) {
                                        case CONSUMER_CHANNEL_ADD:
+                                       {
                                                DBG("Adding channel %d to poll set", chan->wait_fd);
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
-                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                // FIXME: Empty flag on a pipe pollset, this might
                                                // hang on FreeBSD.
                                                lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
+                                       }
                                        case CONSUMER_CHANNEL_DEL:
                                        {
                                                /*
@@ -3016,10 +2993,9 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
-                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64
                                                            " not found for del channel",
                                                            key);
@@ -3052,7 +3028,6 @@ void *consumer_thread_channel_poll(void *data)
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
-                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -3086,7 +3061,7 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -3119,12 +3094,10 @@ void *consumer_thread_channel_poll(void *data)
                                }
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -3143,7 +3116,7 @@ error_testpoint:
        }
        health_unregister(health_consumerd);
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
@@ -3266,7 +3239,7 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].fd = sock;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
-       while (1) {
+       while (true) {
                health_code_update();
 
                health_poll_entry();
@@ -3323,7 +3296,7 @@ end:
         */
        notify_thread_lttng_pipe(ctx->consumer_data_pipe);
 
-       notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
+       notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
 
        notify_health_quit_pipe(health_quit_pipe);
 
@@ -3349,7 +3322,7 @@ error_testpoint:
        health_unregister(health_consumerd);
 
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 static int post_consume(struct lttng_consumer_stream *stream,
@@ -3503,7 +3476,7 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 /*
  * Allocate and set consumer data hash tables.
  */
-int lttng_consumer_init(void)
+int lttng_consumer_init()
 {
        the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!the_consumer_data.channel_ht) {
@@ -3570,7 +3543,7 @@ void consumer_add_relayd_socket(uint64_t net_seq_idx,
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(sock >= 0);
@@ -3580,11 +3553,11 @@ void consumer_add_relayd_socket(uint64_t net_seq_idx,
 
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
-       if (relayd == NULL) {
+       if (relayd == nullptr) {
                LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
-               if (relayd == NULL) {
+               if (relayd == nullptr) {
                        ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                } else {
@@ -3727,7 +3700,7 @@ error_nosignal:
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
        struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -3743,7 +3716,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
                }
        }
 
-       return NULL;
+       return nullptr;
 
 found:
        return relayd;
@@ -3761,12 +3734,12 @@ int consumer_data_pending(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
@@ -3880,13 +3853,11 @@ int consumer_data_pending(uint64_t id)
 data_not_pending:
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 1;
 }
 
@@ -3998,7 +3969,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        uint64_t next_chunk_id, stream_count = 0;
        enum lttng_trace_chunk_status chunk_status;
        const bool is_local_trace = relayd_id == -1ULL;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
        bool rotating_to_new_chunk = true;
        /* Array of `struct lttng_consumer_stream *` */
        struct lttng_dynamic_pointer_array streams_packet_to_open;
@@ -4008,11 +3979,12 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
-       lttng_dynamic_array_init(
-               &stream_rotation_positions, sizeof(struct relayd_stream_rotation_position), NULL);
-       lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
+       lttng_dynamic_array_init(&stream_rotation_positions,
+                                sizeof(struct relayd_stream_rotation_position),
+                                nullptr);
+       lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
@@ -4107,7 +4079,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                chunk_status = lttng_trace_chunk_get_name(
                                                        stream->trace_chunk,
                                                        &trace_chunk_name,
-                                                       NULL);
+                                                       nullptr);
                                                if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
                                                        trace_chunk_name = "none";
                                                }
@@ -4290,7 +4262,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
 
                pthread_mutex_unlock(&stream->lock);
        }
-       stream = NULL;
+       stream = nullptr;
 
        if (!is_local_trace) {
                relayd = consumer_find_relayd(relayd_id);
@@ -4303,7 +4275,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_rotate_streams(&relayd->control_sock,
                                            stream_count,
-                                           rotating_to_new_chunk ? &next_chunk_id : NULL,
+                                           rotating_to_new_chunk ? &next_chunk_id : nullptr,
                                            (const struct relayd_stream_rotation_position *)
                                                    stream_rotation_positions.buffer.data);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -4364,7 +4336,6 @@ end_unlock_stream:
 end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
-       rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
@@ -4427,7 +4398,7 @@ static int consumer_clear_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
-       ret = consumer_stream_flush_buffer(stream, 1);
+       ret = consumer_stream_flush_buffer(stream, true);
        if (ret < 0) {
                ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
                ret = LTTCOMM_CONSUMERD_FATAL;
@@ -4451,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
        int ret;
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
@@ -4463,13 +4434,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
                pthread_mutex_unlock(&stream->lock);
        }
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return 0;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4561,7 +4530,7 @@ static int rotate_local_stream(struct lttng_consumer_stream *stream)
 
        if (stream->index_file) {
                lttng_index_file_put(stream->index_file);
-               stream->index_file = NULL;
+               stream->index_file = nullptr;
        }
 
        if (!stream->trace_chunk) {
@@ -4599,7 +4568,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
                 * parent channel, becomes part of no chunk and can't output
                 * anything until a new trace chunk is created.
                 */
-               stream->trace_chunk = NULL;
+               stream->trace_chunk = nullptr;
        } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
                ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
                ret = -1;
@@ -4665,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
 
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
@@ -4700,7 +4669,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
        ret = 0;
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4735,7 +4703,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
 {
        int ret;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-       struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+       struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
        enum lttng_trace_chunk_status chunk_status;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        char creation_timestamp_buffer[ISO8601_STR_LEN];
@@ -4784,7 +4752,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
         * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
         * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
         */
-       created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, NULL);
+       created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
        if (!created_chunk) {
                ERR("Failed to create trace chunk");
                ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
@@ -4811,7 +4779,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                 * directory.
                 */
                chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
-               chunk_directory_handle = NULL;
+               chunk_directory_handle = nullptr;
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ERR("Failed to set trace chunk's directory handle");
                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
@@ -4822,53 +4790,57 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
        published_chunk = lttng_trace_chunk_registry_publish_chunk(
                the_consumer_data.chunk_registry, session_id, created_chunk);
        lttng_trace_chunk_put(created_chunk);
-       created_chunk = NULL;
+       created_chunk = nullptr;
        if (!published_chunk) {
                ERR("Failed to publish trace chunk");
                ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                goto error;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(
-               the_consumer_data.channels_by_session_id_ht->ht,
-               the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
-               the_consumer_data.channels_by_session_id_ht->match_fct,
-               &session_id,
-               &iter.iter,
-               channel,
-               channels_by_session_id_ht_node.node)
        {
-               ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
-               if (ret) {
-                       /*
-                        * Roll-back the creation of this chunk.
-                        *
-                        * This is important since the session daemon will
-                        * assume that the creation of this chunk failed and
-                        * will never ask for it to be closed, resulting
-                        * in a leak and an inconsistent state for some
-                        * channels.
-                        */
-                       enum lttcomm_return_code close_ret;
-                       char path[LTTNG_PATH_MAX];
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry_duplicate(
+                       the_consumer_data.channels_by_session_id_ht->ht,
+                       the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+                                                                             lttng_ht_seed),
+                       the_consumer_data.channels_by_session_id_ht->match_fct,
+                       &session_id,
+                       &iter.iter,
+                       channel,
+                       channels_by_session_id_ht_node.node)
+               {
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+                       if (ret) {
+                               /*
+                                * Roll-back the creation of this chunk.
+                                *
+                                * This is important since the session daemon will
+                                * assume that the creation of this chunk failed and
+                                * will never ask for it to be closed, resulting
+                                * in a leak and an inconsistent state for some
+                                * channels.
+                                */
+                               enum lttcomm_return_code close_ret;
+                               char path[LTTNG_PATH_MAX];
+
+                               DBG("Failed to set new trace chunk on existing channels, rolling back");
+                               close_ret =
+                                       lttng_consumer_close_trace_chunk(relayd_id,
+                                                                        session_id,
+                                                                        chunk_id,
+                                                                        chunk_creation_timestamp,
+                                                                        nullptr,
+                                                                        path);
+                               if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                                       ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+                                           ", chunk_id = %" PRIu64,
+                                           session_id,
+                                           chunk_id);
+                               }
 
-                       DBG("Failed to set new trace chunk on existing channels, rolling back");
-                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
-                                                                    session_id,
-                                                                    chunk_id,
-                                                                    chunk_creation_timestamp,
-                                                                    NULL,
-                                                                    path);
-                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
-                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
-                                   ", chunk_id = %" PRIu64,
-                                   session_id,
-                                   chunk_id);
+                               ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                               break;
                        }
-
-                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
-                       break;
                }
        }
 
@@ -4892,7 +4864,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                                                                     session_id,
                                                                     chunk_id,
                                                                     chunk_creation_timestamp,
-                                                                    NULL,
+                                                                    nullptr,
                                                                     path);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
@@ -4906,7 +4878,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 error:
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
@@ -4982,30 +4953,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               int ret;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       int ret;
 
-               /*
-                * Only change the channel's chunk to NULL if it still
-                * references the chunk being closed. The channel may
-                * reference a newer channel in the case of a session
-                * rotation. When a session rotation occurs, the "next"
-                * chunk is created before the "current" chunk is closed.
-                */
-               if (channel->trace_chunk != chunk) {
-                       continue;
-               }
-               ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
-               if (ret) {
                        /*
-                        * Attempt to close the chunk on as many channels as
-                        * possible.
+                        * Only change the channel's chunk to NULL if it still
+                        * references the chunk being closed. The channel may
+                        * reference a newer channel in the case of a session
+                        * rotation. When a session rotation occurs, the "next"
+                        * chunk is created before the "current" chunk is closed.
                         */
-                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       if (channel->trace_chunk != chunk) {
+                               continue;
+                       }
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+                       if (ret) {
+                               /*
+                                * Attempt to close the chunk on as many channels as
+                                * possible.
+                                */
+                               ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       }
                }
        }
-
        if (relayd_id) {
                int ret;
                struct consumer_relayd_sock_pair *relayd;
@@ -5025,7 +4998,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 end:
        /*
         * Release the reference returned by the "find" operation and
@@ -5045,8 +5017,9 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const bool is_local_trace = !relayd_id;
-       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
+       lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -5079,7 +5052,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
                goto end;
        }
 
-       rcu_read_lock();
        relayd = consumer_find_relayd(*relayd_id);
        if (!relayd) {
                ERR("Failed to find relayd %" PRIu64, *relayd_id);
@@ -5101,7 +5073,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
 
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret_code;
 }
@@ -5115,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -5138,12 +5109,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
        return LTTCOMM_CONSUMERD_SUCCESS;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -5184,56 +5153,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum
                goto end;
        }
 
-       rcu_read_lock();
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
-               enum consumer_stream_open_packet_status status;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+                       enum consumer_stream_open_packet_status status;
 
-               pthread_mutex_lock(&stream->lock);
-               if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
-               }
+                       pthread_mutex_lock(&stream->lock);
+                       if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                               goto next;
+                       }
 
-               status = consumer_stream_open_packet(stream);
-               switch (status) {
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk = true;
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                       /*
-                        * Only unexpected internal errors can lead to this
-                        * failing. Report an unknown error.
-                        */
-                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel id = %" PRIu64 ", channel name = %s"
-                           ", session id = %" PRIu64,
-                           stream->key,
-                           channel->key,
-                           channel->name,
-                           channel->session_id);
-                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                       goto error_unlock;
-               default:
-                       abort();
-               }
+                       status = consumer_stream_open_packet(stream);
+                       switch (status) {
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                               DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               stream->opened_packet_in_current_trace_chunk = true;
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                               DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                               /*
+                                * Only unexpected internal errors can lead to this
+                                * failing. Report an unknown error.
+                                */
+                               ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel id = %" PRIu64 ", channel name = %s"
+                                   ", session id = %" PRIu64,
+                                   stream->key,
+                                   channel->key,
+                                   channel->name,
+                                   channel->session_id);
+                               ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                               goto error_unlock;
+                       default:
+                               abort();
+                       }
 
-       next:
-               pthread_mutex_unlock(&stream->lock);
+               next:
+                       pthread_mutex_unlock(&stream->lock);
+               }
        }
-
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret;
 
This page took 0.051074 seconds and 4 git commands to generate.