Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
[lttng-tools.git] / src / bin / lttng-relayd / main.cpp
index 9793e277808975728d5f6965e12f8648de35a9ff..e3eb9d72ef9c59a68ddc561abe9a01bf41559d90 100644 (file)
@@ -46,6 +46,7 @@
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 #include <common/utils.hpp>
 
@@ -1517,12 +1518,11 @@ static void publish_connection_local_streams(struct relay_connection *conn)
         * session lock.
         */
        pthread_mutex_lock(&session->lock);
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
        {
                stream_publish(stream);
        }
-       rcu_read_unlock();
 
        /*
         * Inform the viewer that there are new streams in the session.
@@ -2224,21 +2224,25 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
         * to iterate over all streams to find the one associated with
         * the right session_id.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-               if (!stream_get(stream)) {
-                       continue;
-               }
-               if (stream->trace->session->id == msg.session_id) {
-                       pthread_mutex_lock(&stream->lock);
-                       stream->data_pending_check_done = false;
-                       pthread_mutex_unlock(&stream->lock);
-                       DBG("Set begin data pending flag to stream %" PRIu64,
-                           stream->stream_handle);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+                       if (!stream_get(stream)) {
+                               continue;
+                       }
+
+                       if (stream->trace->session->id == msg.session_id) {
+                               pthread_mutex_lock(&stream->lock);
+                               stream->data_pending_check_done = false;
+                               pthread_mutex_unlock(&stream->lock);
+                               DBG("Set begin data pending flag to stream %" PRIu64,
+                                   stream->stream_handle);
+                       }
+
+                       stream_put(stream);
                }
-               stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -2299,43 +2303,49 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
         * Iterate over all streams to see if the begin data pending
         * flag is set.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-               if (!stream_get(stream)) {
-                       continue;
-               }
-               if (stream->trace->session->id != msg.session_id) {
-                       stream_put(stream);
-                       continue;
-               }
-               pthread_mutex_lock(&stream->lock);
-               if (!stream->data_pending_check_done) {
-                       uint64_t stream_seq;
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-                       if (session_streams_have_index(conn->session)) {
-                               /*
-                                * Ensure that both the index and stream data have been
-                                * flushed up to the requested point.
-                                */
-                               stream_seq =
-                                       std::min(stream->prev_data_seq, stream->prev_index_seq);
-                       } else {
-                               stream_seq = stream->prev_data_seq;
+               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+                       if (!stream_get(stream)) {
+                               continue;
                        }
-                       if (!stream->closed ||
-                           !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
-                               is_data_inflight = 1;
-                               DBG("Data is still in flight for stream %" PRIu64,
-                                   stream->stream_handle);
-                               pthread_mutex_unlock(&stream->lock);
+
+                       if (stream->trace->session->id != msg.session_id) {
                                stream_put(stream);
-                               break;
+                               continue;
+                       }
+
+                       pthread_mutex_lock(&stream->lock);
+                       if (!stream->data_pending_check_done) {
+                               uint64_t stream_seq;
+
+                               if (session_streams_have_index(conn->session)) {
+                                       /*
+                                        * Ensure that both the index and stream data have been
+                                        * flushed up to the requested point.
+                                        */
+                                       stream_seq = std::min(stream->prev_data_seq,
+                                                             stream->prev_index_seq);
+                               } else {
+                                       stream_seq = stream->prev_data_seq;
+                               }
+
+                               if (!stream->closed ||
+                                   !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+                                       is_data_inflight = 1;
+                                       DBG("Data is still in flight for stream %" PRIu64,
+                                           stream->stream_handle);
+                                       pthread_mutex_unlock(&stream->lock);
+                                       stream_put(stream);
+                                       break;
+                               }
                        }
+
+                       pthread_mutex_unlock(&stream->lock);
+                       stream_put(stream);
                }
-               pthread_mutex_unlock(&stream->lock);
-               stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -4161,19 +4171,23 @@ restart:
 exit:
 error:
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-               health_code_update();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               session_abort(destroy_conn->session);
+               cds_lfht_for_each_entry (
+                       relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+                       health_code_update();
 
-               /*
-                * No need to grab another ref, because we own
-                * destroy_conn.
-                */
-               relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
+                       session_abort(destroy_conn->session);
+
+                       /*
+                        * No need to grab another ref, because we own
+                        * destroy_conn.
+                        */
+                       relay_thread_close_connection(
+                               &events, destroy_conn->sock->fd, destroy_conn);
+               }
        }
-       rcu_read_unlock();
 
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_poll_create:
This page took 0.025078 seconds and 4 git commands to generate.