Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / common / consumer / consumer.cpp
index eb7e6375eed4577736fb303a098ab37a6d7c3ece..1da243601cd7ff3c72fc8febb5d6c11f2f150c4b 100644 (file)
@@ -20,6 +20,7 @@
 #include <common/dynamic-array.hpp>
 #include <common/index/ctf-index.hpp>
 #include <common/index/index.hpp>
+#include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/time.hpp>
 #include <common/trace-chunk-registry.hpp>
 #include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
 #include <common/ust-consumer/ust-consumer.hpp>
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -42,6 +45,7 @@
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <type_traits>
 #include <unistd.h>
 
 lttng_consumer_global_data the_consumer_data;
@@ -104,7 +108,8 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
 
        LTTNG_ASSERT(pipe);
 
-       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+                                                                            pointer. */
 }
 
 static void notify_health_quit_pipe(int *pipe)
@@ -173,13 +178,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
-               /*
-                * Once a stream is added to this list, the buffers were created so we
-                * have a guarantee that this call will succeed. Setting the monitor
-                * mode to 0 so we don't lock nor try to delete the stream from the
-                * global hash table.
-                */
-               stream->monitor = 0;
                consumer_stream_destroy(stream, nullptr);
        }
 }
@@ -201,7 +199,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                return nullptr;
        }
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
@@ -209,8 +207,6 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
-       rcu_read_unlock();
-
        return stream;
 }
 
@@ -218,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -229,7 +225,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
                 */
                stream->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -272,7 +267,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -283,7 +278,6 @@ static void steal_channel_key(uint64_t key)
                 */
                channel->node.key = (uint64_t) -1ULL;
        }
-       rcu_read_unlock();
 }
 
 static void free_channel_rcu(struct rcu_head *head)
@@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -403,7 +398,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->is_published) {
                int ret;
 
-               rcu_read_lock();
+               lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -411,7 +406,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                iter.iter.node = &channel->channels_by_session_id_ht_node.node;
                ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
                LTTNG_ASSERT(!ret);
-               rcu_read_unlock();
        }
 
        channel->is_deleted = true;
@@ -430,14 +424,15 @@ static void cleanup_relayd_ht()
        struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
-               consumer_destroy_relayd(relayd);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+                       consumer_destroy_relayd(relayd);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.relayd_ht);
 }
 
@@ -456,12 +451,14 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Let's begin with metadata */
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -473,7 +470,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
                }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -580,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -613,7 +609,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        the_consumer_data.stream_count++;
        the_consumer_data.need_update = 1;
 
-       rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
@@ -719,7 +714,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -756,7 +751,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
            stream->net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -773,7 +767,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -796,7 +790,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -808,12 +801,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -1022,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1038,8 +1032,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
        channel->is_live = is_in_live_session;
-       pthread_mutex_init(&channel->lock, nullptr);
-       pthread_mutex_init(&channel->timer_lock, nullptr);
+       pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -1050,7 +1044,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -1126,11 +1120,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
-       rcu_read_unlock();
        channel->is_published = true;
 
        pthread_mutex_unlock(&channel->timer_lock);
@@ -1168,35 +1161,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Only active streams with an active end point can be added to the
-                * poll set and local stream storage of the thread.
-                *
-                * There is a potential race here for endpoint_status to be updated
-                * just after the check. However, this is OK since the stream(s) will
-                * be deleted once the thread is notified that the end point state has
-                * changed where this function will be called back again.
-                *
-                * We track the number of inactive FDs because they still need to be
-                * closed by the polling thread after a wakeup on the data_pipe or
-                * metadata_pipe.
-                */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
-                       (*nb_inactive_fd)++;
-                       continue;
+
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Only active streams with an active end point can be added to the
+                        * poll set and local stream storage of the thread.
+                        *
+                        * There is a potential race here for endpoint_status to be updated
+                        * just after the check. However, this is OK since the stream(s) will
+                        * be deleted once the thread is notified that the end point state has
+                        * changed where this function will be called back again.
+                        *
+                        * We track the number of inactive FDs because they still need to be
+                        * closed by the polling thread after a wakeup on the data_pipe or
+                        * metadata_pipe.
+                        */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                               (*nb_inactive_fd)++;
+                               continue;
+                       }
+
+                       (*pollfd)[i].fd = stream->wait_fd;
+                       (*pollfd)[i].events = POLLIN | POLLPRI;
+                       local_stream[i] = stream;
+                       i++;
                }
-               /*
-                * This clobbers way too much the debug output. Uncomment that if you
-                * need it for debugging purposes.
-                */
-               (*pollfd)[i].fd = stream->wait_fd;
-               (*pollfd)[i].events = POLLIN | POLLPRI;
-               local_stream[i] = stream;
-               i++;
        }
