fix: relayd: unaligned access in trace_chunk_registry_ht_key_hash
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.cpp
index 17d2248bec8f9f2c35a1bb6e90f92b694867335c..13c18325d75d363e5da76674734b4e3aedcd0c07 100644 (file)
@@ -17,6 +17,7 @@
 #include <common/defaults.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 
 #include <inttypes.h>
@@ -83,7 +84,7 @@ char *setup_channel_trace_path(struct consumer_output *consumer,
        return pathname;
 error:
        free(pathname);
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -291,18 +292,17 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 
        /* Destroy any relayd connection */
        if (consumer->type == CONSUMER_DST_NET) {
-               rcu_read_lock();
+               lttng::urcu::read_lock_guard read_lock;
+
                cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       int ret;
+                       /* Send destroy relayd command. */
+                       const int ret = consumer_send_destroy_relayd(socket, consumer);
 
-                       /* Send destroy relayd command */
-                       ret = consumer_send_destroy_relayd(socket, consumer);
                        if (ret < 0) {
                                DBG("Unable to send destroy relayd command to consumer");
                                /* Continue since we MUST delete everything at this point. */
                        }
                }
-               rcu_read_unlock();
        }
 }
 
@@ -319,7 +319,9 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o
 
        LTTNG_ASSERT(data);
 
