Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
index 3b42438d8826a1d2e246cf2c403a92107f699d79..f02380fbcddc39368a88373adba462506907753a 100644 (file)
@@ -13,7 +13,6 @@
 #include <common/buffer-view.hpp>
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
 #include <common/buffer-view.hpp>
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
 #include <common/consumer/consumer-stream.hpp>
 #include <common/consumer/consumer-timer.hpp>
 #include <common/consumer/consumer.hpp>
 #include <common/consumer/consumer-stream.hpp>
 #include <common/consumer/consumer-timer.hpp>
 #include <common/consumer/consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/optional.hpp>
 #include <common/pipe.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/optional.hpp>
 #include <common/pipe.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/urcu.hpp>
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -51,7 +54,7 @@ extern int consumer_poll_timeout;
 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
-       int infd = stream->wait_fd;
+       const int infd = stream->wait_fd;
 
        ret = kernctl_snapshot(infd);
        /*
 
        ret = kernctl_snapshot(infd);
        /*
@@ -85,7 +88,7 @@ int lttng_kconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stre
 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
 {
        int ret;
 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
 {
        int ret;
-       int infd = stream->wait_fd;
+       const int infd = stream->wait_fd;
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
@@ -103,7 +106,7 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
 {
        int ret;
 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
 {
        int ret;
-       int infd = stream->wait_fd;
+       const int infd = stream->wait_fd;
 
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
 
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
@@ -144,14 +147,13 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                            uint64_t nb_packets_per_stream)
 {
        int ret;
                                            uint64_t nb_packets_per_stream)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
 
        DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        /* Prevent channel modifications while we perform the snapshot.*/
 
        DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        /* Prevent channel modifications while we perform the snapshot.*/
-       pthread_mutex_lock(&channel->lock);
+       const lttng::pthread::lock_guard channe_lock(channel->lock);
 
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Splice is not supported yet for channel snapshot. */
        if (channel->output != CONSUMER_CHANNEL_MMAP) {
 
        /* Splice is not supported yet for channel snapshot. */
        if (channel->output != CONSUMER_CHANNEL_MMAP) {
@@ -161,7 +163,9 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                goto end;
        }
 
                goto end;
        }
 
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                unsigned long consumed_pos, produced_pos;
 
                health_code_update();
                unsigned long consumed_pos, produced_pos;
 
                health_code_update();
@@ -169,7 +173,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                /*
                 * Lock stream because we are about to change its state.
                 */
                /*
                 * Lock stream because we are about to change its state.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
 
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
@@ -179,7 +183,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
                        ret = -1;
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
                        ret = -1;
-                       goto end_unlock;
+                       goto end;
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
@@ -190,16 +194,21 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                 */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                 */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
+
+               /* Close stream output when were are done. */
+               const auto close_stream_output = lttng::make_scope_exit(
+                       [stream]() noexcept { consumer_stream_close_output(stream); });
+
                if (relayd_id != (uint64_t) -1ULL) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                if (relayd_id != (uint64_t) -1ULL) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
-                               goto error_close_stream_output;
+                               goto end;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
-                               goto error_close_stream_output;
+                               goto end;
                        }
                        DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
                        }
                        DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
@@ -216,27 +225,27 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                        ret = kernctl_buffer_flush(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Failed to flush kernel stream");
                        ret = kernctl_buffer_flush(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Failed to flush kernel stream");
-                               goto error_close_stream_output;
+                               goto end;
                        }
                        }
-                       goto end_unlock;
+                       goto end;
                }
 
                ret = lttng_kconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot");
                }
 
                ret = lttng_kconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
                }
 
                ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd kernel snapshot position");
                }
 
                ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd kernel snapshot position");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                consumed_pos = consumer_get_consume_start_pos(
                }
 
                consumed_pos = consumer_get_consume_start_pos(
@@ -255,7 +264,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
-                                       goto error_close_stream_output;
+                                       goto end;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
@@ -263,21 +272,29 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                continue;
                        }
 
                                continue;
                        }
 
