Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.cpp
index 5339553cb5fd7fb7b54d61f8b74814f21e1faf6c..eca03b8203d91c45e4c83e1383fae5fa973f3442 100644 (file)
 
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
 
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
 #include <common/consumer/consumer-metadata-cache.hpp>
 #include <common/consumer/consumer-stream.hpp>
 #include <common/consumer/consumer-timer.hpp>
 #include <common/consumer/consumer.hpp>
 #include <common/index/index.hpp>
 #include <common/optional.hpp>
 #include <common/consumer/consumer-metadata-cache.hpp>
 #include <common/consumer/consumer-stream.hpp>
 #include <common/consumer/consumer-timer.hpp>
 #include <common/consumer/consumer.hpp>
 #include <common/index/index.hpp>
 #include <common/optional.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/shm.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/shm.hpp>
+#include <common/urcu.hpp>
 #include <common/utils.hpp>
 
 #include <lttng/ust-ctl.h>
 #include <lttng/ust-sigbus.h>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
 #include <common/utils.hpp>
 
 #include <lttng/ust-ctl.h>
 #include <lttng/ust-sigbus.h>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -61,6 +64,7 @@ static int add_channel(struct lttng_consumer_channel *channel,
        int ret = 0;
 
        LTTNG_ASSERT(channel);
        int ret = 0;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(ctx);
 
        if (ctx->on_recv_channel != nullptr) {
        LTTNG_ASSERT(ctx);
 
        if (ctx->on_recv_channel != nullptr) {
@@ -98,6 +102,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu,
        struct lttng_consumer_stream *stream = nullptr;
 
        LTTNG_ASSERT(channel);
        struct lttng_consumer_stream *stream = nullptr;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(ctx);
 
        stream = consumer_stream_create(channel,
        LTTNG_ASSERT(ctx);
 
        stream = consumer_stream_create(channel,
@@ -167,7 +172,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        stream->globally_visible = 1;
        cds_list_del_init(&stream->send_node);
 
        stream->globally_visible = 1;
        cds_list_del_init(&stream->send_node);
 
-       ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
+       ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a
+                                                                        pointer. */
        if (ret < 0) {
                ERR("Consumer write %s stream to pipe %d",
                    stream->metadata_flag ? "metadata" : "data",
        if (ret < 0) {
                ERR("Consumer write %s stream to pipe %d",
                    stream->metadata_flag ? "metadata" : "data",
@@ -217,6 +223,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
        pthread_mutex_t *current_stream_lock = nullptr;
 
        LTTNG_ASSERT(channel);
        pthread_mutex_t *current_stream_lock = nullptr;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(ctx);
 
        /*
        LTTNG_ASSERT(ctx);
 
        /*
@@ -354,6 +361,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel,
        struct lttng_ust_ctl_consumer_channel *ust_channel;
 
        LTTNG_ASSERT(channel);
        struct lttng_ust_ctl_consumer_channel *ust_channel;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(attr);
        LTTNG_ASSERT(ust_chanp);
        LTTNG_ASSERT(channel->buffer_credentials.is_set);
        LTTNG_ASSERT(attr);
        LTTNG_ASSERT(ust_chanp);
        LTTNG_ASSERT(channel->buffer_credentials.is_set);
@@ -472,17 +480,20 @@ static int send_channel_to_sessiond_and_relayd(int sock,
                                               int *relayd_error)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                                               int *relayd_error)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-       struct lttng_consumer_stream *stream;
        uint64_t net_seq_idx = -1ULL;
 
        LTTNG_ASSERT(channel);
        uint64_t net_seq_idx = -1ULL;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(sock >= 0);
 
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(sock >= 0);
 
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
-               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+               for (auto stream :
+                    lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                        &lttng_consumer_stream::send_node>(
+                            channel->streams.head)) {
                        health_code_update();
 
                        /* Try to send the stream to the relayd if one is available. */
                        health_code_update();
 
                        /* Try to send the stream to the relayd if one is available. */
@@ -528,7 +539,9 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        }
 
        /* The channel was sent successfully to the sessiond at this point. */
        }
 
        /* The channel was sent successfully to the sessiond at this point. */
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Send stream to session daemon. */
                health_code_update();
 
                /* Send stream to session daemon. */
@@ -571,6 +584,7 @@ static int ask_channel(struct lttng_consumer_local_data *ctx,
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(channel);
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(attr);
 
        /*
        LTTNG_ASSERT(attr);
 
        /*
@@ -626,13 +640,15 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
                                  struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
                                  struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
-       struct lttng_consumer_stream *stream, *stmp;
 
        LTTNG_ASSERT(channel);
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(ctx);
 
        /* Send streams to the corresponding thread. */
        LTTNG_ASSERT(ctx);
 
        /* Send streams to the corresponding thread. */
-       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Sending the stream to the thread. */
                health_code_update();
 
                /* Sending the stream to the thread. */
@@ -659,13 +675,11 @@ static int flush_channel(uint64_t chan_key)
 {
        int ret = 0;
        struct lttng_consumer_channel *channel;
 {
        int ret = 0;
        struct lttng_consumer_channel *channel;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
-       struct lttng_ht_iter iter;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        DBG("UST consumer flush channel key %" PRIu64, chan_key);
 
 
        DBG("UST consumer flush channel key %" PRIu64, chan_key);
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
        channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
@@ -673,17 +687,15 @@ static int flush_channel(uint64_t chan_key)
                goto error;
        }
 
                goto error;
        }
 
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
        /* For each stream of the channel id, flush it. */
        /* For each stream of the channel id, flush it. */
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
@@ -720,7 +732,6 @@ static int flush_channel(uint64_t chan_key)
         */
        sample_and_send_channel_buffer_stats(channel);
 error:
         */
        sample_and_send_channel_buffer_stats(channel);
 error:
-       rcu_read_unlock();
        return ret;
 }
 
        return ret;
 }
 
@@ -732,42 +743,33 @@ error:
  */
 static int clear_quiescent_channel(uint64_t chan_key)
 {
  */
 static int clear_quiescent_channel(uint64_t chan_key)
 {
-       int ret = 0;
-       struct lttng_consumer_channel *channel;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
-       struct lttng_ht_iter iter;
-
        DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
 
        DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
 
-       rcu_read_lock();
-       channel = consumer_find_channel(chan_key);
+       const lttng::urcu::read_lock_guard read_lock;
+       auto channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
        if (!channel) {
                ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
-               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               goto error;
+               return LTTNG_ERR_UST_CHAN_NOT_FOUND;
        }
 
        }
 
-       ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        /* For each stream of the channel id, clear quiescent state. */
 
        /* For each stream of the channel id, clear quiescent state. */
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
                health_code_update();
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                stream->quiescent = false;
                stream->quiescent = false;
-               pthread_mutex_unlock(&stream->lock);
        }
        }
-error:
-       rcu_read_unlock();
-       return ret;
+
+       return 0;
 }
 
 /*
 }
 
 /*
@@ -930,6 +932,8 @@ error:
         */
        consumer_stream_destroy(metadata->metadata_stream, nullptr);
        metadata->metadata_stream = nullptr;
         */
        consumer_stream_destroy(metadata->metadata_stream, nullptr);
        metadata->metadata_stream = nullptr;
+       metadata->metadata_pushed_wait_queue.wake_all();
+
 send_streams_error:
 error_no_stream:
 end:
 send_streams_error:
 error_no_stream:
 end:
@@ -957,7 +961,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
 
        DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
 
        DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        LTTNG_ASSERT(!metadata_channel->monitor);
 
 
        LTTNG_ASSERT(!metadata_channel->monitor);
 
@@ -967,7 +971,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
        if (ret < 0) {
                goto error;
        }
        if (ret < 0) {
                goto error;
        }
@@ -1013,9 +1017,9 @@ error_stream:
         */
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
         */
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
+       metadata_channel->metadata_pushed_wait_queue.wake_all();
 
 error:
 
 error:
-       rcu_read_unlock();
        return ret;
 }
 
        return ret;
 }
 
@@ -1060,13 +1064,12 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        int ret;
        unsigned use_relayd = 0;
        unsigned long consumed_pos, produced_pos;
        int ret;
        unsigned use_relayd = 0;
        unsigned long consumed_pos, produced_pos;
-       struct lttng_consumer_stream *stream;
 
        LTTNG_ASSERT(path);
        LTTNG_ASSERT(ctx);
        ASSERT_RCU_READ_LOCKED();
 
 
        LTTNG_ASSERT(path);
        LTTNG_ASSERT(ctx);
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id != (uint64_t) -1ULL) {
                use_relayd = 1;
 
        if (relayd_id != (uint64_t) -1ULL) {
                use_relayd = 1;
@@ -1075,11 +1078,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(!channel->monitor);
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
        LTTNG_ASSERT(!channel->monitor);
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
                        /*
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
                        /*
@@ -1087,24 +1092,28 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                         * holds a reference to the trace chunk.
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
                         * holds a reference to the trace chunk.
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
-                       ret = -1;
-                       goto error_unlock;
+                       return -1;
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
 
                stream->net_seq_idx = relayd_id;
 
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
 
                stream->net_seq_idx = relayd_id;
 
+               /* Close stream output when were are done. */
+               const auto close_stream_output = lttng::make_scope_exit(
+                       [stream]() noexcept { consumer_stream_close_output(stream); });
+
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
-                               goto error_close_stream;
+                               return ret;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
-                               goto error_close_stream;
+                               return ret;
                        }
                        }
