Rename C++ header files to .hpp
[lttng-tools.git] / src / common / consumer / consumer.cpp
index 5730fd45c28dfc97438bda6774af69950f348758..685d55f838120a4b94cf407d9e420e03cf294e32 100644 (file)
@@ -7,42 +7,42 @@
  *
  */
 
-#include "common/index/ctf-index.h"
 #define _LGPL_SOURCE
+#include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
+#include <signal.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <inttypes.h>
-#include <signal.h>
 
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/utils.h>
-#include <common/time.h>
-#include <common/compat/poll.h>
-#include <common/compat/endian.h>
-#include <common/index/index.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-testpoint.h>
-#include <common/align.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/trace-chunk.h>
-#include <common/trace-chunk-registry.h>
-#include <common/string-utils/format.h>
-#include <common/dynamic-array.h>
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/align.hpp>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/string-utils/format.hpp>
+#include <common/time.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/trace-chunk.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
 
 lttng_consumer_global_data the_consumer_data;
 
@@ -173,7 +173,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
                        send_node) {
-               cds_list_del(&stream->send_node);
                /*
                 * Once a stream is added to this list, the buffers were created so we
                 * have a guarantee that this call will succeed. Setting the monitor
@@ -246,6 +245,8 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        struct lttng_ht_node_u64 *node;
        struct lttng_consumer_channel *channel = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        /* -1ULL keys are lookup failures */
        if (key == (uint64_t) -1ULL) {
                return NULL;
@@ -521,6 +522,7 @@ void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 {
        LTTNG_ASSERT(relayd);
+       ASSERT_RCU_READ_LOCKED();
 
        /* Set destroy flag for this object */
        uatomic_set(&relayd->destroy_flag, 1);
@@ -633,6 +635,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        struct lttng_ht_iter iter;
 
        LTTNG_ASSERT(relayd);
+       ASSERT_RCU_READ_LOCKED();
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
                        &iter);
@@ -690,6 +693,8 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        struct lttng_ht_node_u64 *node;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        /* Negative keys are lookup failures */
        if (key == (uint64_t) -1ULL) {
                goto error;
@@ -3398,7 +3403,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
         */
        if (stream->rotate_ready) {
                DBG("Rotate stream before consuming data");
-               ret = lttng_consumer_rotate_stream(ctx, stream);
+               ret = lttng_consumer_rotate_stream(stream);
                if (ret < 0) {
                        ERR("Stream rotation error before consuming data");
                        goto end;
@@ -3454,7 +3459,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
         */
        rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
        if (rotation_ret == 1) {
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               rotation_ret = lttng_consumer_rotate_stream(stream);
                if (rotation_ret < 0) {
                        ret = rotation_ret;
                        ERR("Stream rotation error after consuming data");
@@ -3558,18 +3563,24 @@ error:
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
- void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
-               struct lttng_consumer_local_data *ctx, int sock,
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+               int sock_type,
+               struct lttng_consumer_local_data *ctx,
+               int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
-               uint64_t relayd_session_id)
+               uint64_t sessiond_id,
+               uint64_t relayd_session_id,
+               uint32_t relayd_version_major,
+               uint32_t relayd_version_minor,
+               enum lttcomm_sock_proto relayd_socket_protocol)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        LTTNG_ASSERT(ctx);
-       LTTNG_ASSERT(relayd_sock);
+       LTTNG_ASSERT(sock >= 0);
+       ASSERT_RCU_READ_LOCKED();
 
        DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
 
@@ -3638,54 +3649,25 @@ error:
        switch (sock_type) {
        case LTTNG_STREAM_CONTROL:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
-               ret = lttcomm_create_sock(&relayd->control_sock.sock);
-               /* Handle create_sock error. */
-               if (ret < 0) {
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto error;
-               }
-               /*
-                * Close the socket created internally by
-                * lttcomm_create_sock, so we can replace it by the one
-                * received from sessiond.
-                */
-               if (close(relayd->control_sock.sock.fd)) {
-                       PERROR("close");
-               }
+               ret = lttcomm_populate_sock_from_open_socket(
+                               &relayd->control_sock.sock, fd,
+                               relayd_socket_protocol);
 
-               /* Assign new file descriptor */
-               relayd->control_sock.sock.fd = fd;
                /* Assign version values. */
-               relayd->control_sock.major = relayd_sock->major;
-               relayd->control_sock.minor = relayd_sock->minor;
+               relayd->control_sock.major = relayd_version_major;
+               relayd->control_sock.minor = relayd_version_minor;
 
                relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
-               ret = lttcomm_create_sock(&relayd->data_sock.sock);
-               /* Handle create_sock error. */
-               if (ret < 0) {
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto error;
-               }
-               /*
-                * Close the socket created internally by
-                * lttcomm_create_sock, so we can replace it by the one
-                * received from sessiond.
-                */
-               if (close(relayd->data_sock.sock.fd)) {
-                       PERROR("close");
-               }
-
-               /* Assign new file descriptor */
-               relayd->data_sock.sock.fd = fd;
+               ret = lttcomm_populate_sock_from_open_socket(
+                               &relayd->data_sock.sock, fd,
+                               relayd_socket_protocol);
                /* Assign version values. */
-               relayd->data_sock.major = relayd_sock->major;
-               relayd->data_sock.minor = relayd_sock->minor;
+               relayd->data_sock.major = relayd_version_major;
+               relayd->data_sock.minor = relayd_version_minor;
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
@@ -3693,6 +3675,11 @@ error:
                goto error;
        }
 
+       if (ret < 0) {
+               ret_code = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
@@ -3749,6 +3736,8 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
        struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
        cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
                        relayd, node.node) {
@@ -3996,8 +3985,7 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
-               uint64_t key, uint64_t relayd_id, uint32_t metadata,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t key, uint64_t relayd_id)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -4013,6 +4001,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        struct lttng_dynamic_pointer_array streams_packet_to_open;
        size_t stream_idx;
 
+       ASSERT_RCU_READ_LOCKED();
+
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
@@ -4545,8 +4535,7 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre
  * Perform the rotation a local stream file.
  */
 static
-int rotate_local_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int rotate_local_stream(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
 
@@ -4585,8 +4574,7 @@ end:
  *
  * Return 0 on success, a negative number of error.
  */
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
@@ -4621,7 +4609,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        }
 
        if (stream->net_seq_idx == (uint64_t) -1ULL) {
-               ret = rotate_local_stream(ctx, stream);
+               ret = rotate_local_stream(stream);
                if (ret < 0) {
                        ERR("Failed to rotate stream, ret = %i", ret);
                        goto error;
@@ -4665,13 +4653,15 @@ error:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
-               uint64_t key, struct lttng_consumer_local_data *ctx)
+               uint64_t key)
 {
        int ret;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
 
+       ASSERT_RCU_READ_LOCKED();
+
        rcu_read_lock();
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
@@ -4692,7 +4682,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
                }
                DBG("Consumer rotate ready stream %" PRIu64, stream->key);
 
-               ret = lttng_consumer_rotate_stream(ctx, stream);
+               ret = lttng_consumer_rotate_stream(stream);
                pthread_mutex_unlock(&stream->lock);
                pthread_mutex_unlock(&stream->chan->lock);
                if (ret) {
This page took 0.02687 seconds and 4 git commands to generate.