+                       /* Put the subbuffer once we are done. */
+                       const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+                               const auto put_ret = kernctl_put_subbuf(stream->wait_fd);
+                               if (put_ret < 0) {
+                                       ERR("Snapshot kernctl_put_subbuf");
+                               }
+                       });
+
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
@@ -302,34 +319,15 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                }
                        }
 
                                }
                        }
 
-                       ret = kernctl_put_subbuf(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Snapshot kernctl_put_subbuf");
-                               goto error_close_stream_output;
-                       }
                        consumed_pos += stream->max_sb_size;
                }
                        consumed_pos += stream->max_sb_size;
                }
-
-               consumer_stream_close_output(stream);
-               pthread_mutex_unlock(&stream->lock);
        }
 
        /* All good! */
        ret = 0;
        goto end;
 
        }
 
        /* All good! */
        ret = 0;
        goto end;
 
-error_put_subbuf:
-       ret = kernctl_put_subbuf(stream->wait_fd);
-       if (ret < 0) {
-               ERR("Snapshot kernctl_put_subbuf error path");
-       }
-error_close_stream_output:
-       consumer_stream_close_output(stream);
-end_unlock:
-       pthread_mutex_unlock(&stream->lock);
 end:
 end:
-       rcu_read_unlock();
-       pthread_mutex_unlock(&channel->lock);
        return ret;
 }
 
        return ret;
 }
 
@@ -354,7 +352,7 @@ static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *meta
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        metadata_stream = metadata_channel->metadata_stream;
        LTTNG_ASSERT(metadata_stream);
 
        metadata_stream = metadata_channel->metadata_stream;
        LTTNG_ASSERT(metadata_stream);
@@ -415,7 +413,6 @@ error_snapshot:
        metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
        metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
-       rcu_read_unlock();
        return ret;
 }
 
        return ret;
 }
 
@@ -455,14 +452,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        /* relayd needs RCU read-side protection */
        health_code_update();
 
        /* relayd needs RCU read-side protection */