-       rcu_read_unlock();
 
        /*
         * Insert the consumer_data_pipe at the end of the array and don't
@@ -1257,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx,
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+                             enum lttcomm_return_code error_code)
 {
        if (ctx->consumer_error_socket > 0) {
+               const std::int32_t comm_code = std::int32_t(error_code);
+
+               static_assert(
+                       sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+                       "Fixed-size communication type too small to accomodate lttcomm_return_code");
                return lttcomm_send_unix_sock(
-                       ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+                       ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
        }
 
        return 0;
@@ -1277,14 +1275,15 @@ void lttng_consumer_cleanup()
        struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               consumer_del_channel(channel);
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       consumer_del_channel(channel);
+               }
        }
 
-       rcu_read_unlock();
-
        lttng_ht_destroy(the_consumer_data.channel_ht);
        lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
 
@@ -1344,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1356,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
        if (orig_offset < stream->max_sb_size) {
                return;
        }
-       lttng_sync_file_range(outfd,
-                             orig_offset - stream->max_sb_size,
-                             stream->max_sb_size,
-                             SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
-                                     SYNC_FILE_RANGE_WAIT_AFTER);
-       /*
-        * Give hints to the kernel about how we access the file:
-        * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-        * we write it.
-        *
-        * We need to call fadvise again after the file grows because the
-        * kernel does not seem to apply fadvise to non-existing parts of the
-        * file.
-        *
-        * Call fadvise _after_ having waited for the page writeback to
-        * complete because the dirty page writeback semantic is not well
-        * defined. So it can be expected to lead to lower throughput in
-        * streaming.
-        */
-       ret = posix_fadvise(
-               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
-       if (ret && ret != -ENOSYS) {
-               errno = ret;
-               PERROR("posix_fadvise on fd %i", outfd);
-       }
+       lttng::io::hint_flush_range_dont_need_sync(
+               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
 }
 
 /*
@@ -1485,15 +1460,16 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1511,15 +1487,16 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+                       /*
+                        * Ignore return value since we are currently cleaning up so any error
+                        * can't be handled.
+                        */
+                       (void) consumer_del_metadata_stream(stream, ht);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1621,7 +1598,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
@@ -1739,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(
-                       outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+               lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
                stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
@@ -1761,7 +1737,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1800,7 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -1940,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
                        /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(
-                               outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+                       lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
                        stream->out_fd_offset += ret_splice;
                }
                stream->output_written += ret_splice;
@@ -1983,7 +1957,6 @@ end:
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
-       rcu_read_unlock();
        return written;
 }
 
@@ -2162,6 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
@@ -2204,7 +2178,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
@@ -2238,8 +2212,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         */
        lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
 
-       rcu_read_unlock();
-
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
@@ -2256,16 +2228,18 @@ static void validate_endpoint_status_data_stream()
 
        DBG("Consumer delete flagged data stream");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /* Delete it right now */
+                       consumer_del_stream(stream, data_ht);
                }
-               /* Delete it right now */
-               consumer_del_stream(stream, data_ht);
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2280,22 +2254,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po
 
        LTTNG_ASSERT(pollset);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-               /* Validate delete flag of the stream */
-               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                       continue;
-               }
-               /*
-                * Remove from pollset so the metadata thread can continue without
-                * blocking on a deleted stream.
-                */
-               lttng_poll_del(pollset, stream->wait_fd);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+                       /* Validate delete flag of the stream */
+                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                               continue;
+                       }
+                       /*
+                        * Remove from pollset so the metadata thread can continue without
+                        * blocking on a deleted stream.
+                        */
+                       lttng_poll_del(pollset, stream->wait_fd);
 
-               /* Delete it right now */
-               consumer_del_metadata_stream(stream, metadata_ht);
+                       /* Delete it right now */
+                       consumer_del_metadata_stream(stream, metadata_ht);
+               }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -2375,8 +2350,11 @@ void *consumer_thread_metadata_poll(void *data)
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                                   &stream,
-                                                                  sizeof(stream));
-                                       if (pipe_len < sizeof(stream)) {
+                                                                  sizeof(stream)); /* NOLINT sizeof
+                                                                                      used on a
+                                                                                      pointer. */
+                                       if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+                                                                           pointer. */
                                                if (pipe_len < 0) {
                                                        PERROR("read metadata stream");
                                                }
