Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 869c88d98b645d26783408efca8fff0cc07ae53a..699778d0e4ef2c144e4bcaba390387750b2f7606 100644 (file)
@@ -33,6 +33,7 @@
 #include <common/sessiond-comm/inet.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 #include <common/utils.hpp>
 
@@ -176,7 +177,7 @@ lttng_viewer_get_packet_return_code_str(enum lttng_viewer_get_packet_return_code
 /*
  * Cleanup the daemon
  */
-static void cleanup_relayd_live(void)
+static void cleanup_relayd_live()
 {
        DBG("Cleaning up");
 
@@ -237,28 +238,33 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 static int check_new_streams(struct relay_connection *conn)
 {
        struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
        if (!conn->viewer_session) {
                goto end;
        }
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(
-               session, &conn->viewer_session->session_list, viewer_session_node)
+
        {
-               if (!session_get(session)) {
-                       continue;
-               }
-               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-               ret = current_val;
-               session_put(session);
-               if (ret == 1) {
-                       goto end;
+               lttng::urcu::read_lock_guard read_lock;
+               cds_list_for_each_entry_rcu(
+                       session, &conn->viewer_session->session_list, viewer_session_node)
+               {
+                       if (!session_get(session)) {
+                               continue;
+                       }
+
+                       ret = uatomic_read(&session->new_streams);
+                       session_put(session);
+                       if (ret == 1) {
+                               goto end;
+                       }
                }
        }
+
 end:
-       rcu_read_unlock();
+       DBG("Viewer connection has%s new streams: socket_fd = %d",
+           ret == 0 ? " no" : "",
+           conn->sock->fd);
        return ret;
 }
 
@@ -275,63 +281,65 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
        struct lttng_ht_iter iter;
        struct relay_viewer_stream *vstream;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry (viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
-               struct ctf_trace *ctf_trace;
-               struct lttng_viewer_stream send_stream = {};
+               cds_lfht_for_each_entry (
+                       viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
+                       struct ctf_trace *ctf_trace;
+                       struct lttng_viewer_stream send_stream = {};
 
-               health_code_update();
+                       health_code_update();
 
-               if (!viewer_stream_get(vstream)) {
-                       continue;
-               }
+                       if (!viewer_stream_get(vstream)) {
+                               continue;
+                       }
 
-               pthread_mutex_lock(&vstream->stream->lock);
-               /* Ignore if not the same session. */
-               if (vstream->stream->trace->session->id != session_id ||
-                   (!ignore_sent_flag && vstream->sent_flag)) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       continue;
-               }
+                       pthread_mutex_lock(&vstream->stream->lock);
+                       /* Ignore if not the same session. */
+                       if (vstream->stream->trace->session->id != session_id ||
+                           (!ignore_sent_flag && vstream->sent_flag)) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               continue;
+                       }
 
-               ctf_trace = vstream->stream->trace;
-               send_stream.id = htobe64(vstream->stream->stream_handle);
-               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-               send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
-               if (lttng_strncpy(send_stream.path_name,
-                                 vstream->path_name,
-                                 sizeof(send_stream.path_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
-                       goto end_unlock;
-               }
-               if (lttng_strncpy(send_stream.channel_name,
-                                 vstream->channel_name,
-                                 sizeof(send_stream.channel_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
-                       goto end_unlock;
-               }
+                       ctf_trace = vstream->stream->trace;
+                       send_stream.id = htobe64(vstream->stream->stream_handle);
+                       send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+                       send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+                       if (lttng_strncpy(send_stream.path_name,
+                                         vstream->path_name,
+                                         sizeof(send_stream.path_name))) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               ret = -1; /* Error. */
+                               goto end;
+                       }
+                       if (lttng_strncpy(send_stream.channel_name,
+                                         vstream->channel_name,
+                                         sizeof(send_stream.channel_name))) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               ret = -1; /* Error. */
+                               goto end;
+                       }
 
-               DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
-               vstream->sent_flag = 1;
-               pthread_mutex_unlock(&vstream->stream->lock);
+                       DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+                       vstream->sent_flag = true;
+                       pthread_mutex_unlock(&vstream->stream->lock);
 
-               ret = send_response(sock, &send_stream, sizeof(send_stream));
-               viewer_stream_put(vstream);
-               if (ret < 0) {
-                       goto end_unlock;
+                       ret = send_response(sock, &send_stream, sizeof(send_stream));
+                       viewer_stream_put(vstream);
+                       if (ret < 0) {
+                               goto end;
+                       }
                }
        }
 
        ret = 0;
 
-end_unlock:
-       rcu_read_unlock();
+end:
        return ret;
 }
 
@@ -357,7 +365,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
        int ret;
        struct lttng_ht_iter iter;
        struct ctf_trace *ctf_trace;
-       struct relay_stream *relay_stream = NULL;
+       struct relay_stream *relay_stream = nullptr;
 
        LTTNG_ASSERT(relay_session);
        ASSERT_LOCKED(relay_session->lock);
@@ -370,191 +378,201 @@ static int make_viewer_streams(struct relay_session *relay_session,
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (
-               relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) {
-               bool trace_has_metadata_stream = false;
-
-               health_code_update();
-
-               if (!ctf_trace_get(ctf_trace)) {
-                       continue;
-               }
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               /*
-                * Iterate over all the streams of the trace to see if we have a
-                * metadata stream.
-                */
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
-                       bool is_metadata_stream;
+               cds_lfht_for_each_entry (
+                       relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) {
+                       bool trace_has_metadata_stream = false;
 
-                       pthread_mutex_lock(&relay_stream->lock);
-                       is_metadata_stream = relay_stream->is_metadata;
-                       pthread_mutex_unlock(&relay_stream->lock);
+                       health_code_update();
 
-                       if (is_metadata_stream) {
-                               trace_has_metadata_stream = true;
-                               break;
+                       if (!ctf_trace_get(ctf_trace)) {
+                               continue;
                        }
-               }
-
-               relay_stream = NULL;
-
-               /*
-                * If there is no metadata stream in this trace at the moment
-                * and we never sent one to the viewer, skip the trace. We
-                * accept that the viewer will not see this trace at all.
-                */
-               if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) {
-                       ctf_trace_put(ctf_trace);
-                       continue;
-               }
 
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
-                       struct relay_viewer_stream *viewer_stream;
-
-                       if (!stream_get(relay_stream)) {
-                               continue;
+                       /*
+                        * Iterate over all the streams of the trace to see if we have a
+                        * metadata stream.
+                        */
+                       cds_list_for_each_entry_rcu(
+                               relay_stream, &ctf_trace->stream_list, stream_node)
+                       {
+                               bool is_metadata_stream;
+
+                               pthread_mutex_lock(&relay_stream->lock);
+                               is_metadata_stream = relay_stream->is_metadata;
+                               pthread_mutex_unlock(&relay_stream->lock);
+
+                               if (is_metadata_stream) {
+                                       trace_has_metadata_stream = true;
+                                       break;
+                               }
                        }
 
-                       pthread_mutex_lock(&relay_stream->lock);
+                       relay_stream = nullptr;
+
                        /*
-                        * stream published is protected by the session lock.
+                        * If there is no metadata stream in this trace at the moment
+                        * and we never sent one to the viewer, skip the trace. We
+                        * accept that the viewer will not see this trace at all.
                         */
-                       if (!relay_stream->published) {
-                               goto next;
+                       if (!trace_has_metadata_stream &&
+                           !ctf_trace->metadata_stream_sent_to_viewer) {
+                               ctf_trace_put(ctf_trace);
+                               continue;
                        }
-                       viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle);
-                       if (!viewer_stream) {
-                               struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
 
-                               /*
-                                * Save that we sent the metadata stream to the
-                                * viewer. So that we know what trace the viewer
-                                * is aware of.
-                                */
-                               if (relay_stream->is_metadata) {
-                                       ctf_trace->metadata_stream_sent_to_viewer = true;
+                       cds_list_for_each_entry_rcu(
+                               relay_stream, &ctf_trace->stream_list, stream_node)
+                       {
+                               struct relay_viewer_stream *viewer_stream;
+
+                               if (!stream_get(relay_stream)) {
+                                       continue;
                                }
 
+                               pthread_mutex_lock(&relay_stream->lock);
                                /*
-                                * If a rotation is ongoing, use a copy of the
-                                * relay stream's chunk to ensure the stream
-                                * files exist.
-                                *
-                                * Otherwise, the viewer session's current trace
-                                * chunk can be used safely.
+                                * stream published is protected by the session lock.
                                 */
-                               if ((relay_stream->ongoing_rotation.is_set ||
-                                    session_has_ongoing_rotation(relay_session)) &&
-                                   relay_stream->trace_chunk) {
-                                       viewer_stream_trace_chunk =
-                                               lttng_trace_chunk_copy(relay_stream->trace_chunk);
-                                       if (!viewer_stream_trace_chunk) {
-                                               ret = -1;
-                                               ctf_trace_put(ctf_trace);
-                                               goto error_unlock;
+                               if (!relay_stream->published) {
+                                       goto next;
+                               }
+                               viewer_stream =
+                                       viewer_stream_get_by_id(relay_stream->stream_handle);
+                               if (!viewer_stream) {
+                                       struct lttng_trace_chunk *viewer_stream_trace_chunk =
+                                               nullptr;
+
+                                       /*
+                                        * Save that we sent the metadata stream to the
+                                        * viewer. So that we know what trace the viewer
+                                        * is aware of.
+                                        */
+                                       if (relay_stream->is_metadata) {
+                                               ctf_trace->metadata_stream_sent_to_viewer = true;
                                        }
-                               } else {
+
                                        /*
-                                        * Transition the viewer session into the newest trace chunk
-                                        * available.
+                                        * If a rotation is ongoing, use a copy of the
+                                        * relay stream's chunk to ensure the stream
+                                        * files exist.
+                                        *
+                                        * Otherwise, the viewer session's current trace
+                                        * chunk can be used safely.
                                         */
-                                       if (!lttng_trace_chunk_ids_equal(
-                                                   viewer_session->current_trace_chunk,
-                                                   relay_stream->trace_chunk)) {
-                                               ret = viewer_session_set_trace_chunk_copy(
-                                                       viewer_session, relay_stream->trace_chunk);
-                                               if (ret) {
+                                       if ((relay_stream->ongoing_rotation.is_set ||
+                                            session_has_ongoing_rotation(relay_session)) &&
+                                           relay_stream->trace_chunk) {
+                                               viewer_stream_trace_chunk = lttng_trace_chunk_copy(
+                                                       relay_stream->trace_chunk);
+                                               if (!viewer_stream_trace_chunk) {
                                                        ret = -1;
                                                        ctf_trace_put(ctf_trace);
                                                        goto error_unlock;
                                                }
-                                       }
-
-                                       if (relay_stream->trace_chunk) {
+                                       } else {
                                                /*
-                                                * If the corresponding relay
-                                                * stream's trace chunk is set,
-                                                * the viewer stream will be
-                                                * created under it.
-                                                *
-                                                * Note that a relay stream can
-                                                * have a NULL output trace
-                                                * chunk (for instance, after a
-                                                * clear against a stopped
-                                                * session).
+                                                * Transition the viewer session into the newest
+                                                * trace chunk available.
                                                 */
-                                               const bool reference_acquired =
-                                                       lttng_trace_chunk_get(
-                                                               viewer_session->current_trace_chunk);
+                                               if (!lttng_trace_chunk_ids_equal(
+                                                           viewer_session->current_trace_chunk,
+                                                           relay_stream->trace_chunk)) {
+                                                       ret = viewer_session_set_trace_chunk_copy(
+                                                               viewer_session,
+                                                               relay_stream->trace_chunk);
+                                                       if (ret) {
+                                                               ret = -1;
+                                                               ctf_trace_put(ctf_trace);
+                                                               goto error_unlock;
+                                                       }
+                                               }
 
-                                               LTTNG_ASSERT(reference_acquired);
-                                               viewer_stream_trace_chunk =
-                                                       viewer_session->current_trace_chunk;
+                                               if (relay_stream->trace_chunk) {
+                                                       /*
+                                                        * If the corresponding relay
+                                                        * stream's trace chunk is set,
+                                                        * the viewer stream will be
+                                                        * created under it.
+                                                        *
+                                                        * Note that a relay stream can
+                                                        * have a NULL output trace
+                                                        * chunk (for instance, after a
+                                                        * clear against a stopped
+                                                        * session).
+                                                        */
+                                                       const bool reference_acquired =
+                                                               lttng_trace_chunk_get(
+                                                                       viewer_session
+                                                                               ->current_trace_chunk);
+
+                                                       LTTNG_ASSERT(reference_acquired);
+                                                       viewer_stream_trace_chunk =
+                                                               viewer_session->current_trace_chunk;
+                                               }
                                        }
-                               }
 
-                               viewer_stream = viewer_stream_create(
-                                       relay_stream, viewer_stream_trace_chunk, seek_t);
-                               lttng_trace_chunk_put(viewer_stream_trace_chunk);
-                               viewer_stream_trace_chunk = NULL;
-                               if (!viewer_stream) {
-                                       ret = -1;
-                                       ctf_trace_put(ctf_trace);
-                                       goto error_unlock;
-                               }
+                                       viewer_stream = viewer_stream_create(
+                                               relay_stream, viewer_stream_trace_chunk, seek_t);
+                                       lttng_trace_chunk_put(viewer_stream_trace_chunk);
+                                       viewer_stream_trace_chunk = nullptr;
+                                       if (!viewer_stream) {
+                                               ret = -1;
+                                               ctf_trace_put(ctf_trace);
+                                               goto error_unlock;
+                                       }
 
-                               if (nb_created) {
-                                       /* Update number of created stream counter. */
-                                       (*nb_created)++;
-                               }
-                               /*
-                                * Ensure a self-reference is preserved even
-                                * after we have put our local reference.
-                                */
-                               if (!viewer_stream_get(viewer_stream)) {
-                                       ERR("Unable to get self-reference on viewer stream, logic error.");
-                                       abort();
-                               }
-                       } else {
-                               if (!viewer_stream->sent_flag && nb_unsent) {
-                                       /* Update number of unsent stream counter. */
-                                       (*nb_unsent)++;
-                               }
-                       }
-                       /* Update number of total stream counter. */
-                       if (nb_total) {
-                               if (relay_stream->is_metadata) {
-                                       if (!relay_stream->closed ||
-                                           relay_stream->metadata_received >
-                                                   viewer_stream->metadata_sent) {
-                                               (*nb_total)++;
+                                       if (nb_created) {
+                                               /* Update number of created stream counter. */
+                                               (*nb_created)++;
+                                       }
+                                       /*
+                                        * Ensure a self-reference is preserved even
+                                        * after we have put our local reference.
+                                        */
+                                       if (!viewer_stream_get(viewer_stream)) {
+                                               ERR("Unable to get self-reference on viewer stream, logic error.");
+                                               abort();
                                        }
                                } else {
-                                       if (!relay_stream->closed ||
-                                           !(((int64_t) (relay_stream->prev_data_seq -
-                                                         relay_stream->last_net_seq_num)) >= 0)) {
-                                               (*nb_total)++;
+                                       if (!viewer_stream->sent_flag && nb_unsent) {
+                                               /* Update number of unsent stream counter. */
+                                               (*nb_unsent)++;
                                        }
                                }
+                               /* Update number of total stream counter. */
+                               if (nb_total) {
+                                       if (relay_stream->is_metadata) {
+                                               if (!relay_stream->closed ||
+                                                   relay_stream->metadata_received >
+                                                           viewer_stream->metadata_sent) {
+                                                       (*nb_total)++;
+                                               }
+                                       } else {
+                                               if (!relay_stream->closed ||
+                                                   !(((int64_t) (relay_stream->prev_data_seq -
+                                                                 relay_stream->last_net_seq_num)) >=
+                                                     0)) {
+                                                       (*nb_total)++;
+                                               }
+                                       }
+                               }
+                               /* Put local reference. */
+                               viewer_stream_put(viewer_stream);
+                       next:
+                               pthread_mutex_unlock(&relay_stream->lock);
+                               stream_put(relay_stream);
                        }
-                       /* Put local reference. */
-                       viewer_stream_put(viewer_stream);
-               next:
-                       pthread_mutex_unlock(&relay_stream->lock);
-                       stream_put(relay_stream);
+                       relay_stream = nullptr;
+                       ctf_trace_put(ctf_trace);
                }
-               relay_stream = NULL;
-               ctf_trace_put(ctf_trace);
        }
 
        ret = 0;
 
 error_unlock:
-       rcu_read_unlock();
 
        if (relay_stream) {
                pthread_mutex_unlock(&relay_stream->lock);
@@ -564,7 +582,7 @@ error_unlock:
        return ret;
 }
 
-int relayd_live_stop(void)
+int relayd_live_stop()
 {
        /* Stop dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
@@ -614,8 +632,8 @@ end:
 static struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, const char *name)
 {
        int out_fd, ret;
-       struct lttcomm_sock *socks[2] = { listening_sock, NULL };
-       struct lttcomm_sock *new_sock = NULL;
+       struct lttcomm_sock *socks[2] = { listening_sock, nullptr };
+       struct lttcomm_sock *new_sock = nullptr;
 
        ret = fd_tracker_open_unsuspendable_fd(
                the_fd_tracker, &out_fd, (const char **) &name, 1, accept_sock, &socks);
@@ -634,12 +652,12 @@ end:
 static struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
 {
        int ret, sock_fd;
-       struct lttcomm_sock *sock = NULL;
+       struct lttcomm_sock *sock = nullptr;
        char uri_str[LTTNG_PATH_MAX];
-       char *formated_name = NULL;
+       char *formated_name = nullptr;
 
        sock = lttcomm_alloc_sock_from_uri(uri);
-       if (sock == NULL) {
+       if (sock == nullptr) {
                ERR("Allocating socket");
                goto error;
        }
@@ -653,14 +671,14 @@ static struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
        if (ret >= 0) {
                ret = asprintf(&formated_name, "%s socket @ %s", name, uri_str);
                if (ret < 0) {
-                       formated_name = NULL;
+                       formated_name = nullptr;
                }
        }
 
        ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker,
                                               &sock_fd,
                                               (const char **) (formated_name ? &formated_name :
-                                                                               NULL),
+                                                                               nullptr),
                                               1,
                                               create_sock,
                                               sock);
@@ -689,7 +707,7 @@ error:
                lttcomm_destroy_sock(sock);
        }
        free(formated_name);
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -732,7 +750,7 @@ static void *thread_listener(void *data __attribute__((unused)))
                goto error_testpoint;
        }
 
-       while (1) {
+       while (true) {
                health_code_update();
 
                DBG("Listener accepting live viewers connections");
@@ -799,7 +817,7 @@ static void *thread_listener(void *data __attribute__((unused)))
                                        goto error;
                                }
                                /* Ownership assumed by the connection. */
-                               newsock = NULL;
+                               newsock = nullptr;
 
                                /* Enqueue request for the dispatcher thread. */
                                cds_wfcq_head_ptr_t head;
@@ -850,7 +868,7 @@ error_sock_control:
        if (lttng_relay_stop_threads()) {
                ERR("Error stopping threads");
        }
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -861,7 +879,7 @@ static void *thread_dispatcher(void *data __attribute__((unused)))
        int err = -1;
        ssize_t ret;
        struct cds_wfcq_node *node;
-       struct relay_connection *conn = NULL;
+       struct relay_connection *conn = nullptr;
 
        DBG("[thread] Live viewer relay dispatcher started");
 
@@ -889,7 +907,7 @@ static void *thread_dispatcher(void *data __attribute__((unused)))
                        /* Dequeue commands */
                        node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
                                                         &viewer_conn_queue.tail);
-                       if (node == NULL) {
+                       if (node == nullptr) {
                                DBG("Woken up but nothing in the live-viewer "
                                    "relay command queue");
                                /* Continue thread execution */
@@ -904,13 +922,15 @@ static void *thread_dispatcher(void *data __attribute__((unused)))
                         * the data will be read at some point in time
                         * or wait to the end of the world :)
                         */
-                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
+                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn)); /* NOLINT sizeof
+                                                                                     used on a
+                                                                                     pointer. */
                        if (ret < 0) {
                                PERROR("write conn pipe");
                                connection_put(conn);
                                goto error;
                        }
-               } while (node != NULL);
+               } while (node != nullptr);
 
                /* Futex wait on queue. Blocking call on futex() */
                health_poll_entry();
@@ -932,7 +952,7 @@ error_testpoint:
        if (lttng_relay_stop_threads()) {
                ERR("Error stopping threads");
        }
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -945,7 +965,7 @@ static int viewer_connect(struct relay_connection *conn)
        int ret;
        struct lttng_viewer_connect reply, msg;
 
-       conn->version_check_done = 1;
+       conn->version_check_done = true;
 
        health_code_update();
 
@@ -1031,7 +1051,7 @@ static int viewer_list_sessions(struct relay_connection *conn)
        struct lttng_viewer_list_sessions session_list;
        struct lttng_ht_iter iter;
        struct relay_session *session;
-       struct lttng_viewer_session *send_session_buf = NULL;
+       struct lttng_viewer_session *send_session_buf = nullptr;
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
 
@@ -1040,61 +1060,65 @@ static int viewer_list_sessions(struct relay_connection *conn)
                return -1;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) {
-               struct lttng_viewer_session *send_session;
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               health_code_update();
+               cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) {
+                       struct lttng_viewer_session *send_session;
 
-               pthread_mutex_lock(&session->lock);
-               if (session->connection_closed) {
-                       /* Skip closed session */
-                       goto next_session;
-               }
+                       health_code_update();
+
+                       pthread_mutex_lock(&session->lock);
+                       if (session->connection_closed) {
+                               /* Skip closed session */
+                               goto next_session;
+                       }
 
-               if (count >= buf_count) {
-                       struct lttng_viewer_session *newbuf;
-                       uint32_t new_buf_count = buf_count << 1;
+                       if (count >= buf_count) {
+                               struct lttng_viewer_session *newbuf;
+                               uint32_t new_buf_count = buf_count << 1;
 
-                       newbuf = (lttng_viewer_session *) realloc(
-                               send_session_buf, new_buf_count * sizeof(*send_session_buf));
-                       if (!newbuf) {
+                               newbuf = (lttng_viewer_session *) realloc(
+                                       send_session_buf,
+                                       new_buf_count * sizeof(*send_session_buf));
+                               if (!newbuf) {
+                                       ret = -1;
+                                       goto break_loop;
+                               }
+                               send_session_buf = newbuf;
+                               buf_count = new_buf_count;
+                       }
+                       send_session = &send_session_buf[count];
+                       if (lttng_strncpy(send_session->session_name,
+                                         session->session_name,
+                                         sizeof(send_session->session_name))) {
                                ret = -1;
                                goto break_loop;
                        }
-                       send_session_buf = newbuf;
-                       buf_count = new_buf_count;
-               }
-               send_session = &send_session_buf[count];
-               if (lttng_strncpy(send_session->session_name,
-                                 session->session_name,
-                                 sizeof(send_session->session_name))) {
-                       ret = -1;
-                       goto break_loop;
-               }
-               if (lttng_strncpy(send_session->hostname,
-                                 session->hostname,
-                                 sizeof(send_session->hostname))) {
-                       ret = -1;
-                       goto break_loop;
-               }
-               send_session->id = htobe64(session->id);
-               send_session->live_timer = htobe32(session->live_timer);
-               if (session->viewer_attached) {
-                       send_session->clients = htobe32(1);
-               } else {
-                       send_session->clients = htobe32(0);
+                       if (lttng_strncpy(send_session->hostname,
+                                         session->hostname,
+                                         sizeof(send_session->hostname))) {
+                               ret = -1;
+                               goto break_loop;
+                       }
+                       send_session->id = htobe64(session->id);
+                       send_session->live_timer = htobe32(session->live_timer);
+                       if (session->viewer_attached) {
+                               send_session->clients = htobe32(1);
+                       } else {
+                               send_session->clients = htobe32(0);
+                       }
+                       send_session->streams = htobe32(session->stream_count);
+                       count++;
+               next_session:
+                       pthread_mutex_unlock(&session->lock);
+                       continue;
+               break_loop:
+                       pthread_mutex_unlock(&session->lock);
+                       break;
                }
-               send_session->streams = htobe32(session->stream_count);
-               count++;
-       next_session:
-               pthread_mutex_unlock(&session->lock);
-               continue;
-       break_loop:
-               pthread_mutex_unlock(&session->lock);
-               break;
        }
-       rcu_read_unlock();
+
        if (ret < 0) {
                goto end_free;
        }
@@ -1131,7 +1155,7 @@ static int viewer_get_new_streams(struct relay_connection *conn)
        uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        uint64_t session_id;
        bool closed = false;
 
@@ -1197,6 +1221,8 @@ static int viewer_get_new_streams(struct relay_connection *conn)
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1263,7 +1289,7 @@ static int viewer_attach_session(struct relay_connection *conn)
        enum lttng_viewer_seek seek_type;
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        enum lttng_viewer_attach_return_code viewer_attach_status;
        bool closed = false;
        uint64_t session_id;
@@ -1346,13 +1372,13 @@ static int viewer_attach_session(struct relay_connection *conn)
        }
 
        ret = make_viewer_streams(
-               session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed);
+               session, conn->viewer_session, seek_type, &nb_streams, nullptr, nullptr, &closed);
        if (ret < 0) {
                goto end_put_session;
        }
        pthread_mutex_unlock(&session->lock);
        session_put(session);
-       session = NULL;
+       session = nullptr;
 
        response.streams_count = htobe32(nb_streams);
        /*
@@ -1633,13 +1659,14 @@ static int viewer_get_next_index(struct relay_connection *conn)
        struct lttng_viewer_get_next_index request_index;
        struct lttng_viewer_index viewer_index;
        struct ctf_packet_index packet_index;
-       struct relay_viewer_stream *vstream = NULL;
-       struct relay_stream *rstream = NULL;
-       struct ctf_trace *ctf_trace = NULL;
-       struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
+       struct relay_stream *rstream = nullptr;
+       struct ctf_trace *ctf_trace = nullptr;
+       struct relay_viewer_stream *metadata_viewer_stream = nullptr;
        bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
        uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
        enum lttng_trace_chunk_status status;
+       bool attached_sessions_have_new_streams = false;
 
        LTTNG_ASSERT(conn);
 
@@ -1691,6 +1718,17 @@ static int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
+       }
+
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
@@ -1800,6 +1838,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        LTTNG_ASSERT(!ret);
 
@@ -1847,7 +1886,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                                             rstream->channel_name,
                                             rstream->tracefile_size,
                                             vstream->current_tracefile_id,
-                                            NULL,
+                                            nullptr,
                                             file_path,
                                             sizeof(file_path));
                if (ret < 0) {
@@ -1878,19 +1917,6 @@ static int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Error checking for new streams before sending new index to stream id %" PRIu64
-                   ", returning status=%s",
-                   (uint64_t) be64toh(request_index.stream_id),
-                   lttng_viewer_next_index_return_code_str(
-                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
@@ -1941,6 +1967,10 @@ send_reply:
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        viewer_index.status = htobe32(viewer_index.status);
        health_code_update();
@@ -1984,10 +2014,10 @@ static int viewer_get_packet(struct relay_connection *conn)
 {
        int ret;
        off_t lseek_ret;
-       char *reply = NULL;
+       char *reply = nullptr;
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply_header;
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
        uint32_t reply_size = sizeof(reply_header);
        uint32_t packet_data_len = 0;
        ssize_t read_len;
@@ -2103,10 +2133,11 @@ static int viewer_get_metadata(struct relay_connection *conn)
        int fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
-       char *data = NULL;
+       char *data = nullptr;
        struct lttng_viewer_get_metadata request;
        struct lttng_viewer_metadata_packet reply;
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
+       bool dispose_of_stream = false;
 
        LTTNG_ASSERT(conn);
 
@@ -2135,6 +2166,9 @@ static int viewer_get_metadata(struct relay_connection *conn)
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
+
+       pthread_mutex_lock(&vstream->stream->trace->session->lock);
+       pthread_mutex_lock(&vstream->stream->trace->lock);
        pthread_mutex_lock(&vstream->stream->lock);
        if (!vstream->stream->is_metadata) {
                ERR("Invalid metadata stream");
@@ -2143,11 +2177,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
 
        if (vstream->metadata_sent >= vstream->stream->metadata_received) {
                /*
-                * The live viewers expect to receive a NO_NEW_METADATA
-                * status before a stream disappears, otherwise they abort the
-                * entire live connection when receiving an error status.
-                *
-                * Clear feature resets the metadata_sent to 0 until the
+                * Clear feature resets the metadata_received to 0 until the
                 * same metadata is received again.
                 */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
@@ -2155,20 +2185,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
                 * The live viewer considers a closed 0 byte metadata stream as
                 * an error.
                 */
-               if (vstream->metadata_sent > 0) {
-                       if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
-                               /*
-                                * Release ownership for the viewer metadata
-                                * stream. Note that this reference is the
-                                * viewer's reference. The vstream still exists
-                                * until the end of the function as
-                                * viewer_stream_get_by_id() took a reference.
-                                */
-                               viewer_stream_put(vstream);
-                       }
-
-                       vstream->stream->no_new_metadata_notified = true;
-               }
+               dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed;
                goto send_reply;
        }
 
@@ -2206,6 +2223,19 @@ static int viewer_get_metadata(struct relay_connection *conn)
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
        if (!vstream->stream_file.trace_chunk) {
+               if (vstream->stream->trace->session->connection_closed) {
+                       /*
+                        * If the connection is closed, there is no way for the metadata stream
+                        * to ever transition back to an active chunk. As such, signal to the viewer
+                        * that there is no new metadata available.
+                        *
+                        * The stream can be disposed-of. On the next execution of this command,
+                        * the relay daemon will reply with an error status since the stream can't
+                        * be found.
+                        */
+                       dispose_of_stream = true;
+               }
+
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                len = 0;
                goto send_reply;
@@ -2223,7 +2253,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
                                             rstream->channel_name,
                                             rstream->tracefile_size,
                                             vstream->current_tracefile_id,
-                                            NULL,
+                                            nullptr,
                                             file_path,
                                             sizeof(file_path));
                if (ret < 0) {
@@ -2336,6 +2366,8 @@ send_reply:
        health_code_update();
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->session->lock);
        }
        ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
@@ -2361,7 +2393,22 @@ end_free:
 end:
        if (vstream) {
                viewer_stream_put(vstream);
+               if (dispose_of_stream) {
+                       /*
+                        * Trigger the destruction of the viewer stream
+                        * by releasing its global reference.
+                        *
+                        * The live viewers expect to receive a NO_NEW_METADATA
+                        * status before a stream disappears, otherwise they abort the
+                        * entire live connection when receiving an error status.
+                        *
+                        * On the next query for this stream, an error will be reported to the
+                        * client.
+                        */
+                       viewer_stream_put(vstream);
+               }
        }
+
        return ret;
 }
 
@@ -2407,7 +2454,7 @@ static int viewer_detach_session(struct relay_connection *conn)
        int ret;
        struct lttng_viewer_detach_session_response response;
        struct lttng_viewer_detach_session_request request;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        uint64_t viewer_session_to_close;
 
        LTTNG_ASSERT(conn);
@@ -2548,7 +2595,7 @@ static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollf
        (void) lttng_poll_del(events, pollfd);
 
        ret = fd_tracker_close_unsuspendable_fd(
-               the_fd_tracker, &pollfd, 1, fd_tracker_util_close_fd, NULL);
+               the_fd_tracker, &pollfd, 1, fd_tracker_util_close_fd, nullptr);
        if (ret < 0) {
                ERR("Closing pollfd %d", pollfd);
        }
@@ -2594,7 +2641,7 @@ static void *thread_worker(void *data __attribute__((unused)))
        }
 
 restart:
-       while (1) {
+       while (true) {
                int i;
 
                health_code_update();
@@ -2640,7 +2687,10 @@ restart:
                                if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
-                                       ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
+                                       ret = lttng_read(live_conn_pipe[0],
+                                                        &conn,
+                                                        sizeof(conn)); /* NOLINT sizeof used on a
+                                                                          pointer. */
                                        if (ret < 0) {
                                                goto error;
                                        }
@@ -2712,12 +2762,15 @@ error:
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-               health_code_update();
-               connection_put(destroy_conn);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (
+                       viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+                       health_code_update();
+                       connection_put(destroy_conn);
+               }
        }
-       rcu_read_unlock();
 error_poll_create:
        lttng_ht_destroy(viewer_connections_ht);
 viewer_connections_ht_error:
@@ -2737,20 +2790,20 @@ error_testpoint:
                ERR("Error stopping threads");
        }
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 /*
  * Create the relay command pipe to wake thread_manage_apps.
  * Closed in cleanup().
  */
-static int create_conn_pipe(void)
+static int create_conn_pipe()
 {
        return fd_tracker_util_pipe_open_cloexec(
                the_fd_tracker, "Live connection pipe", live_conn_pipe);
 }
 
-int relayd_live_join(void)
+int relayd_live_join()
 {
        int ret, retval = 0;
        void *status;
@@ -2823,8 +2876,10 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the dispatcher thread */
-       ret = pthread_create(
-               &live_dispatcher_thread, default_pthread_attr(), thread_dispatcher, (void *) NULL);
+       ret = pthread_create(&live_dispatcher_thread,
+                            default_pthread_attr(),
+                            thread_dispatcher,
+                            (void *) nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer dispatcher");
@@ -2833,7 +2888,7 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the worker thread */
-       ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, NULL);
+       ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer worker");
@@ -2843,7 +2898,7 @@ int relayd_live_create(struct lttng_uri *uri)
 
        /* Setup the listener thread */
        ret = pthread_create(
-               &live_listener_thread, default_pthread_attr(), thread_listener, (void *) NULL);
+               &live_listener_thread, default_pthread_attr(), thread_listener, (void *) nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer listener");
This page took 0.040394 seconds and 4 git commands to generate.