-       rcu_read_lock();
+       const lttng::urcu::read_lock_guard read_lock;
 
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
 
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
-               uint32_t major = msg.u.relayd_sock.major;
-               uint32_t minor = msg.u.relayd_sock.minor;
-               enum lttcomm_sock_proto protocol =
+               const uint32_t major = msg.u.relayd_sock.major;
+               const uint32_t minor = msg.u.relayd_sock.minor;
+               const lttcomm_sock_proto protocol =
                        (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
 
                /* Session daemon status message are handled in the following call. */
                        (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
 
                /* Session daemon status message are handled in the following call. */
@@ -543,7 +540,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                if (ctx->on_recv_channel != nullptr) {
                health_code_update();
 
                if (ctx->on_recv_channel != nullptr) {
-                       int ret_recv_channel = ctx->on_recv_channel(new_channel);
+                       const int ret_recv_channel = ctx->on_recv_channel(new_channel);
                        if (ret_recv_channel == 0) {
                                ret_add_channel = consumer_add_channel(new_channel, ctx);
                        } else if (ret_recv_channel < 0) {
                        if (ret_recv_channel == 0) {
                                ret_add_channel = consumer_add_channel(new_channel, ctx);
                        } else if (ret_recv_channel < 0) {
@@ -710,7 +707,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                pthread_mutex_lock(&new_stream->lock);
                if (ctx->on_recv_stream) {
 
                pthread_mutex_lock(&new_stream->lock);
                if (ctx->on_recv_stream) {
-                       int ret_recv_stream = ctx->on_recv_stream(new_stream);
+                       const int ret_recv_stream = ctx->on_recv_stream(new_stream);
                        if (ret_recv_stream < 0) {
                                pthread_mutex_unlock(&new_stream->lock);
                                pthread_mutex_unlock(&channel->lock);
                        if (ret_recv_stream < 0) {
                                pthread_mutex_unlock(&new_stream->lock);
                                pthread_mutex_unlock(&channel->lock);
@@ -783,7 +780,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
 
                health_code_update();
 
-               ret_pipe_write = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
+               ret_pipe_write =
+                       lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); /* NOLINT
+                                                                                          sizeof
+                                                                                          used on a
+                                                                                          pointer.
+                                                                                        */
                if (ret_pipe_write < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                            new_stream->metadata_flag ? "metadata" : "data",
                if (ret_pipe_write < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                            new_stream->metadata_flag ? "metadata" : "data",
@@ -867,12 +869,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
-               rcu_read_unlock();
                return -ENOSYS;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
        {
                return -ENOSYS;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
        {
-               uint64_t index = msg.u.destroy_relayd.net_seq_idx;
+               const uint64_t index = msg.u.destroy_relayd.net_seq_idx;
                struct consumer_relayd_sock_pair *relayd;
                int ret_send_status;
 
                struct consumer_relayd_sock_pair *relayd;
                int ret_send_status;
 
@@ -912,7 +913,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_DATA_PENDING:
        {
                int32_t ret_data_pending;
        case LTTNG_CONSUMER_DATA_PENDING:
        {
                int32_t ret_data_pending;
-               uint64_t id = msg.u.data_pending.session_id;
+               const uint64_t id = msg.u.data_pending.session_id;
                ssize_t ret_send;
 
                DBG("Kernel consumer data pending command for id %" PRIu64, id);
                ssize_t ret_send;
 
                DBG("Kernel consumer data pending command for id %" PRIu64, id);
@@ -938,7 +939,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
        case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
-               uint64_t key = msg.u.snapshot_channel.key;
+               const uint64_t key = msg.u.snapshot_channel.key;
                int ret_send_status;
 
                channel = consumer_find_channel(key);
                int ret_send_status;
 
                channel = consumer_find_channel(key);
@@ -985,7 +986,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
-               uint64_t key = msg.u.destroy_channel.key;
+               const uint64_t key = msg.u.destroy_channel.key;
                struct lttng_consumer_channel *channel;
                int ret_send_status;
 
                struct lttng_consumer_channel *channel;
                int ret_send_status;
 
@@ -1031,8 +1032,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ssize_t ret;
                uint64_t count;
                struct lttng_consumer_channel *channel;
                ssize_t ret;
                uint64_t count;
                struct lttng_consumer_channel *channel;
-               uint64_t id = msg.u.discarded_events.session_id;
-               uint64_t key = msg.u.discarded_events.channel_key;
+               const uint64_t id = msg.u.discarded_events.session_id;
+               const uint64_t key = msg.u.discarded_events.channel_key;
 
                DBG("Kernel consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
 
                DBG("Kernel consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -1063,8 +1064,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ssize_t ret;
                uint64_t count;
                struct lttng_consumer_channel *channel;
                ssize_t ret;
                uint64_t count;
                struct lttng_consumer_channel *channel;
-               uint64_t id = msg.u.lost_packets.session_id;
-               uint64_t key = msg.u.lost_packets.channel_key;
+               const uint64_t id = msg.u.lost_packets.session_id;
+               const uint64_t key = msg.u.lost_packets.channel_key;
 
                DBG("Kernel consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
 
                DBG("Kernel consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -1143,7 +1144,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
        case LTTNG_CONSUMER_ROTATE_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
-               uint64_t key = msg.u.rotate_channel.key;
+               const uint64_t key = msg.u.rotate_channel.key;
                int ret_send_status;
 
                DBG("Consumer rotate channel %" PRIu64, key);
                int ret_send_status;
 
                DBG("Consumer rotate channel %" PRIu64, key);
@@ -1189,7 +1190,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_CLEAR_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
        case LTTNG_CONSUMER_CLEAR_CHANNEL:
        {
                struct lttng_consumer_channel *channel;
-               uint64_t key = msg.u.clear_channel.key;
+               const uint64_t key = msg.u.clear_channel.key;
                int ret_send_status;
 
                channel = consumer_find_channel(key);
                int ret_send_status;
 
                channel = consumer_find_channel(key);
@@ -1383,7 +1384,6 @@ end_msg_sessiond:
 
 end:
        health_code_update();
 
 end:
        health_code_update();
-       rcu_read_unlock();
        return ret_func;
 }
 
        return ret_func;
 }
 
This page took 0.032108 seconds and 4 git commands to generate.