@@ -2427,7 +2405,7 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -2491,11 +2469,9 @@ void *consumer_thread_metadata_poll(void *data)
                                consumer_del_metadata_stream(stream, metadata_ht);
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
                        /* Release RCU lock for the stream looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -2522,7 +2498,7 @@ error_testpoint:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i, err = -1;
+       int num_rdy, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = nullptr;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
@@ -2555,7 +2531,6 @@ void *consumer_thread_data_poll(void *data)
                health_code_update();
 
                high_prio = 0;
-               num_hup = 0;
 
                /*
                 * the fds set has been updated, we need to update our
@@ -2643,9 +2618,12 @@ void *consumer_thread_data_poll(void *data)
                        ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
-                       pipe_readlen = lttng_pipe_read(
-                               ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < sizeof(new_stream)) {
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+                                                      &new_stream,
+                                                      sizeof(new_stream)); /* NOLINT sizeof used on
+                                                                              a pointer. */
+                       if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+                                                                 */
                                PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
@@ -2768,21 +2746,18 @@ void *consumer_thread_data_poll(void *data)
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = nullptr;
-                                       num_hup++;
                                }
                        }
                        if (local_stream[i] != nullptr) {
@@ -2831,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -2871,7 +2846,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
 }
 
 static void destroy_channel_ht(struct lttng_ht *ht)
@@ -2884,12 +2858,14 @@ static void destroy_channel_ht(struct lttng_ht *ht)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-               ret = lttng_ht_del(ht, &iter);
-               LTTNG_ASSERT(ret != 0);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+                       ret = lttng_ht_del(ht, &iter);
+                       LTTNG_ASSERT(ret != 0);
+               }
        }
-       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -2991,19 +2967,20 @@ void *consumer_thread_channel_poll(void *data)
 
                                        switch (action) {
                                        case CONSUMER_CHANNEL_ADD:
+                                       {
                                                DBG("Adding channel %d to poll set", chan->wait_fd);
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
-                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                // FIXME: Empty flag on a pipe pollset, this might
                                                // hang on FreeBSD.
                                                lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
+                                       }
                                        case CONSUMER_CHANNEL_DEL:
                                        {
                                                /*
@@ -3016,10 +2993,9 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               rcu_read_lock();
+                                               lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
-                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64
                                                            " not found for del channel",
                                                            key);
@@ -3052,7 +3028,6 @@ void *consumer_thread_channel_poll(void *data)
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
-                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -3086,7 +3061,7 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       rcu_read_lock();
+                       lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
@@ -3119,12 +3094,10 @@ void *consumer_thread_channel_poll(void *data)
                                }
                        } else {
                                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                               rcu_read_unlock();
                                goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
-                       rcu_read_unlock();
                }
        }
 
@@ -3766,7 +3739,7 @@ int consumer_data_pending(uint64_t id)
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
@@ -3880,13 +3853,11 @@ int consumer_data_pending(uint64_t id)
 data_not_pending:
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&the_consumer_data.lock);
-       rcu_read_unlock();
        return 1;
 }
 
@@ -4013,7 +3984,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
@@ -4365,7 +4336,6 @@ end_unlock_stream:
 end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
-       rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
@@ -4452,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
        int ret;
        struct lttng_consumer_stream *stream;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
@@ -4464,13 +4434,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
                pthread_mutex_unlock(&stream->lock);
        }
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return 0;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&channel->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4666,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
 
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
@@ -4701,7 +4669,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
        ret = 0;
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -4830,46 +4797,50 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                goto error;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(
-               the_consumer_data.channels_by_session_id_ht->ht,
-               the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
-               the_consumer_data.channels_by_session_id_ht->match_fct,
-               &session_id,
-               &iter.iter,
-               channel,
-               channels_by_session_id_ht_node.node)
        {
-               ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
-               if (ret) {
-                       /*
-                        * Roll-back the creation of this chunk.
-                        *
-                        * This is important since the session daemon will
-                        * assume that the creation of this chunk failed and
-                        * will never ask for it to be closed, resulting
-                        * in a leak and an inconsistent state for some
-                        * channels.
-                        */
-                       enum lttcomm_return_code close_ret;
-                       char path[LTTNG_PATH_MAX];
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry_duplicate(
+                       the_consumer_data.channels_by_session_id_ht->ht,
+                       the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+                                                                             lttng_ht_seed),
+                       the_consumer_data.channels_by_session_id_ht->match_fct,
+                       &session_id,
+                       &iter.iter,
+                       channel,
+                       channels_by_session_id_ht_node.node)
+               {
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+                       if (ret) {
+                               /*
+                                * Roll-back the creation of this chunk.
+                                *
+                                * This is important since the session daemon will
+                                * assume that the creation of this chunk failed and
+                                * will never ask for it to be closed, resulting
+                                * in a leak and an inconsistent state for some
+                                * channels.
+                                */
+                               enum lttcomm_return_code close_ret;
+                               char path[LTTNG_PATH_MAX];
+
+                               DBG("Failed to set new trace chunk on existing channels, rolling back");
+                               close_ret =
+                                       lttng_consumer_close_trace_chunk(relayd_id,
+                                                                        session_id,
+                                                                        chunk_id,
+                                                                        chunk_creation_timestamp,
+                                                                        nullptr,
+                                                                        path);
+                               if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                                       ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+                                           ", chunk_id = %" PRIu64,
+                                           session_id,
+                                           chunk_id);
+                               }
 
-                       DBG("Failed to set new trace chunk on existing channels, rolling back");
-                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
-                                                                    session_id,
-                                                                    chunk_id,
-                                                                    chunk_creation_timestamp,
-                                                                    nullptr,
-                                                                    path);
-                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
-                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
-                                   ", chunk_id = %" PRIu64,
-                                   session_id,
-                                   chunk_id);
+                               ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                               break;
                        }
