Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / bin / lttng-relayd / main.cpp
index 9793e277808975728d5f6965e12f8648de35a9ff..ea0ed2119b5237684b35695038658fa2ed4ea5b6 100644 (file)
@@ -46,6 +46,7 @@
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 #include <common/utils.hpp>
 
@@ -1320,12 +1321,13 @@ static void *relay_thread_dispatcher(void *data __attribute__((unused)))
                         * the data will be read at some point in time
                         * or wait to the end of the world :)
                         */
-                       ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT
-                                                                                              sizeof
-                                                                                              used
-                                                                                              on a
-                                                                                              pointer.
-                                                                                            */
+                       ret = lttng_write(
+                               relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT
+                                                                                    sizeof
+                                                                                    used
+                                                                                    on a
+                                                                                    pointer.
+                                                                                  */
                        if (ret < 0) {
                                PERROR("write connection pipe");
                                connection_put(new_conn);
@@ -1517,12 +1519,11 @@ static void publish_connection_local_streams(struct relay_connection *conn)
         * session lock.
         */
        pthread_mutex_lock(&session->lock);
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
        {
                stream_publish(stream);
        }
-       rcu_read_unlock();
 
        /*
         * Inform the viewer that there are new streams in the session.
@@ -1530,6 +1531,7 @@ static void publish_connection_local_streams(struct relay_connection *conn)
        if (session->viewer_attached) {
                uatomic_set(&session->new_streams, 1);
        }
+
        pthread_mutex_unlock(&session->lock);
 }
 
@@ -2224,21 +2226,25 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
         * to iterate over all streams to find the one associated with
         * the right session_id.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-               if (!stream_get(stream)) {
-                       continue;
-               }
-               if (stream->trace->session->id == msg.session_id) {
-                       pthread_mutex_lock(&stream->lock);
-                       stream->data_pending_check_done = false;
-                       pthread_mutex_unlock(&stream->lock);
-                       DBG("Set begin data pending flag to stream %" PRIu64,
-                           stream->stream_handle);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+                       if (!stream_get(stream)) {
+                               continue;
+                       }
+
+                       if (stream->trace->session->id == msg.session_id) {
+                               pthread_mutex_lock(&stream->lock);
+                               stream->data_pending_check_done = false;
+                               pthread_mutex_unlock(&stream->lock);
+                               DBG("Set begin data pending flag to stream %" PRIu64,
+                                   stream->stream_handle);
+                       }
+
+                       stream_put(stream);
                }
-               stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -2299,43 +2305,49 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
         * Iterate over all streams to see if the begin data pending
         * flag is set.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-               if (!stream_get(stream)) {
-                       continue;
-               }
-               if (stream->trace->session->id != msg.session_id) {
-                       stream_put(stream);
-                       continue;
-               }
-               pthread_mutex_lock(&stream->lock);
-               if (!stream->data_pending_check_done) {
-                       uint64_t stream_seq;
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-                       if (session_streams_have_index(conn->session)) {
-                               /*
-                                * Ensure that both the index and stream data have been
-                                * flushed up to the requested point.
-                                */
-                               stream_seq =
-                                       std::min(stream->prev_data_seq, stream->prev_index_seq);
-                       } else {
-                               stream_seq = stream->prev_data_seq;
+               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+                       if (!stream_get(stream)) {
+                               continue;
                        }
-                       if (!stream->closed ||
-                           !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
-                               is_data_inflight = 1;
-                               DBG("Data is still in flight for stream %" PRIu64,
-                                   stream->stream_handle);
-                               pthread_mutex_unlock(&stream->lock);
+
+                       if (stream->trace->session->id != msg.session_id) {
                                stream_put(stream);
-                               break;
+                               continue;
+                       }
+
+                       pthread_mutex_lock(&stream->lock);
+                       if (!stream->data_pending_check_done) {
+                               uint64_t stream_seq;
+
+                               if (session_streams_have_index(conn->session)) {
+                                       /*
+                                        * Ensure that both the index and stream data have been
+                                        * flushed up to the requested point.
+                                        */
+                                       stream_seq = std::min(stream->prev_data_seq,
+                                                             stream->prev_index_seq);
+                               } else {
+                                       stream_seq = stream->prev_data_seq;
+                               }
+
+                               if (!stream->closed ||
+                                   !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+                                       is_data_inflight = 1;
+                                       DBG("Data is still in flight for stream %" PRIu64,
+                                           stream->stream_handle);
+                                       pthread_mutex_unlock(&stream->lock);
+                                       stream_put(stream);
+                                       break;
+                               }
                        }
+
+                       pthread_mutex_unlock(&stream->lock);
+                       stream_put(stream);
                }
-               pthread_mutex_unlock(&stream->lock);
-               stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -4161,19 +4173,23 @@ restart:
 exit:
 error:
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-               health_code_update();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               session_abort(destroy_conn->session);
+               cds_lfht_for_each_entry (
+                       relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+                       health_code_update();
 
-               /*
-                * No need to grab another ref, because we own
-                * destroy_conn.
-                */
-               relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
+                       session_abort(destroy_conn->session);
+
+                       /*
+                        * No need to grab another ref, because we own
+                        * destroy_conn.
+                        */
+                       relay_thread_close_connection(
+                               &events, destroy_conn->sock->fd, destroy_conn);
+               }
        }
-       rcu_read_unlock();
 
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_poll_create:
This page took 0.026747 seconds and 4 git commands to generate.