+
                        DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
 
                        DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
 
@@ -1119,26 +1128,26 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                    ", channel name = '%s'",
                                    channel->key,
                                    channel->name);
                                    ", channel name = '%s'",
                                    channel->key,
                                    channel->name);
-                               goto error_unlock;
+                               return ret;
                        }
                }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
                        }
                }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
-                       goto error_close_stream;
+                       return ret;
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
-                       goto error_close_stream;
+                       return ret;
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
-                       goto error_close_stream;
+                       return ret;
                }
 
                /*
                }
 
                /*
@@ -1164,29 +1173,37 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("lttng_ust_ctl_get_subbuf snapshot");
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("lttng_ust_ctl_get_subbuf snapshot");
-                                       goto error_close_stream;
+                                       return ret;
                                }
                                }
+
                                DBG("UST consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
                                stream->chan->lost_packets++;
                                continue;
                        }
 
                                DBG("UST consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
                                stream->chan->lost_packets++;
                                continue;
                        }
 
+                       /* Put the subbuffer once we are done. */
+                       const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+                               if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
+                                       ERR("Snapshot lttng_ust_ctl_put_subbuf");
+                               }
+                       });
+
                        ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
                        ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
                        }
 
                        ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
@@ -1194,42 +1211,22 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
                                stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
-                                       ret = -EPERM;
-                                       goto error_put_subbuf;
+                                       return -EPERM;
                                }
                        } else {
                                if (read_len != padded_len) {
                                }
                        } else {
                                if (read_len != padded_len) {
-                                       ret = -EPERM;
-                                       goto error_put_subbuf;
+                                       return -EPERM;
                                }
                        }
 
                                }
                        }
 