-       if (output == NULL || data->cmd_sock < 0) {
+       lttng::urcu::read_lock_guard read_lock;
+
+       if (output == nullptr || data->cmd_sock < 0) {
                /*
                 * Not an error. Possible there is simply not spawned consumer or it's
                 * disabled for the tracing session asking the socket.
@@ -327,21 +329,17 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o
                goto error;
        }
 
-       rcu_read_lock();
        socket = consumer_find_socket(data->cmd_sock, output);
-       rcu_read_unlock();
-       if (socket == NULL) {
+       if (socket == nullptr) {
                socket = consumer_allocate_socket(&data->cmd_sock);
-               if (socket == NULL) {
+               if (socket == nullptr) {
                        ret = -1;
                        goto error;
                }
 
                socket->registered = 0;
                socket->lock = &data->lock;
-               rcu_read_lock();
                consumer_add_socket(socket, output);
-               rcu_read_unlock();
        }
 
        socket->type = data->type;
@@ -363,7 +361,7 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits,
                                                        const struct consumer_output *consumer)
 {
        int consumer_fd;
-       struct consumer_socket *socket = NULL;
+       struct consumer_socket *socket = nullptr;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -397,18 +395,18 @@ struct consumer_socket *consumer_find_socket(int key, const struct consumer_outp
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_ulong *node;
-       struct consumer_socket *socket = NULL;
+       struct consumer_socket *socket = nullptr;
 
        ASSERT_RCU_READ_LOCKED();
 
        /* Negative keys are lookup failures */
-       if (key < 0 || consumer == NULL) {
-               return NULL;
+       if (key < 0 || consumer == nullptr) {
+               return nullptr;
        }
 
        lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node != NULL) {
+       if (node != nullptr) {
                socket = lttng::utils::container_of(node, &consumer_socket::node);
        }
 
@@ -420,12 +418,12 @@ struct consumer_socket *consumer_find_socket(int key, const struct consumer_outp
  */
 struct consumer_socket *consumer_allocate_socket(int *fd)
 {
-       struct consumer_socket *socket = NULL;
+       struct consumer_socket *socket = nullptr;
 
        LTTNG_ASSERT(fd);
 
        socket = zmalloc<consumer_socket>();
-       if (socket == NULL) {
+       if (socket == nullptr) {
                PERROR("zmalloc consumer socket");
                goto error;
        }
@@ -510,16 +508,16 @@ void consumer_destroy_socket(struct consumer_socket *sock)
  */
 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
 {
-       struct consumer_output *output = NULL;
+       struct consumer_output *output = nullptr;
 
        output = zmalloc<consumer_output>();
-       if (output == NULL) {
+       if (output == nullptr) {
                PERROR("zmalloc consumer_output");
                goto error;
        }
 
        /* By default, consumer output is enabled */
-       output->enabled = 1;
+       output->enabled = true;
        output->type = type;
        output->net_seq_index = (uint64_t) -1ULL;
        urcu_ref_init(&output->ref);
@@ -544,12 +542,14 @@ void consumer_destroy_output_sockets(struct consumer_output *obj)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
-               consumer_del_socket(socket, obj);
-               consumer_destroy_socket(socket);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
+                       consumer_del_socket(socket, obj);
+                       consumer_destroy_socket(socket);
+               }
        }
-       rcu_read_unlock();
 }
 
 /*
@@ -599,7 +599,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src)
        LTTNG_ASSERT(src);
 
        output = consumer_create_output(src->type);
-       if (output == NULL) {
+       if (output == nullptr) {
                goto end;
        }
        output->enabled = src->enabled;
@@ -619,7 +619,7 @@ end:
 
 error_put:
        consumer_output_put(output);
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -636,32 +636,33 @@ int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *s
        LTTNG_ASSERT(dst);
        LTTNG_ASSERT(src);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
-               /* Ignore socket that are already there. */
-               copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
-               if (copy_sock) {
-                       continue;
-               }
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               /* Create new socket object. */
-               copy_sock = consumer_allocate_socket(socket->fd_ptr);
-               if (copy_sock == NULL) {
-                       rcu_read_unlock();
-                       ret = -ENOMEM;
-                       goto error;
-               }
+               cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
+                       /* Ignore socket that are already there. */
+                       copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+                       if (copy_sock) {
+                               continue;
+                       }
 
-               copy_sock->registered = socket->registered;
-               /*
-                * This is valid because this lock is shared accross all consumer
-                * object being the global lock of the consumer data structure of the
-                * session daemon.
-                */
-               copy_sock->lock = socket->lock;
-               consumer_add_socket(copy_sock, dst);
+                       /* Create new socket object. */
+                       copy_sock = consumer_allocate_socket(socket->fd_ptr);
+                       if (copy_sock == nullptr) {
+                               ret = -ENOMEM;
+                               goto error;
+                       }
+
+                       copy_sock->registered = socket->registered;
+                       /*
+                        * This is valid because this lock is shared accross all consumer
+                        * object being the global lock of the consumer data structure of the
+                        * session daemon.
+                        */
+                       copy_sock->lock = socket->lock;
+                       consumer_add_socket(copy_sock, dst);
+               }
        }
-       rcu_read_unlock();
 
 error:
        return ret;
@@ -678,7 +679,7 @@ int consumer_set_network_uri(const struct ltt_session *session,
                             struct lttng_uri *uri)
 {
        int ret;
-       struct lttng_uri *dst_uri = NULL;
+       struct lttng_uri *dst_uri = nullptr;
 
        /* Code flow error safety net. */
        LTTNG_ASSERT(output);
@@ -1268,33 +1269,35 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
        msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
        msg.u.data_pending.session_id = session_id;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto error_unlock;
-               }
+       {
+               /* Send command for each consumer. */
+               lttng::urcu::read_lock_guard read_lock;
 
-               /*
-                * No need for a recv reply status because the answer to the command is
-                * the reply status message.
-                */
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto error_unlock;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the command is
+                        * the reply status message.
+                        */
+                       ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto error_unlock;
+                       }
 
-               ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
-               if (ret < 0) {
                        pthread_mutex_unlock(socket->lock);
-                       goto error_unlock;
-               }
-               pthread_mutex_unlock(socket->lock);
 
-               if (ret_code == 1) {
-                       break;
+                       if (ret_code == 1) {
+                               break;
+                       }
                }
        }
-       rcu_read_unlock();
 
        DBG("Consumer data is %s pending for session id %" PRIu64,
            ret_code == 1 ? "" : "NOT",
@@ -1302,7 +1305,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
        return ret_code;
 
 error_unlock:
-       rcu_read_unlock();
        return -1;
 }
 
@@ -1580,35 +1582,41 @@ int consumer_get_discarded_events(uint64_t session_id,
 
        *discarded = 0;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               uint64_t consumer_discarded = 0;
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto end;
-               }
+       /* Send command for each consumer. */
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       uint64_t consumer_discarded = 0;
+
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the
+                        * command is the reply status message.
+                        */
+                       ret = consumer_socket_recv(
+                               socket, &consumer_discarded, sizeof(consumer_discarded));
+                       if (ret < 0) {
+                               ERR("get discarded events");
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
 
-               /*
-                * No need for a recv reply status because the answer to the
-                * command is the reply status message.
-                */
-               ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
-               if (ret < 0) {
-                       ERR("get discarded events");
                        pthread_mutex_unlock(socket->lock);
-                       goto end;
+                       *discarded += consumer_discarded;
                }
-               pthread_mutex_unlock(socket->lock);
-               *discarded += consumer_discarded;
        }
+
        ret = 0;
        DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1636,35 +1644,38 @@ int consumer_get_lost_packets(uint64_t session_id,
 
        *lost = 0;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               uint64_t consumer_lost = 0;
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto end;
-               }
+       /* Send command for each consumer. */
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               /*
-                * No need for a recv reply status because the answer to the
-                * command is the reply status message.
-                */
-               ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
-               if (ret < 0) {
-                       ERR("get lost packets");
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       uint64_t consumer_lost = 0;
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the
+                        * command is the reply status message.
+                        */
+                       ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+                       if (ret < 0) {
+                               ERR("get lost packets");
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
                        pthread_mutex_unlock(socket->lock);
-                       goto end;
+                       *lost += consumer_lost;
                }
-               pthread_mutex_unlock(socket->lock);
-               *lost += consumer_lost;
        }
+
        ret = 0;
        DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1811,8 +1822,8 @@ int consumer_create_trace_chunk(struct consumer_socket *socket,
        int ret;
        enum lttng_trace_chunk_status chunk_status;
        struct lttng_credentials chunk_credentials;
-       const struct lttng_directory_handle *chunk_directory_handle = NULL;
-       struct lttng_directory_handle *domain_handle = NULL;
+       const struct lttng_directory_handle *chunk_directory_handle = nullptr;
+       struct lttng_directory_handle *domain_handle = nullptr;
        int domain_dirfd;
        const char *chunk_name;
        bool chunk_name_overridden;
This page took 0.031806 seconds and 4 git commands to generate.