-
-                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
-                       break;
                }
        }
 
@@ -4907,7 +4878,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 error:
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
@@ -4983,30 +4953,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-               int ret;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry (
+                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+                       int ret;
 
-               /*
-                * Only change the channel's chunk to NULL if it still
-                * references the chunk being closed. The channel may
-                * reference a newer channel in the case of a session
-                * rotation. When a session rotation occurs, the "next"
-                * chunk is created before the "current" chunk is closed.
-                */
-               if (channel->trace_chunk != chunk) {
-                       continue;
-               }
-               ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
-               if (ret) {
                        /*
-                        * Attempt to close the chunk on as many channels as
-                        * possible.
+                        * Only change the channel's chunk to NULL if it still
+                        * references the chunk being closed. The channel may
+                        * reference a newer channel in the case of a session
+                        * rotation. When a session rotation occurs, the "next"
+                        * chunk is created before the "current" chunk is closed.
                         */
-                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       if (channel->trace_chunk != chunk) {
+                               continue;
+                       }
+                       ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+                       if (ret) {
+                               /*
+                                * Attempt to close the chunk on as many channels as
+                                * possible.
+                                */
+                               ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       }
                }
        }
-
        if (relayd_id) {
                int ret;
                struct consumer_relayd_sock_pair *relayd;
@@ -5026,7 +4998,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
                }
        }
 error_unlock:
-       rcu_read_unlock();
 end:
        /*
         * Release the reference returned by the "find" operation and
@@ -5048,6 +5019,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
+       lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -5080,7 +5052,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
                goto end;
        }
 
-       rcu_read_lock();
        relayd = consumer_find_relayd(*relayd_id);
        if (!relayd) {
                ERR("Failed to find relayd %" PRIu64, *relayd_id);
@@ -5102,7 +5073,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
 
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret_code;
 }
@@ -5116,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -5139,12 +5109,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
        next:
                pthread_mutex_unlock(&stream->lock);
        }
-       rcu_read_unlock();
        return LTTCOMM_CONSUMERD_SUCCESS;
 
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -5185,56 +5153,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum
                goto end;
        }
 
-       rcu_read_lock();
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
-               enum consumer_stream_open_packet_status status;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+                       enum consumer_stream_open_packet_status status;
 
-               pthread_mutex_lock(&stream->lock);
-               if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
-               }
+                       pthread_mutex_lock(&stream->lock);
+                       if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                               goto next;
+                       }
 
-               status = consumer_stream_open_packet(stream);
-               switch (status) {
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk = true;
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel name = %s, session id = %" PRIu64,
-                           stream->key,
-                           stream->chan->name,
-                           stream->chan->session_id);
-                       break;
-               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                       /*
-                        * Only unexpected internal errors can lead to this
-                        * failing. Report an unknown error.
-                        */
-                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
-                           ", channel id = %" PRIu64 ", channel name = %s"
-                           ", session id = %" PRIu64,
-                           stream->key,
-                           channel->key,
-                           channel->name,
-                           channel->session_id);
-                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                       goto error_unlock;
-               default:
-                       abort();
-               }
+                       status = consumer_stream_open_packet(stream);
+                       switch (status) {
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                               DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               stream->opened_packet_in_current_trace_chunk = true;
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                               DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel name = %s, session id = %" PRIu64,
+                                   stream->key,
+                                   stream->chan->name,
+                                   stream->chan->session_id);
+                               break;
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                               /*
+                                * Only unexpected internal errors can lead to this
+                                * failing. Report an unknown error.
+                                */
+                               ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                   ", channel id = %" PRIu64 ", channel name = %s"
+                                   ", session id = %" PRIu64,
+                                   stream->key,
+                                   channel->key,
+                                   channel->name,
+                                   channel->session_id);
+                               ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                               goto error_unlock;
+                       default:
+                               abort();
+                       }
 
-       next:
-               pthread_mutex_unlock(&stream->lock);
+               next:
+                       pthread_mutex_unlock(&stream->lock);
+               }
        }
-
 end_rcu_unlock:
-       rcu_read_unlock();
 end:
        return ret;
 
This page took 0.041329 seconds and 4 git commands to generate.