-                       ret = lttng_ust_ctl_put_subbuf(stream->ustream);
-                       if (ret < 0) {
-                               ERR("Snapshot lttng_ust_ctl_put_subbuf");
-                               goto error_close_stream;
-                       }
                        consumed_pos += stream->max_sb_size;
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
                consumer_stream_close_output(stream);
                        consumed_pos += stream->max_sb_size;
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
                consumer_stream_close_output(stream);
-               pthread_mutex_unlock(&stream->lock);
        }
 
        }
 
-       rcu_read_unlock();
        return 0;
        return 0;
-
-error_put_subbuf:
-       if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
-               ERR("Snapshot lttng_ust_ctl_put_subbuf");
-       }
-error_close_stream:
-       consumer_stream_close_output(stream);
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
-       return ret;
 }
 
 static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
 }
 
 static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
@@ -1254,7 +1251,7 @@ int lttng_ustconsumer_recv_metadata(int sock,
                                    uint64_t len,
                                    uint64_t version,
                                    struct lttng_consumer_channel *channel,
                                    uint64_t len,
                                    uint64_t version,
                                    struct lttng_consumer_channel *channel,
-                                   int timer,
+                                   bool invoked_by_timer,
                                    int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                                    int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1342,13 +1339,8 @@ int lttng_ustconsumer_recv_metadata(int sock,
        if (!wait) {
                goto end_free;
        }
        if (!wait) {
                goto end_free;
        }
-       while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-               DBG("Waiting for metadata to be flushed");
-
-               health_code_update();
 
 
-               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-       }
+       consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
        free(metadata_str);
 
 end_free:
        free(metadata_str);
@@ -1400,14 +1392,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        /* relayd needs RCU read-side lock */
        health_code_update();
 
        /* relayd needs RCU read-side lock */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
 
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
-               uint32_t major = msg.u.relayd_sock.major;
-               uint32_t minor = msg.u.relayd_sock.minor;
-               enum lttcomm_sock_proto protocol =
+               const uint32_t major = msg.u.relayd_sock.major;
+               const uint32_t minor = msg.u.relayd_sock.minor;
+               const lttcomm_sock_proto protocol =
                        (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
 
                /* Session daemon status message are handled in the following call. */
                        (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
 
                /* Session daemon status message are handled in the following call. */
@@ -1425,7 +1417,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
        {
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
        {
-               uint64_t index = msg.u.destroy_relayd.net_seq_idx;
+               const uint64_t index = msg.u.destroy_relayd.net_seq_idx;
                struct consumer_relayd_sock_pair *relayd;
 
                DBG("UST consumer destroying relayd %" PRIu64, index);
                struct consumer_relayd_sock_pair *relayd;
 
                DBG("UST consumer destroying relayd %" PRIu64, index);
@@ -1455,14 +1447,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
-               rcu_read_unlock();
                return -ENOSYS;
        }
        case LTTNG_CONSUMER_DATA_PENDING:
        {
                int is_data_pending;
                ssize_t ret_send;
                return -ENOSYS;
        }
        case LTTNG_CONSUMER_DATA_PENDING:
        {
                int is_data_pending;
                ssize_t ret_send;
-               uint64_t id = msg.u.data_pending.session_id;
+               const uint64_t id = msg.u.data_pending.session_id;
 
                DBG("UST consumer data pending command for id %" PRIu64, id);
 
 
                DBG("UST consumer data pending command for id %" PRIu64, id);
 
@@ -1635,7 +1626,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_GET_CHANNEL:
        {
                int ret, relayd_err = 0;
        case LTTNG_CONSUMER_GET_CHANNEL:
        {
                int ret, relayd_err = 0;
-               uint64_t key = msg.u.get_channel.key;
+               const uint64_t key = msg.u.get_channel.key;
                struct lttng_consumer_channel *found_channel;
 
                found_channel = consumer_find_channel(key);
                struct lttng_consumer_channel *found_channel;
 
                found_channel = consumer_find_channel(key);
@@ -1695,7 +1686,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
-               uint64_t key = msg.u.destroy_channel.key;
+               const uint64_t key = msg.u.destroy_channel.key;
 
                /*
                 * Only called if streams have not been sent to stream
 
                /*
                 * Only called if streams have not been sent to stream
@@ -1741,10 +1732,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
-               uint64_t len = msg.u.push_metadata.len;
-               uint64_t key = msg.u.push_metadata.key;
-               uint64_t offset = msg.u.push_metadata.target_offset;
-               uint64_t version = msg.u.push_metadata.version;
+               const uint64_t len = msg.u.push_metadata.len;
+               const uint64_t key = msg.u.push_metadata.key;
+               const uint64_t offset = msg.u.push_metadata.target_offset;
+               const uint64_t version = msg.u.push_metadata.version;
                struct lttng_consumer_channel *found_channel;
 
                DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
                struct lttng_consumer_channel *found_channel;
 
                DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
@@ -1794,7 +1785,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(
-                       sock, key, offset, len, version, found_channel, 0, 1);
+                       sock, key, offset, len, version, found_channel, false, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -1820,7 +1811,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
-               uint64_t key = msg.u.snapshot_channel.key;
+               const uint64_t key = msg.u.snapshot_channel.key;
                int ret_send;
 
                found_channel = consumer_find_channel(key);
                int ret_send;
 
                found_channel = consumer_find_channel(key);
@@ -1869,41 +1860,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int ret = 0;
                uint64_t discarded_events;
        {
                int ret = 0;
                uint64_t discarded_events;
-               struct lttng_ht_iter iter;
-               struct lttng_ht *ht;
-               struct lttng_consumer_stream *stream;
-               uint64_t id = msg.u.discarded_events.session_id;
-               uint64_t key = msg.u.discarded_events.channel_key;
+               const auto id = msg.u.discarded_events.session_id;
+               const auto key = msg.u.discarded_events.channel_key;
 
                DBG("UST consumer discarded events command for session id %" PRIu64, id);
 
                DBG("UST consumer discarded events command for session id %" PRIu64, id);
-               rcu_read_lock();
-               pthread_mutex_lock(&the_consumer_data.lock);
-
-               ht = the_consumer_data.stream_list_ht;
-
-               /*
-                * We only need a reference to the channel, but they are not
-                * directly indexed, so we just use the first matching stream
-                * to extract the information we need, we default to 0 if not
-                * found (no events are dropped if the channel is not yet in
-                * use).
-                */
-               discarded_events = 0;
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
                {
                {
-                       if (stream->chan->key == key) {
-                               discarded_events = stream->chan->discarded_events;
-                               break;
+                       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+                       const auto ht = the_consumer_data.stream_list_ht;
+
+                       /*
+                        * We only need a reference to the channel, but they are not
+                        * directly indexed, so we just use the first matching stream
+                        * to extract the information we need, we default to 0 if not
+                        * found (no events are dropped if the channel is not yet in
+                        * use).
+                        */
+                       discarded_events = 0;
+                       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                                    lttng_consumer_stream,
+                                    decltype(lttng_consumer_stream::node_channel_id),
+                                    &lttng_consumer_stream::node_session_id,
+                                    std::uint64_t>(*ht->ht,
+                                                   &id,
+                                                   ht->hash_fct(&id, lttng_ht_seed),
+                                                   ht->match_fct)) {
+                               if (stream->chan->key == key) {
+                                       discarded_events = stream->chan->discarded_events;
+                                       break;
+                               }
                        }
                }
                        }
                }
-               pthread_mutex_unlock(&the_consumer_data.lock);
-               rcu_read_unlock();
 
                DBG("UST consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
 
                DBG("UST consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -1925,40 +1911,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int ret;
                uint64_t lost_packets;
        {
                int ret;
                uint64_t lost_packets;
-               struct lttng_ht_iter iter;
-               struct lttng_ht *ht;
-               struct lttng_consumer_stream *stream;
-               uint64_t id = msg.u.lost_packets.session_id;
-               uint64_t key = msg.u.lost_packets.channel_key;
+               const auto id = msg.u.lost_packets.session_id;
+               const auto key = msg.u.lost_packets.channel_key;
 
                DBG("UST consumer lost packets command for session id %" PRIu64, id);
 
                DBG("UST consumer lost packets command for session id %" PRIu64, id);
-               rcu_read_lock();
-               pthread_mutex_lock(&the_consumer_data.lock);
-
-               ht = the_consumer_data.stream_list_ht;
-
-               /*
-                * We only need a reference to the channel, but they are not
-                * directly indexed, so we just use the first matching stream
-                * to extract the information we need, we default to 0 if not
-                * found (no packets lost if the channel is not yet in use).
-                */
-               lost_packets = 0;
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
                {
                {
-                       if (stream->chan->key == key) {
-                               lost_packets = stream->chan->lost_packets;
-                               break;
+                       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+                       const auto ht = the_consumer_data.stream_list_ht;
+
+                       /*
+                        * We only need a reference to the channel, but they are not
+                        * directly indexed, so we just use the first matching stream
+                        * to extract the information we need, we default to 0 if not
+                        * found (no packets lost if the channel is not yet in use).
+                        */
+                       lost_packets = 0;
+                       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                                    lttng_consumer_stream,
+                                    decltype(lttng_consumer_stream::node_session_id),
+                                    &lttng_consumer_stream::node_session_id,
+                                    std::uint64_t>(*ht->ht,
+                                                   &id,
+                                                   ht->hash_fct(&id, lttng_ht_seed),
+                                                   ht->match_fct)) {
+                               if (stream->chan->key == key) {
+                                       lost_packets = stream->chan->lost_packets;
+                                       break;
+                               }
                        }
                }
                        }
                }
-               pthread_mutex_unlock(&the_consumer_data.lock);
-               rcu_read_unlock();
 
                DBG("UST consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
 
                DBG("UST consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -2024,7 +2005,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
-               uint64_t key = msg.u.rotate_channel.key;
+               const uint64_t key = msg.u.rotate_channel.key;
                int ret_send_status;
 
                found_channel = consumer_find_channel(key);
                int ret_send_status;
 
                found_channel = consumer_find_channel(key);
@@ -2077,7 +2058,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_CLEAR_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
        case LTTNG_CONSUMER_CLEAR_CHANNEL:
        {
                struct lttng_consumer_channel *found_channel;
-               uint64_t key = msg.u.clear_channel.key;
+               const uint64_t key = msg.u.clear_channel.key;
                int ret_send_status;
 
                found_channel = consumer_find_channel(key);
                int ret_send_status;
 
                found_channel = consumer_find_channel(key);
@@ -2295,7 +2276,6 @@ error_fatal:
        goto end;
 
 end:
        goto end;
 
 end:
-       rcu_read_unlock();
        health_code_update();
        return ret_func;
 }
        health_code_update();
        return ret_func;
 }
@@ -2559,7 +2539,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                ret = write_len;
                goto end;
        }
                ret = write_len;
                goto end;
        }
+
        stream->ust_metadata_pushed += write_len;
        stream->ust_metadata_pushed += write_len;
+       stream->chan->metadata_pushed_wait_queue.wake_all();
 
        LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
 
        LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
@@ -2608,7 +2590,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
@@ -3204,23 +3186,23 @@ end:
  */
 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
 {
  */
 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        LTTNG_ASSERT(metadata_ht);
        LTTNG_ASSERT(metadata_ht->ht);
 
        DBG("UST consumer closing all metadata streams");
 
        LTTNG_ASSERT(metadata_ht);
        LTTNG_ASSERT(metadata_ht->ht);
 
        DBG("UST consumer closing all metadata streams");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
                health_code_update();
 
                pthread_mutex_lock(&stream->chan->lock);
                health_code_update();
 
                pthread_mutex_lock(&stream->chan->lock);
+               pthread_mutex_lock(&stream->lock);
                lttng_ustconsumer_close_metadata(stream->chan);
                lttng_ustconsumer_close_metadata(stream->chan);
+               pthread_mutex_unlock(&stream->lock);
                pthread_mutex_unlock(&stream->chan->lock);
        }
                pthread_mutex_unlock(&stream->chan->lock);
        }
-       rcu_read_unlock();
 }
 
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
 }
 
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
@@ -3245,16 +3227,17 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                                       struct lttng_consumer_channel *channel,
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                                       struct lttng_consumer_channel *channel,
-                                      int timer,
+                                      bool invoked_by_timer,
                                       int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
                                       int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
-       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+       const lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        uint64_t len, key, offset, version;
        int ret;
 
        LTTNG_ASSERT(channel);
        uint64_t len, key, offset, version;
        int ret;
 
        LTTNG_ASSERT(channel);
+       LTTNG_ASSERT(!channel->is_deleted);
        LTTNG_ASSERT(channel->metadata_cache);
 
        memset(&request, 0, sizeof(request));
        LTTNG_ASSERT(channel->metadata_cache);
 
        memset(&request, 0, sizeof(request));
@@ -3350,8 +3333,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
 
        health_code_update();
 
-       ret = lttng_ustconsumer_recv_metadata(
-               ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+       ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                                             key,
+                                             offset,
+                                             len,
+                                             version,
+                                             channel,
+                                             invoked_by_timer,
+                                             wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
This page took 0.03574 seconds and 4 git commands to generate.