waiter: modernize the waiter interface
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.cpp
index ea3e3619106654d556bdbf543afca0ce75308bd2..3f40d03ed3d846cb8fa95788c94bf555be5fb28f 100644 (file)
@@ -12,7 +12,6 @@
 
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
 #include <common/consumer/consumer-metadata-cache.hpp>
 #include <common/consumer/consumer-stream.hpp>
 #include <common/consumer/consumer-timer.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/shm.hpp>
+#include <common/urcu.hpp>
 #include <common/utils.hpp>
 
 #include <lttng/ust-ctl.h>
 #include <lttng/ust-sigbus.h>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -63,7 +64,7 @@ static int add_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
 
-       if (ctx->on_recv_channel != NULL) {
+       if (ctx->on_recv_channel != nullptr) {
                ret = ctx->on_recv_channel(channel);
                if (ret == 0) {
                        ret = consumer_add_channel(channel, ctx);
@@ -95,7 +96,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu,
                                                     int *_alloc_ret)
 {
        int alloc_ret;
-       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_consumer_stream *stream = nullptr;
 
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
@@ -111,7 +112,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu,
                                        &alloc_ret,
                                        channel->type,
                                        channel->monitor);
-       if (stream == NULL) {
+       if (stream == nullptr) {
                switch (alloc_ret) {
                case -ENOENT:
                        /*
@@ -167,7 +168,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        stream->globally_visible = 1;
        cds_list_del_init(&stream->send_node);
 
-       ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
+       ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a
+                                                                        pointer. */
        if (ret < 0) {
                ERR("Consumer write %s stream to pipe %d",
                    stream->metadata_flag ? "metadata" : "data",
@@ -214,7 +216,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
        int ret, cpu = 0;
        struct lttng_ust_ctl_consumer_stream *ustream;
        struct lttng_consumer_stream *stream;
-       pthread_mutex_t *current_stream_lock = NULL;
+       pthread_mutex_t *current_stream_lock = nullptr;
 
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
@@ -302,7 +304,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                        }
                }
                pthread_mutex_unlock(&stream->lock);
-               current_stream_lock = NULL;
+               current_stream_lock = nullptr;
        }
 
        return 0;
@@ -539,7 +541,7 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        }
 
        /* Tell sessiond there is no more stream. */
-       ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL);
+       ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr);
        if (ret < 0) {
                goto error;
        }
@@ -665,7 +667,7 @@ static int flush_channel(uint64_t chan_key)
 
        DBG("UST consumer flush channel key %" PRIu64, chan_key);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
@@ -720,7 +722,6 @@ static int flush_channel(uint64_t chan_key)
         */
        sample_and_send_channel_buffer_stats(channel);
 error:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -740,7 +741,7 @@ static int clear_quiescent_channel(uint64_t chan_key)
 
        DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
@@ -766,7 +767,6 @@ static int clear_quiescent_channel(uint64_t chan_key)
                pthread_mutex_unlock(&stream->lock);
        }
 error:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -928,8 +928,10 @@ error:
         * the stream is still in the local stream list of the channel. This call
         * will make sure to clean that list.
         */
-       consumer_stream_destroy(metadata->metadata_stream, NULL);
-       metadata->metadata_stream = NULL;
+       consumer_stream_destroy(metadata->metadata_stream, nullptr);
+       metadata->metadata_stream = nullptr;
+       metadata->metadata_pushed_wait_queue.wake_all();
+
 send_streams_error:
 error_no_stream:
 end:
@@ -957,7 +959,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
 
        DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        LTTNG_ASSERT(!metadata_channel->monitor);
 
@@ -967,7 +969,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
        if (ret < 0) {
                goto error;
        }
@@ -1011,11 +1013,11 @@ error_stream:
         * Clean up the stream completely because the next snapshot will use a
         * new metadata stream.
         */
