Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
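
For context, the RAII type this commit adopts wraps liburcu's read-side lock: its constructor takes the lock and its destructor releases it when the guard leaves scope. The real definition lives in common/urcu.hpp; a minimal sketch of such a guard, assuming only the liburcu C API (details may differ from the actual header):

	#include <urcu.h>

	namespace lttng {
	namespace urcu {

	class read_lock_guard {
	public:
		read_lock_guard()
		{
			rcu_read_lock();
		}

		~read_lock_guard()
		{
			rcu_read_unlock();
		}

		/* Non-copyable: the read lock must be released exactly once. */
		read_lock_guard(const read_lock_guard&) = delete;
		read_lock_guard& operator=(const read_lock_guard&) = delete;
	};

	} /* namespace urcu */
	} /* namespace lttng */

With a guard like this in place, every early return or goto releases the read lock automatically, which is what lets the hunks below delete the explicit rcu_read_unlock() calls on both the success and error paths.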
diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp
index e8434702532273e5fd5e06093d15d7417eed8ccb..13c18325d75d363e5da76674734b4e3aedcd0c07 100644
--- a/src/bin/lttng-sessiond/consumer.cpp
+++ b/src/bin/lttng-sessiond/consumer.cpp
@@ -17,6 +17,7 @@
 #include <common/defaults.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 
 #include <inttypes.h>
@@ -291,18 +292,17 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 
        /* Destroy any relayd connection */
        if (consumer->type == CONSUMER_DST_NET) {
-               rcu_read_lock();
+               lttng::urcu::read_lock_guard read_lock;
+
                cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       int ret;
+                       /* Send destroy relayd command. */
+                       const int ret = consumer_send_destroy_relayd(socket, consumer);
 
-                       /* Send destroy relayd command */
-                       ret = consumer_send_destroy_relayd(socket, consumer);
                        if (ret < 0) {
                                DBG("Unable to send destroy relayd command to consumer");
                                /* Continue since we MUST delete everything at this point. */
                        }
                }
-               rcu_read_unlock();
        }
 }
 
@@ -319,6 +319,8 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o
 
        LTTNG_ASSERT(data);
 
+       lttng::urcu::read_lock_guard read_lock;
+
        if (output == nullptr || data->cmd_sock < 0) {
                /*
                 * Not an error. Possible there is simply not spawned consumer or it's
@@ -327,9 +329,7 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o
                goto error;
        }
 
-       rcu_read_lock();
        socket = consumer_find_socket(data->cmd_sock, output);
-       rcu_read_unlock();
        if (socket == nullptr) {
                socket = consumer_allocate_socket(&data->cmd_sock);
                if (socket == nullptr) {
@@ -339,9 +339,7 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o
 
                socket->registered = 0;
                socket->lock = &data->lock;
-               rcu_read_lock();
                consumer_add_socket(socket, output);
-               rcu_read_unlock();
        }
 
        socket->type = data->type;
@@ -544,12 +542,14 @@ void consumer_destroy_output_sockets(struct consumer_output *obj)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
-               consumer_del_socket(socket, obj);
-               consumer_destroy_socket(socket);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
+                       consumer_del_socket(socket, obj);
+                       consumer_destroy_socket(socket);
+               }
        }
-       rcu_read_unlock();
 }
 
 /*
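
A note on the shape of the conversion: since the guard releases the lock only when it is destroyed, hunks like the one above introduce a brace-delimited block so the critical section stays exactly as narrow as the old lock/unlock pair. Schematically (traverse_hash_table() is a hypothetical stand-in for the cds_lfht_for_each_entry loops):

	/* Before: explicit pairing, easy to miss on an early exit. */
	rcu_read_lock();
	traverse_hash_table();
	rcu_read_unlock();

	/* After: the scope bounds the guard's lifetime. */
	{
		lttng::urcu::read_lock_guard read_lock;

		traverse_hash_table();
	} /* Destructor runs here and releases the read lock. */
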
@@ -636,32 +636,33 @@ int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *s
        LTTNG_ASSERT(dst);
        LTTNG_ASSERT(src);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
-               /* Ignore socket that are already there. */
-               copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
-               if (copy_sock) {
-                       continue;
-               }
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               /* Create new socket object. */
-               copy_sock = consumer_allocate_socket(socket->fd_ptr);
-               if (copy_sock == nullptr) {
-                       rcu_read_unlock();
-                       ret = -ENOMEM;
-                       goto error;
-               }
+               cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
+                       /* Ignore socket that are already there. */
+                       copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+                       if (copy_sock) {
+                               continue;
+                       }
 
-               copy_sock->registered = socket->registered;
-               /*
-                * This is valid because this lock is shared accross all consumer
-                * object being the global lock of the consumer data structure of the
-                * session daemon.
-                */
-               copy_sock->lock = socket->lock;
-               consumer_add_socket(copy_sock, dst);
+                       /* Create new socket object. */
+                       copy_sock = consumer_allocate_socket(socket->fd_ptr);
+                       if (copy_sock == nullptr) {
+                               ret = -ENOMEM;
+                               goto error;
+                       }
+
+                       copy_sock->registered = socket->registered;
+                       /*
+                        * This is valid because this lock is shared accross all consumer
+                        * object being the global lock of the consumer data structure of the
+                        * session daemon.
+                        */
+                       copy_sock->lock = socket->lock;
+                       consumer_add_socket(copy_sock, dst);
+               }
        }
-       rcu_read_unlock();
 
 error:
        return ret;
@@ -1268,33 +1269,35 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
        msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
        msg.u.data_pending.session_id = session_id;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto error_unlock;
-               }
+       {
+               /* Send command for each consumer. */
+               lttng::urcu::read_lock_guard read_lock;
 
-               /*
-                * No need for a recv reply status because the answer to the command is
-                * the reply status message.
-                */
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto error_unlock;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the command is
+                        * the reply status message.
+                        */
+                       ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto error_unlock;
+                       }
 
-               ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
-               if (ret < 0) {
                        pthread_mutex_unlock(socket->lock);
-                       goto error_unlock;
-               }
-               pthread_mutex_unlock(socket->lock);
 
-               if (ret_code == 1) {
-                       break;
+                       if (ret_code == 1) {
+                               break;
+                       }
                }
        }
-       rcu_read_unlock();
 
        DBG("Consumer data is %s pending for session id %" PRIu64,
            ret_code == 1 ? "" : "NOT",
@@ -1302,7 +1305,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
            session_id);
        return ret_code;
 
 error_unlock:
-       rcu_read_unlock();
        return -1;
 }
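
One subtlety in the hunk above: the error_unlock label no longer calls rcu_read_unlock(). That is safe because a goto that jumps out of the guard's enclosing block still runs the guard's destructor before control reaches the label; C++ destroys automatic objects whose scope is exited, whatever the exit path. A self-contained illustration of that rule (not code from this tree):

	#include <cstdio>

	struct guard {
		guard() { std::puts("lock"); }
		~guard() { std::puts("unlock"); }
	};

	int f(bool fail)
	{
		{
			guard g; /* prints "lock" */

			if (fail) {
				/* Jumping out of the block destroys 'g', so
				 * "unlock" prints before the label is reached. */
				goto error;
			}
		}

		return 0;
	error:
		return -1;
	}
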
 
@@ -1580,35 +1582,41 @@ int consumer_get_discarded_events(uint64_t session_id,
 
        *discarded = 0;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               uint64_t consumer_discarded = 0;
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto end;
-               }
+       /* Send command for each consumer. */
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       uint64_t consumer_discarded = 0;
+
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the
+                        * command is the reply status message.
+                        */
+                       ret = consumer_socket_recv(
+                               socket, &consumer_discarded, sizeof(consumer_discarded));
+                       if (ret < 0) {
+                               ERR("get discarded events");
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
 
-               /*
-                * No need for a recv reply status because the answer to the
-                * command is the reply status message.
-                */
-               ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
-               if (ret < 0) {
-                       ERR("get discarded events");
                        pthread_mutex_unlock(socket->lock);
-                       goto end;
+                       *discarded += consumer_discarded;
                }
-               pthread_mutex_unlock(socket->lock);
-               *discarded += consumer_discarded;
        }
+
        ret = 0;
        DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1636,35 +1644,38 @@ int consumer_get_lost_packets(uint64_t session_id,
 
        *lost = 0;
 
-       /* Send command for each consumer */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-               uint64_t consumer_lost = 0;
-               pthread_mutex_lock(socket->lock);
-               ret = consumer_socket_send(socket, &msg, sizeof(msg));
-               if (ret < 0) {
-                       pthread_mutex_unlock(socket->lock);
-                       goto end;
-               }
+       /* Send command for each consumer. */
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               /*
-                * No need for a recv reply status because the answer to the
-                * command is the reply status message.
-                */
-               ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
-               if (ret < 0) {
-                       ERR("get lost packets");
+               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+                       uint64_t consumer_lost = 0;
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
+                       if (ret < 0) {
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
+
+                       /*
+                        * No need for a recv reply status because the answer to the
+                        * command is the reply status message.
+                        */
+                       ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+                       if (ret < 0) {
+                               ERR("get lost packets");
+                               pthread_mutex_unlock(socket->lock);
+                               goto end;
+                       }
                        pthread_mutex_unlock(socket->lock);
-                       goto end;
+                       *lost += consumer_lost;
                }
-               pthread_mutex_unlock(socket->lock);
-               *lost += consumer_lost;
        }
+
        ret = 0;
        DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
 
 end:
-       rcu_read_unlock();
        return ret;
 }
 