-       consumer_stream_destroy(metadata_stream, NULL);
-       metadata_channel->metadata_stream = NULL;
+       consumer_stream_destroy(metadata_stream, nullptr);
+       metadata_channel->metadata_stream = nullptr;
+       metadata_channel->metadata_pushed_wait_queue.wake_all();
 
 error:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1066,7 +1068,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(ctx);
        ASSERT_RCU_READ_LOCKED();
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id != (uint64_t) -1ULL) {
                use_relayd = 1;
@@ -1217,7 +1219,6 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                pthread_mutex_unlock(&stream->lock);
        }
 
-       rcu_read_unlock();
        return 0;
 
 error_put_subbuf:
@@ -1228,7 +1229,6 @@ error_close_stream:
        consumer_stream_close_output(stream);
 error_unlock:
        pthread_mutex_unlock(&stream->lock);
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1254,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock,
                                    uint64_t len,
                                    uint64_t version,
                                    struct lttng_consumer_channel *channel,
-                                   int timer,
+                                   bool invoked_by_timer,
                                    int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1306,7 +1306,7 @@ int lttng_ustconsumer_recv_metadata(int sock,
                 * channel is under a snapshot session type. No need to update
                 * the stream position in that scenario.
                 */
-               if (channel->metadata_stream != NULL) {
+               if (channel->metadata_stream != nullptr) {
                        pthread_mutex_lock(&channel->metadata_stream->lock);
                        metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
                        pthread_mutex_unlock(&channel->metadata_stream->lock);
@@ -1342,13 +1342,8 @@ int lttng_ustconsumer_recv_metadata(int sock,
        if (!wait) {
                goto end_free;
        }
-       while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-               DBG("Waiting for metadata to be flushed");
 
-               health_code_update();
-
-               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-       }
+       consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
        free(metadata_str);
@@ -1368,7 +1363,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        int ret_func;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
-       struct lttng_consumer_channel *channel = NULL;
+       struct lttng_consumer_channel *channel = nullptr;
 
        health_code_update();
 
@@ -1400,7 +1395,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        /* relayd needs RCU read-side lock */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
@@ -1432,7 +1427,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get relayd reference if exists. */
                relayd = consumer_find_relayd(index);
-               if (relayd == NULL) {
+               if (relayd == nullptr) {
                        DBG("Unable to find relayd %" PRIu64, index);
                        ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
@@ -1455,7 +1450,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
-               rcu_read_unlock();
                return -ENOSYS;
        }
        case LTTNG_CONSUMER_DATA_PENDING:
@@ -1495,7 +1489,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                channel = consumer_allocate_channel(
                        msg.u.ask_channel.key,
                        msg.u.ask_channel.session_id,
-                       msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+                       msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr,
                        msg.u.ask_channel.pathname,
                        msg.u.ask_channel.name,
                        msg.u.ask_channel.relayd_id,
@@ -1794,7 +1788,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(
-                       sock, key, offset, len, version, found_channel, 0, 1);
+                       sock, key, offset, len, version, found_channel, false, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -1876,7 +1870,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint64_t key = msg.u.discarded_events.channel_key;
 
                DBG("UST consumer discarded events command for session id %" PRIu64, id);
-               rcu_read_lock();
                pthread_mutex_lock(&the_consumer_data.lock);
 
                ht = the_consumer_data.stream_list_ht;
@@ -1903,7 +1896,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
                pthread_mutex_unlock(&the_consumer_data.lock);
-               rcu_read_unlock();
 
                DBG("UST consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -1932,7 +1924,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint64_t key = msg.u.lost_packets.channel_key;
 
                DBG("UST consumer lost packets command for session id %" PRIu64, id);
-               rcu_read_lock();
                pthread_mutex_lock(&the_consumer_data.lock);
 
                ht = the_consumer_data.stream_list_ht;
@@ -1958,7 +1949,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
                pthread_mutex_unlock(&the_consumer_data.lock);
-               rcu_read_unlock();
 
                DBG("UST consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -2131,8 +2121,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
                const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
                        msg.u.create_trace_chunk.override_name :
-                       NULL;
-               struct lttng_directory_handle *chunk_directory_handle = NULL;
+                       nullptr;
+               struct lttng_directory_handle *chunk_directory_handle = nullptr;
 
                /*
                 * The session daemon will only provide a chunk directory file
@@ -2172,12 +2162,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                ret_code = lttng_consumer_create_trace_chunk(
-                       !is_local_trace ? &relayd_id : NULL,
+                       !is_local_trace ? &relayd_id : nullptr,
                        msg.u.create_trace_chunk.session_id,
                        msg.u.create_trace_chunk.chunk_id,
                        (time_t) msg.u.create_trace_chunk.creation_timestamp,
                        chunk_override_name,
-                       msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
+                       msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr,
                        chunk_directory_handle);
                lttng_directory_handle_put(chunk_directory_handle);
                goto end_msg_sessiond;
@@ -2192,11 +2182,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int ret;
 
                ret_code = lttng_consumer_close_trace_chunk(
-                       msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
+                       msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr,
                        msg.u.close_trace_chunk.session_id,
                        msg.u.close_trace_chunk.chunk_id,
                        (time_t) msg.u.close_trace_chunk.close_timestamp,
-                       msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
+                       msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr,
                        closed_trace_chunk_path);
                reply.ret_code = ret_code;
                reply.path_length = strlen(closed_trace_chunk_path) + 1;
@@ -2215,7 +2205,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
 
                ret_code = lttng_consumer_trace_chunk_exists(
-                       msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
+                       msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr,
                        msg.u.trace_chunk_exists.session_id,
                        msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
@@ -2279,7 +2269,7 @@ end_channel_error:
        {
                int ret_send_status;
 
-               ret_send_status = consumer_send_status_channel(sock, NULL);
+               ret_send_status = consumer_send_status_channel(sock, nullptr);
                if (ret_send_status < 0) {
                        /* Stop everything if session daemon can not be notified. */
                        goto error_fatal;
@@ -2295,7 +2285,6 @@ error_fatal:
        goto end;
 
 end:
-       rcu_read_unlock();
        health_code_update();
        return ret_func;
 }
@@ -2559,7 +2548,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                ret = write_len;
                goto end;
        }
+
        stream->ust_metadata_pushed += write_len;
+       stream->chan->metadata_pushed_wait_queue.wake_all();
 
        LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
@@ -2608,7 +2599,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
@@ -2864,7 +2855,7 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
 
        subbuffer->buffer.buffer =
                lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
-       LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL);
+       LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr);
 end:
        return ret;
 }
@@ -3212,15 +3203,17 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
 
        DBG("UST consumer closing all metadata streams");
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-               health_code_update();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               pthread_mutex_lock(&stream->chan->lock);
-               lttng_ustconsumer_close_metadata(stream->chan);
-               pthread_mutex_unlock(&stream->chan->lock);
+               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+                       health_code_update();
+
+                       pthread_mutex_lock(&stream->chan->lock);
+                       lttng_ustconsumer_close_metadata(stream->chan);
+                       pthread_mutex_unlock(&stream->chan->lock);
+               }
        }
-       rcu_read_unlock();
 }
 
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
@@ -3245,7 +3238,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                                       struct lttng_consumer_channel *channel,
-                                      int timer,
+                                      bool invoked_by_timer,
                                       int wait)
 {
        struct lttcomm_metadata_request_msg request;
@@ -3350,8 +3343,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
-       ret = lttng_ustconsumer_recv_metadata(
-               ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+       ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                                             key,
+                                             offset,
+                                             len,
+                                             version,
+                                             channel,
+                                             invoked_by_timer,
+                                             wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
This page took 0.029262 seconds and 4 git commands to generate.