Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 5aa899b282605a0c058f7f497f9b500ad7e0226b..699778d0e4ef2c144e4bcaba390387750b2f7606 100644 (file)
@@ -8,6 +8,37 @@
  */
 
 #define _LGPL_SOURCE
+#include "cmd.hpp"
+#include "connection.hpp"
+#include "ctf-trace.hpp"
+#include "health-relayd.hpp"
+#include "live.hpp"
+#include "lttng-relayd.hpp"
+#include "session.hpp"
+#include "stream.hpp"
+#include "testpoint.hpp"
+#include "utils.hpp"
+#include "viewer-session.hpp"
+#include "viewer-stream.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/compat/socket.hpp>
+#include <common/defaults.hpp>
+#include <common/fd-tracker/utils.hpp>
+#include <common/fs-handle.hpp>
+#include <common/futex.hpp>
+#include <common/index/index.hpp>
+#include <common/sessiond-comm/inet.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/urcu.hpp>
+#include <common/uri.hpp>
+#include <common/utils.hpp>
+
+#include <lttng/lttng.h>
+
 #include <fcntl.h>
 #include <getopt.h>
 #include <grp.h>
@@ -18,6 +49,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <string>
 #include <sys/mman.h>
 #include <sys/mount.h>
 #include <sys/resource.h>
 #include <urcu/futex.h>
 #include <urcu/rculist.h>
 #include <urcu/uatomic.h>
-#include <string>
-
-#include <common/common.h>
-#include <common/compat/endian.h>
-#include <common/compat/poll.h>
-#include <common/compat/socket.h>
-#include <common/defaults.h>
-#include <common/fd-tracker/utils.h>
-#include <common/fs-handle.h>
-#include <common/futex.h>
-#include <common/index/index.h>
-#include <common/sessiond-comm/inet.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/uri.h>
-#include <common/utils.h>
-#include <lttng/lttng.h>
 
-#include "cmd.h"
-#include "connection.h"
-#include "ctf-trace.h"
-#include "health-relayd.h"
-#include "live.h"
-#include "lttng-relayd.h"
-#include "session.h"
-#include "stream.h"
-#include "testpoint.h"
-#include "utils.h"
-#include "viewer-session.h"
-#include "viewer-stream.h"
-
-#define SESSION_BUF_DEFAULT_COUNT      16
+#define SESSION_BUF_DEFAULT_COUNT 16
 
 static struct lttng_uri *live_uri;
 
@@ -86,11 +88,9 @@ static pthread_t live_worker_thread;
 static struct relay_conn_queue viewer_conn_queue;
 
 static uint64_t last_relay_viewer_session_id;
-static pthread_mutex_t last_relay_viewer_session_id_lock =
-               PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
 
-static
-const char *lttng_viewer_command_str(lttng_viewer_command cmd)
+static const char *lttng_viewer_command_str(lttng_viewer_command cmd)
 {
        switch (cmd) {
        case LTTNG_VIEWER_CONNECT:
@@ -116,9 +116,8 @@ const char *lttng_viewer_command_str(lttng_viewer_command cmd)
        }
 }
 
-static
-const char *lttng_viewer_next_index_return_code_str(
-               enum lttng_viewer_next_index_return_code code)
+static const char *
+lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code)
 {
        switch (code) {
        case LTTNG_VIEWER_INDEX_OK:
@@ -138,9 +137,7 @@ const char *lttng_viewer_next_index_return_code_str(
        }
 }
 
-static
-const char *lttng_viewer_attach_return_code_str(
-               enum lttng_viewer_attach_return_code code)
+static const char *lttng_viewer_attach_return_code_str(enum lttng_viewer_attach_return_code code)
 {
        switch (code) {
        case LTTNG_VIEWER_ATTACH_OK:
@@ -160,9 +157,8 @@ const char *lttng_viewer_attach_return_code_str(
        }
 };
 
-static
-const char *lttng_viewer_get_packet_return_code_str(
-               enum lttng_viewer_get_packet_return_code code)
+static const char *
+lttng_viewer_get_packet_return_code_str(enum lttng_viewer_get_packet_return_code code)
 {
        switch (code) {
        case LTTNG_VIEWER_GET_PACKET_OK:
@@ -181,8 +177,7 @@ const char *lttng_viewer_get_packet_return_code_str(
 /*
  * Cleanup the daemon
  */
-static
-void cleanup_relayd_live(void)
+static void cleanup_relayd_live()
 {
        DBG("Cleaning up");
 
@@ -196,8 +191,7 @@ void cleanup_relayd_live(void)
  * Return the size of the received message or else a negative value on error
  * with errno being set by recvmsg() syscall.
  */
-static
-ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
+static ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
 {
        ssize_t ret;
 
@@ -222,8 +216,7 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
  * Return the size of the sent message or else a negative value on error with
  * errno being set by sendmsg() syscall.
  */
-static
-ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
+static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 {
        ssize_t ret;
 
@@ -242,32 +235,36 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
  * Returns 1 if new streams got added, 0 if nothing changed, a negative value
  * on error.
  */
-static
-int check_new_streams(struct relay_connection *conn)
+static int check_new_streams(struct relay_connection *conn)
 {
        struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
        if (!conn->viewer_session) {
                goto end;
        }
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(session,
-                       &conn->viewer_session->session_list,
-                       viewer_session_node) {
-               if (!session_get(session)) {
-                       continue;
-               }
-               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-               ret = current_val;
-               session_put(session);
-               if (ret == 1) {
-                       goto end;
+
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_list_for_each_entry_rcu(
+                       session, &conn->viewer_session->session_list, viewer_session_node)
+               {
+                       if (!session_get(session)) {
+                               continue;
+                       }
+
+                       ret = uatomic_read(&session->new_streams);
+                       session_put(session);
+                       if (ret == 1) {
+                               goto end;
+                       }
                }
        }
+
 end:
-       rcu_read_unlock();
+       DBG("Viewer connection has%s new streams: socket_fd = %d",
+           ret == 0 ? " no" : "",
+           conn->sock->fd);
        return ret;
 }
 
@@ -277,73 +274,72 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static
-ssize_t send_viewer_streams(struct lttcomm_sock *sock,
-               uint64_t session_id, unsigned int ignore_sent_flag)
+static ssize_t
+send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
 {
        ssize_t ret;
        struct lttng_ht_iter iter;
        struct relay_viewer_stream *vstream;
 
-       rcu_read_lock();
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
-                       stream_n.node) {
-               struct ctf_trace *ctf_trace;
-               struct lttng_viewer_stream send_stream = {};
+               cds_lfht_for_each_entry (
+                       viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
+                       struct ctf_trace *ctf_trace;
+                       struct lttng_viewer_stream send_stream = {};
 
-               health_code_update();
+                       health_code_update();
 
-               if (!viewer_stream_get(vstream)) {
-                       continue;
-               }
+                       if (!viewer_stream_get(vstream)) {
+                               continue;
+                       }
 
-               pthread_mutex_lock(&vstream->stream->lock);
-               /* Ignore if not the same session. */
-               if (vstream->stream->trace->session->id != session_id ||
-                               (!ignore_sent_flag && vstream->sent_flag)) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       continue;
-               }
+                       pthread_mutex_lock(&vstream->stream->lock);
+                       /* Ignore if not the same session. */
+                       if (vstream->stream->trace->session->id != session_id ||
+                           (!ignore_sent_flag && vstream->sent_flag)) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               continue;
+                       }
 
-               ctf_trace = vstream->stream->trace;
-               send_stream.id = htobe64(vstream->stream->stream_handle);
-               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-               send_stream.metadata_flag = htobe32(
-                               vstream->stream->is_metadata);
-               if (lttng_strncpy(send_stream.path_name, vstream->path_name,
-                               sizeof(send_stream.path_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       ret = -1;       /* Error. */
-                       goto end_unlock;
-               }
-               if (lttng_strncpy(send_stream.channel_name,
-                               vstream->channel_name,
-                               sizeof(send_stream.channel_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
-                       viewer_stream_put(vstream);
-                       ret = -1;       /* Error. */
-                       goto end_unlock;
-               }
+                       ctf_trace = vstream->stream->trace;
+                       send_stream.id = htobe64(vstream->stream->stream_handle);
+                       send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+                       send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+                       if (lttng_strncpy(send_stream.path_name,
+                                         vstream->path_name,
+                                         sizeof(send_stream.path_name))) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               ret = -1; /* Error. */
+                               goto end;
+                       }
+                       if (lttng_strncpy(send_stream.channel_name,
+                                         vstream->channel_name,
+                                         sizeof(send_stream.channel_name))) {
+                               pthread_mutex_unlock(&vstream->stream->lock);
+                               viewer_stream_put(vstream);
+                               ret = -1; /* Error. */
+                               goto end;
+                       }
 
-               DBG("Sending stream %" PRIu64 " to viewer",
-                               vstream->stream->stream_handle);
-               vstream->sent_flag = 1;
-               pthread_mutex_unlock(&vstream->stream->lock);
+                       DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+                       vstream->sent_flag = true;
+                       pthread_mutex_unlock(&vstream->stream->lock);
 
-               ret = send_response(sock, &send_stream, sizeof(send_stream));
-               viewer_stream_put(vstream);
-               if (ret < 0) {
-                       goto end_unlock;
+                       ret = send_response(sock, &send_stream, sizeof(send_stream));
+                       viewer_stream_put(vstream);
+                       if (ret < 0) {
+                               goto end;
+                       }
                }
        }
 
        ret = 0;
 
-end_unlock:
-       rcu_read_unlock();
+end:
        return ret;
 }
 
@@ -359,17 +355,17 @@ end_unlock:
  * Return 0 on success or else a negative value.
  */
 static int make_viewer_streams(struct relay_session *relay_session,
-               struct relay_viewer_session *viewer_session,
-               enum lttng_viewer_seek seek_t,
-               uint32_t *nb_total,
-               uint32_t *nb_unsent,
-               uint32_t *nb_created,
-               bool *closed)
+                              struct relay_viewer_session *viewer_session,
+                              enum lttng_viewer_seek seek_t,
+                              uint32_t *nb_total,
+                              uint32_t *nb_unsent,
+                              uint32_t *nb_created,
+                              bool *closed)
 {
        int ret;
        struct lttng_ht_iter iter;
        struct ctf_trace *ctf_trace;
-       struct relay_stream *relay_stream = NULL;
+       struct relay_stream *relay_stream = nullptr;
 
        LTTNG_ASSERT(relay_session);
        ASSERT_LOCKED(relay_session->lock);
@@ -382,197 +378,201 @@ static int make_viewer_streams(struct relay_session *relay_session,
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
-                       ctf_trace, node.node) {
-               bool trace_has_metadata_stream = false;
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               health_code_update();
-
-               if (!ctf_trace_get(ctf_trace)) {
-                       continue;
-               }
+               cds_lfht_for_each_entry (
+                       relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) {
+                       bool trace_has_metadata_stream = false;
 
-               /*
-                * Iterate over all the streams of the trace to see if we have a
-                * metadata stream.
-                */
-               cds_list_for_each_entry_rcu(relay_stream,
-                               &ctf_trace->stream_list, stream_node)
-               {
-                       bool is_metadata_stream;
-
-                       pthread_mutex_lock(&relay_stream->lock);
-                       is_metadata_stream = relay_stream->is_metadata;
-                       pthread_mutex_unlock(&relay_stream->lock);
+                       health_code_update();
 
-                       if (is_metadata_stream) {
-                               trace_has_metadata_stream = true;
-                               break;
+                       if (!ctf_trace_get(ctf_trace)) {
+                               continue;
                        }
-               }
-
-               relay_stream = NULL;
-
-               /*
-                * If there is no metadata stream in this trace at the moment
-                * and we never sent one to the viewer, skip the trace. We
-                * accept that the viewer will not see this trace at all.
-                */
-               if (!trace_has_metadata_stream &&
-                               !ctf_trace->metadata_stream_sent_to_viewer) {
-                       ctf_trace_put(ctf_trace);
-                       continue;
-               }
 
-               cds_list_for_each_entry_rcu(relay_stream,
-                               &ctf_trace->stream_list, stream_node)
-               {
-                       struct relay_viewer_stream *viewer_stream;
-
-                       if (!stream_get(relay_stream)) {
-                               continue;
+                       /*
+                        * Iterate over all the streams of the trace to see if we have a
+                        * metadata stream.
+                        */
+                       cds_list_for_each_entry_rcu(
+                               relay_stream, &ctf_trace->stream_list, stream_node)
+                       {
+                               bool is_metadata_stream;
+
+                               pthread_mutex_lock(&relay_stream->lock);
+                               is_metadata_stream = relay_stream->is_metadata;
+                               pthread_mutex_unlock(&relay_stream->lock);
+
+                               if (is_metadata_stream) {
+                                       trace_has_metadata_stream = true;
+                                       break;
+                               }
                        }
 
-                       pthread_mutex_lock(&relay_stream->lock);
+                       relay_stream = nullptr;
+
                        /*
-                        * stream published is protected by the session lock.
+                        * If there is no metadata stream in this trace at the moment
+                        * and we never sent one to the viewer, skip the trace. We
+                        * accept that the viewer will not see this trace at all.
                         */
-                       if (!relay_stream->published) {
-                               goto next;
+                       if (!trace_has_metadata_stream &&
+                           !ctf_trace->metadata_stream_sent_to_viewer) {
+                               ctf_trace_put(ctf_trace);
+                               continue;
                        }
-                       viewer_stream = viewer_stream_get_by_id(
-                                       relay_stream->stream_handle);
-                       if (!viewer_stream) {
-                               struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
 
-                               /*
-                                * Save that we sent the metadata stream to the
-                                * viewer. So that we know what trace the viewer
-                                * is aware of.
-                                */
-                               if (relay_stream->is_metadata) {
-                                       ctf_trace->metadata_stream_sent_to_viewer = true;
+                       cds_list_for_each_entry_rcu(
+                               relay_stream, &ctf_trace->stream_list, stream_node)
+                       {
+                               struct relay_viewer_stream *viewer_stream;
+
+                               if (!stream_get(relay_stream)) {
+                                       continue;
                                }
 
+                               pthread_mutex_lock(&relay_stream->lock);
                                /*
-                                * If a rotation is ongoing, use a copy of the
-                                * relay stream's chunk to ensure the stream
-                                * files exist.
-                                *
-                                * Otherwise, the viewer session's current trace
-                                * chunk can be used safely.
+                                * stream published is protected by the session lock.
                                 */
-                               if ((relay_stream->ongoing_rotation.is_set ||
-                                               session_has_ongoing_rotation(relay_session)) &&
-                                               relay_stream->trace_chunk) {
-                                       viewer_stream_trace_chunk = lttng_trace_chunk_copy(
-                                                       relay_stream->trace_chunk);
-                                       if (!viewer_stream_trace_chunk) {
-                                               ret = -1;
-                                               ctf_trace_put(ctf_trace);
-                                               goto error_unlock;
-                                       }
-                               } else {
+                               if (!relay_stream->published) {
+                                       goto next;
+                               }
+                               viewer_stream =
+                                       viewer_stream_get_by_id(relay_stream->stream_handle);
+                               if (!viewer_stream) {
+                                       struct lttng_trace_chunk *viewer_stream_trace_chunk =
+                                               nullptr;
+
                                        /*
-                                        * Transition the viewer session into the newest trace chunk available.
+                                        * Save that we sent the metadata stream to the
+                                        * viewer. So that we know what trace the viewer
+                                        * is aware of.
                                         */
-                                       if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
-                                                       relay_stream->trace_chunk)) {
+                                       if (relay_stream->is_metadata) {
+                                               ctf_trace->metadata_stream_sent_to_viewer = true;
+                                       }
 
-                                               ret = viewer_session_set_trace_chunk_copy(
-                                                               viewer_session,
-                                                               relay_stream->trace_chunk);
-                                               if (ret) {
+                                       /*
+                                        * If a rotation is ongoing, use a copy of the
+                                        * relay stream's chunk to ensure the stream
+                                        * files exist.
+                                        *
+                                        * Otherwise, the viewer session's current trace
+                                        * chunk can be used safely.
+                                        */
+                                       if ((relay_stream->ongoing_rotation.is_set ||
+                                            session_has_ongoing_rotation(relay_session)) &&
+                                           relay_stream->trace_chunk) {
+                                               viewer_stream_trace_chunk = lttng_trace_chunk_copy(
+                                                       relay_stream->trace_chunk);
+                                               if (!viewer_stream_trace_chunk) {
                                                        ret = -1;
                                                        ctf_trace_put(ctf_trace);
                                                        goto error_unlock;
                                                }
-                                       }
-
-                                       if (relay_stream->trace_chunk) {
+                                       } else {
                                                /*
-                                                * If the corresponding relay
-                                                * stream's trace chunk is set,
-                                                * the viewer stream will be
-                                                * created under it.
-                                                *
-                                                * Note that a relay stream can
-                                                * have a NULL output trace
-                                                * chunk (for instance, after a
-                                                * clear against a stopped
-                                                * session).
+                                                * Transition the viewer session into the newest
+                                                * trace chunk available.
                                                 */
-                                               const bool reference_acquired = lttng_trace_chunk_get(
-                                                               viewer_session->current_trace_chunk);
+                                               if (!lttng_trace_chunk_ids_equal(
+                                                           viewer_session->current_trace_chunk,
+                                                           relay_stream->trace_chunk)) {
+                                                       ret = viewer_session_set_trace_chunk_copy(
+                                                               viewer_session,
+                                                               relay_stream->trace_chunk);
+                                                       if (ret) {
+                                                               ret = -1;
+                                                               ctf_trace_put(ctf_trace);
+                                                               goto error_unlock;
+                                                       }
+                                               }
 
-                                               LTTNG_ASSERT(reference_acquired);
-                                               viewer_stream_trace_chunk =
+                                               if (relay_stream->trace_chunk) {
+                                                       /*
+                                                        * If the corresponding relay
+                                                        * stream's trace chunk is set,
+                                                        * the viewer stream will be
+                                                        * created under it.
+                                                        *
+                                                        * Note that a relay stream can
+                                                        * have a NULL output trace
+                                                        * chunk (for instance, after a
+                                                        * clear against a stopped
+                                                        * session).
+                                                        */
+                                                       const bool reference_acquired =
+                                                               lttng_trace_chunk_get(
+                                                                       viewer_session
+                                                                               ->current_trace_chunk);
+
+                                                       LTTNG_ASSERT(reference_acquired);
+                                                       viewer_stream_trace_chunk =
                                                                viewer_session->current_trace_chunk;
+                                               }
                                        }
-                               }
 
-                               viewer_stream = viewer_stream_create(
-                                               relay_stream,
-                                               viewer_stream_trace_chunk,
-                                               seek_t);
-                               lttng_trace_chunk_put(viewer_stream_trace_chunk);
-                               viewer_stream_trace_chunk = NULL;
-                               if (!viewer_stream) {
-                                       ret = -1;
-                                       ctf_trace_put(ctf_trace);
-                                       goto error_unlock;
-                               }
+                                       viewer_stream = viewer_stream_create(
+                                               relay_stream, viewer_stream_trace_chunk, seek_t);
+                                       lttng_trace_chunk_put(viewer_stream_trace_chunk);
+                                       viewer_stream_trace_chunk = nullptr;
+                                       if (!viewer_stream) {
+                                               ret = -1;
+                                               ctf_trace_put(ctf_trace);
+                                               goto error_unlock;
+                                       }
 
-                               if (nb_created) {
-                                       /* Update number of created stream counter. */
-                                       (*nb_created)++;
-                               }
-                               /*
-                                * Ensure a self-reference is preserved even
-                                * after we have put our local reference.
-                                */
-                               if (!viewer_stream_get(viewer_stream)) {
-                                       ERR("Unable to get self-reference on viewer stream, logic error.");
-                                       abort();
-                               }
-                       } else {
-                               if (!viewer_stream->sent_flag && nb_unsent) {
-                                       /* Update number of unsent stream counter. */
-                                       (*nb_unsent)++;
-                               }
-                       }
-                       /* Update number of total stream counter. */
-                       if (nb_total) {
-                               if (relay_stream->is_metadata) {
-                                       if (!relay_stream->closed ||
-                                                       relay_stream->metadata_received >
-                                                                       viewer_stream->metadata_sent) {
-                                               (*nb_total)++;
+                                       if (nb_created) {
+                                               /* Update number of created stream counter. */
+                                               (*nb_created)++;
+                                       }
+                                       /*
+                                        * Ensure a self-reference is preserved even
+                                        * after we have put our local reference.
+                                        */
+                                       if (!viewer_stream_get(viewer_stream)) {
+                                               ERR("Unable to get self-reference on viewer stream, logic error.");
+                                               abort();
                                        }
                                } else {
-                                       if (!relay_stream->closed ||
-                                                       !(((int64_t)(relay_stream->prev_data_seq -
-                                                                         relay_stream->last_net_seq_num)) >=
-                                                                       0)) {
-                                               (*nb_total)++;
+                                       if (!viewer_stream->sent_flag && nb_unsent) {
+                                               /* Update number of unsent stream counter. */
+                                               (*nb_unsent)++;
                                        }
                                }
+                               /* Update number of total stream counter. */
+                               if (nb_total) {
+                                       if (relay_stream->is_metadata) {
+                                               if (!relay_stream->closed ||
+                                                   relay_stream->metadata_received >
+                                                           viewer_stream->metadata_sent) {
+                                                       (*nb_total)++;
+                                               }
+                                       } else {
+                                               if (!relay_stream->closed ||
+                                                   !(((int64_t) (relay_stream->prev_data_seq -
+                                                                 relay_stream->last_net_seq_num)) >=
+                                                     0)) {
+                                                       (*nb_total)++;
+                                               }
+                                       }
+                               }
+                               /* Put local reference. */
+                               viewer_stream_put(viewer_stream);
+                       next:
+                               pthread_mutex_unlock(&relay_stream->lock);
+                               stream_put(relay_stream);
                        }
-                       /* Put local reference. */
-                       viewer_stream_put(viewer_stream);
-               next:
-                       pthread_mutex_unlock(&relay_stream->lock);
-                       stream_put(relay_stream);
+                       relay_stream = nullptr;
+                       ctf_trace_put(ctf_trace);
                }
-               relay_stream = NULL;
-               ctf_trace_put(ctf_trace);
        }
 
        ret = 0;
 
 error_unlock:
-       rcu_read_unlock();
 
        if (relay_stream) {
                pthread_mutex_unlock(&relay_stream->lock);
@@ -582,7 +582,7 @@ error_unlock:
        return ret;
 }
 
-int relayd_live_stop(void)
+int relayd_live_stop()
 {
        /* Stop dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
@@ -590,56 +590,7 @@ int relayd_live_stop(void)
        return 0;
 }
 
-/*
- * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
- */
-static
-int create_named_thread_poll_set(struct lttng_poll_event *events,
-               int size, const char *name)
-{
-       int ret;
-
-       if (events == NULL || size == 0) {
-               ret = -1;
-               goto error;
-       }
-
-       ret = fd_tracker_util_poll_create(the_fd_tracker,
-                       name, events, 1, LTTNG_CLOEXEC);
-       if (ret) {
-               PERROR("Failed to create \"%s\" poll file descriptor", name);
-               goto error;
-       }
-
-       /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               goto error;
-       }
-
-       return 0;
-
-error:
-       return ret;
-}
-
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
-
-static
-int create_sock(void *data, int *out_fd)
+static int create_sock(void *data, int *out_fd)
 {
        int ret;
        struct lttcomm_sock *sock = (lttcomm_sock *) data;
@@ -654,8 +605,7 @@ end:
        return ret;
 }
 
-static
-int close_sock(void *data, int *in_fd __attribute__((unused)))
+static int close_sock(void *data, int *in_fd __attribute__((unused)))
 {
        struct lttcomm_sock *sock = (lttcomm_sock *) data;
 
@@ -679,16 +629,14 @@ end:
        return ret;
 }
 
-static
-struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
-               const char *name)
+static struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, const char *name)
 {
        int out_fd, ret;
-       struct lttcomm_sock *socks[2] = { listening_sock, NULL };
-       struct lttcomm_sock *new_sock = NULL;
+       struct lttcomm_sock *socks[2] = { listening_sock, nullptr };
+       struct lttcomm_sock *new_sock = nullptr;
 
-       ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd,
-                       (const char **) &name, 1, accept_sock, &socks);
+       ret = fd_tracker_open_unsuspendable_fd(
+               the_fd_tracker, &out_fd, (const char **) &name, 1, accept_sock, &socks);
        if (ret) {
                goto end;
        }
@@ -701,16 +649,15 @@ end:
 /*
  * Create and init socket from uri.
  */
-static
-struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
+static struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
 {
        int ret, sock_fd;
-       struct lttcomm_sock *sock = NULL;
+       struct lttcomm_sock *sock = nullptr;
        char uri_str[LTTNG_PATH_MAX];
-       char *formated_name = NULL;
+       char *formated_name = nullptr;
 
        sock = lttcomm_alloc_sock_from_uri(uri);
-       if (sock == NULL) {
+       if (sock == nullptr) {
                ERR("Allocating socket");
                goto error;
        }
@@ -722,19 +669,21 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
        ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
        uri_str[sizeof(uri_str) - 1] = '\0';
        if (ret >= 0) {
-               ret = asprintf(&formated_name, "%s socket @ %s", name,
-                               uri_str);
+               ret = asprintf(&formated_name, "%s socket @ %s", name, uri_str);
                if (ret < 0) {
-                       formated_name = NULL;
+                       formated_name = nullptr;
                }
        }
 
-       ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
-                       (const char **) (formated_name ? &formated_name : NULL),
-                       1, create_sock, sock);
+       ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker,
+                                              &sock_fd,
+                                              (const char **) (formated_name ? &formated_name :
+                                                                               nullptr),
+                                              1,
+                                              create_sock,
+                                              sock);
        if (ret) {
-               PERROR("Failed to create \"%s\" socket",
-                               formated_name ?: "Unknown");
+               PERROR("Failed to create \"%s\" socket", formated_name ?: "Unknown");
                goto error;
        }
        DBG("Listening on %s socket %d", name, sock->fd);
@@ -748,7 +697,6 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
        ret = sock->ops->listen(sock, -1);
        if (ret < 0) {
                goto error;
-
        }
 
        free(formated_name);
@@ -759,17 +707,16 @@ error:
                lttcomm_destroy_sock(sock);
        }
        free(formated_name);
-       return NULL;
+       return nullptr;
 }
 
 /*
  * This thread manages the listening for new connections on the network
  */
-static
-void *thread_listener(void *data __attribute__((unused)))
+static void *thread_listener(void *data __attribute__((unused)))
 {
-       int i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int i, ret, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *live_control_sock;
 
@@ -786,8 +733,7 @@ void *thread_listener(void *data __attribute__((unused)))
        }
 
        /* Pass 2 as size here for the thread quit pipe and control sockets. */
-       ret = create_named_thread_poll_set(&events, 2,
-                       "Live listener thread epoll");
+       ret = create_named_thread_poll_set(&events, 2, "Live listener thread epoll");
        if (ret < 0) {
                goto error_create_poll;
        }
@@ -804,12 +750,12 @@ void *thread_listener(void *data __attribute__((unused)))
                goto error_testpoint;
        }
 
-       while (1) {
+       while (true) {
                health_code_update();
 
                DBG("Listener accepting live viewers connections");
 
-restart:
+       restart:
                health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
                health_poll_exit();
@@ -826,15 +772,15 @@ restart:
 
                DBG("Relay new viewer connection received");
                for (i = 0; i < nb_fd; i++) {
-                       health_code_update();
-
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -851,15 +797,15 @@ restart:
                                struct lttcomm_sock *newsock;
 
                                newsock = accept_live_sock(live_control_sock,
-                                               "Live socket to client");
+                                                          "Live socket to client");
                                if (!newsock) {
                                        PERROR("accepting control sock");
                                        goto error;
                                }
                                DBG("Relay viewer connection accepted socket %d", newsock->fd);
 
-                               ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
-                                               sizeof(val));
+                               ret = setsockopt(
+                                       newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
                                if (ret < 0) {
                                        PERROR("setsockopt inet");
                                        lttcomm_destroy_sock(newsock);
@@ -871,13 +817,12 @@ restart:
                                        goto error;
                                }
                                /* Ownership assumed by the connection. */
-                               newsock = NULL;
+                               newsock = nullptr;
 
                                /* Enqueue request for the dispatcher thread. */
                                cds_wfcq_head_ptr_t head;
                                head.h = &viewer_conn_queue.head;
-                               cds_wfcq_enqueue(head, &viewer_conn_queue.tail,
-                                                &new_conn->qnode);
+                               cds_wfcq_enqueue(head, &viewer_conn_queue.tail, &new_conn->qnode);
 
                                /*
                                 * Wake the dispatch queue futex.
@@ -904,9 +849,8 @@ error_create_poll:
        if (live_control_sock->fd >= 0) {
                int sock_fd = live_control_sock->fd;
 
-               ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
-                               &sock_fd, 1, close_sock,
-                               live_control_sock);
+               ret = fd_tracker_close_unsuspendable_fd(
+                       the_fd_tracker, &sock_fd, 1, close_sock, live_control_sock);
                if (ret) {
                        PERROR("close");
                }
@@ -924,19 +868,18 @@ error_sock_control:
        if (lttng_relay_stop_threads()) {
                ERR("Error stopping threads");
        }
-       return NULL;
+       return nullptr;
 }
 
 /*
  * This thread manages the dispatching of the requests to worker threads
  */
-static
-void *thread_dispatcher(void *data __attribute__((unused)))
+static void *thread_dispatcher(void *data __attribute__((unused)))
 {
        int err = -1;
        ssize_t ret;
        struct cds_wfcq_node *node;
-       struct relay_connection *conn = NULL;
+       struct relay_connection *conn = nullptr;
 
        DBG("[thread] Live viewer relay dispatcher started");
 
@@ -964,15 +907,14 @@ void *thread_dispatcher(void *data __attribute__((unused)))
                        /* Dequeue commands */
                        node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
                                                         &viewer_conn_queue.tail);
-                       if (node == NULL) {
+                       if (node == nullptr) {
                                DBG("Woken up but nothing in the live-viewer "
-                                               "relay command queue");
+                                   "relay command queue");
                                /* Continue thread execution */
                                break;
                        }
-                       conn = caa_container_of(node, struct relay_connection, qnode);
-                       DBG("Dispatching viewer request waiting on sock %d",
-                                       conn->sock->fd);
+                       conn = lttng::utils::container_of(node, &relay_connection::qnode);
+                       DBG("Dispatching viewer request waiting on sock %d", conn->sock->fd);
 
                        /*
                         * Inform worker thread of the new request. This
@@ -980,13 +922,15 @@ void *thread_dispatcher(void *data __attribute__((unused)))
                         * the data will be read at some point in time
                         * or wait to the end of the world :)
                         */
-                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
+                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn)); /* NOLINT sizeof
+                                                                                     used on a
+                                                                                     pointer. */
                        if (ret < 0) {
                                PERROR("write conn pipe");
                                connection_put(conn);
                                goto error;
                        }
-               } while (node != NULL);
+               } while (node != nullptr);
 
                /* Futex wait on queue. Blocking call on futex() */
                health_poll_entry();
@@ -1008,7 +952,7 @@ error_testpoint:
        if (lttng_relay_stop_threads()) {
                ERR("Error stopping threads");
        }
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -1016,13 +960,12 @@ error_testpoint:
  *
  * Return 0 on success or else negative value.
  */
-static
-int viewer_connect(struct relay_connection *conn)
+static int viewer_connect(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_connect reply, msg;
 
-       conn->version_check_done = 1;
+       conn->version_check_done = true;
 
        health_code_update();
 
@@ -1040,7 +983,8 @@ int viewer_connect(struct relay_connection *conn)
        /* Major versions must be the same */
        if (reply.major != be32toh(msg.major)) {
                DBG("Incompatible major versions ([relayd] %u vs [client] %u)",
-                               reply.major, be32toh(msg.major));
+                   reply.major,
+                   be32toh(msg.major));
                ret = -1;
                goto end;
        }
@@ -1101,77 +1045,80 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static
-int viewer_list_sessions(struct relay_connection *conn)
+static int viewer_list_sessions(struct relay_connection *conn)
 {
        int ret = 0;
        struct lttng_viewer_list_sessions session_list;
        struct lttng_ht_iter iter;
        struct relay_session *session;
-       struct lttng_viewer_session *send_session_buf = NULL;
+       struct lttng_viewer_session *send_session_buf = nullptr;
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
 
-       send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+       send_session_buf = calloc<lttng_viewer_session>(SESSION_BUF_DEFAULT_COUNT);
        if (!send_session_buf) {
                return -1;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
-                       session_n.node) {
-               struct lttng_viewer_session *send_session;
+       {
+               lttng::urcu::read_lock_guard read_lock;
 
-               health_code_update();
+               cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) {
+                       struct lttng_viewer_session *send_session;
 
-               pthread_mutex_lock(&session->lock);
-               if (session->connection_closed) {
-                       /* Skip closed session */
-                       goto next_session;
-               }
+                       health_code_update();
+
+                       pthread_mutex_lock(&session->lock);
+                       if (session->connection_closed) {
+                               /* Skip closed session */
+                               goto next_session;
+                       }
 
-               if (count >= buf_count) {
-                       struct lttng_viewer_session *newbuf;
-                       uint32_t new_buf_count = buf_count << 1;
+                       if (count >= buf_count) {
+                               struct lttng_viewer_session *newbuf;
+                               uint32_t new_buf_count = buf_count << 1;
 
-                       newbuf = (lttng_viewer_session *) realloc(send_session_buf,
-                               new_buf_count * sizeof(*send_session_buf));
-                       if (!newbuf) {
+                               newbuf = (lttng_viewer_session *) realloc(
+                                       send_session_buf,
+                                       new_buf_count * sizeof(*send_session_buf));
+                               if (!newbuf) {
+                                       ret = -1;
+                                       goto break_loop;
+                               }
+                               send_session_buf = newbuf;
+                               buf_count = new_buf_count;
+                       }
+                       send_session = &send_session_buf[count];
+                       if (lttng_strncpy(send_session->session_name,
+                                         session->session_name,
+                                         sizeof(send_session->session_name))) {
                                ret = -1;
                                goto break_loop;
                        }
-                       send_session_buf = newbuf;
-                       buf_count = new_buf_count;
-               }
-               send_session = &send_session_buf[count];
-               if (lttng_strncpy(send_session->session_name,
-                               session->session_name,
-                               sizeof(send_session->session_name))) {
-                       ret = -1;
-                       goto break_loop;
-               }
-               if (lttng_strncpy(send_session->hostname, session->hostname,
-                               sizeof(send_session->hostname))) {
-                       ret = -1;
-                       goto break_loop;
-               }
-               send_session->id = htobe64(session->id);
-               send_session->live_timer = htobe32(session->live_timer);
-               if (session->viewer_attached) {
-                       send_session->clients = htobe32(1);
-               } else {
-                       send_session->clients = htobe32(0);
+                       if (lttng_strncpy(send_session->hostname,
+                                         session->hostname,
+                                         sizeof(send_session->hostname))) {
+                               ret = -1;
+                               goto break_loop;
+                       }
+                       send_session->id = htobe64(session->id);
+                       send_session->live_timer = htobe32(session->live_timer);
+                       if (session->viewer_attached) {
+                               send_session->clients = htobe32(1);
+                       } else {
+                               send_session->clients = htobe32(0);
+                       }
+                       send_session->streams = htobe32(session->stream_count);
+                       count++;
+               next_session:
+                       pthread_mutex_unlock(&session->lock);
+                       continue;
+               break_loop:
+                       pthread_mutex_unlock(&session->lock);
+                       break;
                }
-               send_session->streams = htobe32(session->stream_count);
-               count++;
-       next_session:
-               pthread_mutex_unlock(&session->lock);
-               continue;
-       break_loop:
-               pthread_mutex_unlock(&session->lock);
-               break;
        }
-       rcu_read_unlock();
+
        if (ret < 0) {
                goto end_free;
        }
@@ -1187,8 +1134,7 @@ int viewer_list_sessions(struct relay_connection *conn)
 
        health_code_update();
 
-       ret = send_response(conn->sock, send_session_buf,
-                       count * sizeof(*send_session_buf));
+       ret = send_response(conn->sock, send_session_buf, count * sizeof(*send_session_buf));
        if (ret < 0) {
                goto end_free;
        }
@@ -1203,14 +1149,13 @@ end_free:
 /*
  * Send the viewer the list of current streams.
  */
-static
-int viewer_get_new_streams(struct relay_connection *conn)
+static int viewer_get_new_streams(struct relay_connection *conn)
 {
        int ret, send_streams = 0;
        uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        uint64_t session_id;
        bool closed = false;
 
@@ -1262,9 +1207,12 @@ int viewer_get_new_streams(struct relay_connection *conn)
                goto send_reply_unlock;
        }
        ret = make_viewer_streams(session,
-                       conn->viewer_session,
-                       LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
-                       &nb_created, &closed);
+                                 conn->viewer_session,
+                                 LTTNG_VIEWER_SEEK_BEGINNING,
+                                 &nb_total,
+                                 &nb_unsent,
+                                 &nb_created,
+                                 &closed);
        if (ret < 0) {
                /*
                 * This is caused by an internal error; propagate the negative
@@ -1273,6 +1221,8 @@ int viewer_get_new_streams(struct relay_connection *conn)
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1331,8 +1281,7 @@ error:
 /*
  * Send the viewer the list of current sessions.
  */
-static
-int viewer_attach_session(struct relay_connection *conn)
+static int viewer_attach_session(struct relay_connection *conn)
 {
        int send_streams = 0;
        ssize_t ret;
@@ -1340,7 +1289,7 @@ int viewer_attach_session(struct relay_connection *conn)
        enum lttng_viewer_seek seek_type;
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        enum lttng_viewer_attach_return_code viewer_attach_status;
        bool closed = false;
        uint64_t session_id;
@@ -1364,7 +1313,7 @@ int viewer_attach_session(struct relay_connection *conn)
        if (!conn->viewer_session) {
                viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION;
                DBG("Client trying to attach before creating a live viewer session, returning status=%s",
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
@@ -1372,8 +1321,8 @@ int viewer_attach_session(struct relay_connection *conn)
        if (!session) {
                viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
                DBG("Relay session %" PRIu64 " not found, returning status=%s",
-                               session_id,
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+                   session_id,
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
        DBG("Attach relay session ID %" PRIu64 " received", session_id);
@@ -1382,18 +1331,17 @@ int viewer_attach_session(struct relay_connection *conn)
        if (session->live_timer == 0) {
                viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE;
                DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s",
-                               session_id,
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+                   session_id,
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
        send_streams = 1;
-       viewer_attach_status = viewer_session_attach(conn->viewer_session,
-                       session);
+       viewer_attach_status = viewer_session_attach(conn->viewer_session, session);
        if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
                DBG("Error attaching to relay session %" PRIu64 ", returning status=%s",
-                               session_id,
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+                   session_id,
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
@@ -1404,9 +1352,9 @@ int viewer_attach_session(struct relay_connection *conn)
                seek_type = (lttng_viewer_seek) be32toh(request.seek);
                break;
        default:
-               ERR("Wrong seek parameter for relay session %" PRIu64
-                               ", returning status=%s", session_id,
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+               ERR("Wrong seek parameter for relay session %" PRIu64 ", returning status=%s",
+                   session_id,
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR;
                send_streams = 0;
                goto send_reply;
@@ -1423,15 +1371,14 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
-       ret = make_viewer_streams(session,
-                       conn->viewer_session, seek_type,
-                       &nb_streams, NULL, NULL, &closed);
+       ret = make_viewer_streams(
+               session, conn->viewer_session, seek_type, &nb_streams, nullptr, nullptr, &closed);
        if (ret < 0) {
                goto end_put_session;
        }
        pthread_mutex_unlock(&session->lock);
        session_put(session);
-       session = NULL;
+       session = nullptr;
 
        response.streams_count = htobe32(nb_streams);
        /*
@@ -1445,8 +1392,8 @@ int viewer_attach_session(struct relay_connection *conn)
                response.streams_count = 0;
                viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
                ERR("Session %" PRIu64 " is closed, returning status=%s",
-                               session_id,
-                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+                   session_id,
+                   lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
@@ -1495,8 +1442,7 @@ error:
  *
  * Called with rstream lock held.
  */
-static int try_open_index(struct relay_viewer_stream *vstream,
-               struct relay_stream *rstream)
+static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream)
 {
        int ret = 0;
        const uint32_t connection_major = rstream->trace->session->major;
@@ -1510,19 +1456,21 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->index_received_seqcount == 0 ||
-                       !vstream->stream_file.trace_chunk) {
+       if (rstream->index_received_seqcount == 0 || !vstream->stream_file.trace_chunk) {
                ret = -ENOENT;
                goto end;
        }
 
        chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
-                       vstream->stream_file.trace_chunk, rstream->path_name,
-                       rstream->channel_name, rstream->tracefile_size,
-                       vstream->current_tracefile_id,
-                       lttng_to_index_major(connection_major, connection_minor),
-                       lttng_to_index_minor(connection_major, connection_minor),
-                       true, &vstream->index_file);
+               vstream->stream_file.trace_chunk,
+               rstream->path_name,
+               rstream->channel_name,
+               rstream->tracefile_size,
+               vstream->current_tracefile_id,
+               lttng_to_index_major(connection_major, connection_minor),
+               lttng_to_index_minor(connection_major, connection_minor),
+               true,
+               &vstream->index_file);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
                        ret = -ENOENT;
@@ -1547,37 +1495,37 @@ end:
  * Called with rstream lock held.
  */
 static int check_index_status(struct relay_viewer_stream *vstream,
-               struct relay_stream *rstream, struct ctf_trace *trace,
-               struct lttng_viewer_index *index)
+                             struct relay_stream *rstream,
+                             struct ctf_trace *trace,
+                             struct lttng_viewer_index *index)
 {
        int ret;
 
        DBG("Check index status: index_received_seqcount %" PRIu64 " "
-                               "index_sent_seqcount %" PRIu64 " "
-                               "for stream %" PRIu64,
-                               rstream->index_received_seqcount,
-                               vstream->index_sent_seqcount,
-                               vstream->stream->stream_handle);
-       if ((trace->session->connection_closed || rstream->closed)
-                       && rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+           "index_sent_seqcount %" PRIu64 " "
+           "for stream %" PRIu64,
+           rstream->index_received_seqcount,
+           vstream->index_sent_seqcount,
+           vstream->stream->stream_handle);
+       if ((trace->session->connection_closed || rstream->closed) &&
+           rstream->index_received_seqcount == vstream->index_sent_seqcount) {
                /*
                 * Last index sent and session connection or relay
                 * stream are closed.
                 */
                index->status = LTTNG_VIEWER_INDEX_HUP;
                DBG("Check index status: Connection or stream are closed, stream %" PRIu64
-                       ",connection-closed=%d, relay-stream-closed=%d, returning status=%s",
-                       vstream->stream->stream_handle,
-                       trace->session->connection_closed, rstream->closed,
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) index->status));
+                   ",connection-closed=%d, relay-stream-closed=%d, returning status=%s",
+                   vstream->stream->stream_handle,
+                   trace->session->connection_closed,
+                   rstream->closed,
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) index->status));
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
-                       (rstream->index_received_seqcount == 0 ||
-                       (vstream->index_sent_seqcount != 0 &&
-                       rstream->index_received_seqcount
-                               <= vstream->index_sent_seqcount))) {
+                  (rstream->index_received_seqcount == 0 ||
+                   (vstream->index_sent_seqcount != 0 &&
+                    rstream->index_received_seqcount <= vstream->index_sent_seqcount))) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1597,15 +1545,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
                DBG("Check index status: inactive with beacon, for stream %" PRIu64
-                       ", returning status=%s",
-                       vstream->stream->stream_handle,
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) index->status));
+                   ", returning status=%s",
+                   vstream->stream->stream_handle,
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) index->status));
                goto index_ready;
        } else if (rstream->index_received_seqcount == 0 ||
-                       (vstream->index_sent_seqcount != 0 &&
-                       rstream->index_received_seqcount
-                               <= vstream->index_sent_seqcount)) {
+                  (vstream->index_sent_seqcount != 0 &&
+                   rstream->index_received_seqcount <= vstream->index_sent_seqcount)) {
                /*
                 * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
@@ -1619,32 +1566,29 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 */
                index->status = LTTNG_VIEWER_INDEX_RETRY;
                DBG("Check index status:"
-                       "did not received beacon for stream %" PRIu64
-                       ", returning status=%s",
-                       vstream->stream->stream_handle,
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) index->status));
+                   "did not received beacon for stream %" PRIu64 ", returning status=%s",
+                   vstream->stream->stream_handle,
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) index->status));
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
-                       vstream->current_tracefile_id,
-                       vstream->index_sent_seqcount)) {
+                                               vstream->current_tracefile_id,
+                                               vstream->index_sent_seqcount)) {
                /*
                 * The next index we want to send cannot be read either
                 * because we need to perform a rotation, or due to
                 * the producer having overwritten its trace file.
                 */
-               DBG("Viewer stream %" PRIu64 " rotation",
-                               vstream->stream->stream_handle);
+               DBG("Viewer stream %" PRIu64 " rotation", vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
                if (ret == 1) {
                        /* EOF across entire stream. */
                        index->status = LTTNG_VIEWER_INDEX_HUP;
                        DBG("Check index status:"
-                               "reached end of file for stream %" PRIu64
-                               ", returning status=%s",
-                               vstream->stream->stream_handle,
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) index->status));
+                           "reached end of file for stream %" PRIu64 ", returning status=%s",
+                           vstream->stream->stream_handle,
+                           lttng_viewer_next_index_return_code_str(
+                                   (enum lttng_viewer_next_index_return_code) index->status));
                        goto hup;
                }
                /*
@@ -1662,24 +1606,21 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * still unavailable.
                 */
                if (rstream->tracefile_count == 1 &&
-                               !tracefile_array_seq_in_file(
-                                       rstream->tfa,
-                                       vstream->current_tracefile_id,
-                                       vstream->index_sent_seqcount)) {
+                   !tracefile_array_seq_in_file(rstream->tfa,
+                                                vstream->current_tracefile_id,
+                                                vstream->index_sent_seqcount)) {
                        index->status = LTTNG_VIEWER_INDEX_RETRY;
                        DBG("Check index status:"
-                               "tracefile array sequence number %" PRIu64
-                               " not in file for stream %" PRIu64
-                               ", returning status=%s",
-                               vstream->index_sent_seqcount,
-                               vstream->stream->stream_handle,
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) index->status));
+                           "tracefile array sequence number %" PRIu64
+                           " not in file for stream %" PRIu64 ", returning status=%s",
+                           vstream->index_sent_seqcount,
+                           vstream->stream->stream_handle,
+                           lttng_viewer_next_index_return_code_str(
+                                   (enum lttng_viewer_next_index_return_code) index->status));
                        goto index_ready;
                }
-               LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
-                               vstream->current_tracefile_id,
-                               vstream->index_sent_seqcount));
+               LTTNG_ASSERT(tracefile_array_seq_in_file(
+                       rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount));
        }
        /* ret == 0 means successful so we continue. */
        ret = 0;
@@ -1691,15 +1632,13 @@ index_ready:
        return 1;
 }
 
-static
-void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
-                struct lttng_trace_chunk *new_trace_chunk)
+static void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
+                                               struct lttng_trace_chunk *new_trace_chunk)
 {
        lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
 
        if (new_trace_chunk) {
-               const bool acquired_reference = lttng_trace_chunk_get(
-                               new_trace_chunk);
+               const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
 
                LTTNG_ASSERT(acquired_reference);
        }
@@ -1714,20 +1653,20 @@ void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
  *
  * Return 0 on success or else a negative value.
  */
-static
-int viewer_get_next_index(struct relay_connection *conn)
+static int viewer_get_next_index(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_get_next_index request_index;
        struct lttng_viewer_index viewer_index;
        struct ctf_packet_index packet_index;
-       struct relay_viewer_stream *vstream = NULL;
-       struct relay_stream *rstream = NULL;
-       struct ctf_trace *ctf_trace = NULL;
-       struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
+       struct relay_stream *rstream = nullptr;
+       struct ctf_trace *ctf_trace = nullptr;
+       struct relay_viewer_stream *metadata_viewer_stream = nullptr;
        bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
        uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
        enum lttng_trace_chunk_status status;
+       bool attached_sessions_have_new_streams = false;
 
        LTTNG_ASSERT(conn);
 
@@ -1743,10 +1682,10 @@ int viewer_get_next_index(struct relay_connection *conn)
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               DBG("Client requested index of unknown stream id %" PRIu64 ", returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
@@ -1755,8 +1694,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        ctf_trace = rstream->trace;
 
        /* metadata_viewer_stream may be NULL. */
-       metadata_viewer_stream =
-                       ctf_trace_get_viewer_metadata_stream(ctf_trace);
+       metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
        /*
         * Hold the session lock to protect against concurrent changes
@@ -1772,51 +1710,63 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        if (rstream->is_metadata) {
                viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
-               DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               DBG("Client requested index of a metadata stream id %" PRIu64
+                   ", returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       }
+
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
        }
 
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
-               DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               DBG("Client requested index for stream id %" PRIu64
+                   " while a stream rotation is ongoing, returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
        if (session_has_ongoing_rotation(rstream->trace->session)) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
-               DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               DBG("Client requested index for stream id %" PRIu64
+                   " while a session rotation is ongoing, returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
        /*
         * Transition the viewer session into the newest trace chunk available.
         */
-       if (!lttng_trace_chunk_ids_equal(
-                       conn->viewer_session->current_trace_chunk,
-                       rstream->trace_chunk)) {
+       if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                        rstream->trace_chunk)) {
                DBG("Relay stream and viewer chunk ids differ");
 
-               ret = viewer_session_set_trace_chunk_copy(
-                               conn->viewer_session,
-                               rstream->trace_chunk);
+               ret = viewer_session_set_trace_chunk_copy(conn->viewer_session,
+                                                         rstream->trace_chunk);
                if (ret) {
                        viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
                        ERR("Error copying trace chunk for stream id %" PRIu64
-                               ", returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+                           ", returning status=%s",
+                           (uint64_t) be64toh(request_index.stream_id),
+                           lttng_viewer_next_index_return_code_str(
+                                   (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
                }
        }
@@ -1833,53 +1783,49 @@ int viewer_get_next_index(struct relay_connection *conn)
         * after a session's destruction.
         */
        if (vstream->stream_file.trace_chunk) {
-               status = lttng_trace_chunk_get_id(
-                               vstream->stream_file.trace_chunk,
-                               &stream_file_chunk_id);
+               status = lttng_trace_chunk_get_id(vstream->stream_file.trace_chunk,
+                                                 &stream_file_chunk_id);
                LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
        }
        if (conn->viewer_session->current_trace_chunk) {
-               status = lttng_trace_chunk_get_id(
-                               conn->viewer_session->current_trace_chunk,
-                               &viewer_session_chunk_id);
+               status = lttng_trace_chunk_get_id(conn->viewer_session->current_trace_chunk,
+                                                 &viewer_session_chunk_id);
                LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
        }
 
        viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
-                       conn->viewer_session->current_trace_chunk,
-                       vstream->stream_file.trace_chunk);
+               conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk);
        viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
-                       vstream->last_seen_rotation_count + 1;
+               vstream->last_seen_rotation_count + 1;
 
        if (viewer_stream_and_session_in_same_chunk) {
                DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
-                               vstream->stream_file.trace_chunk ?
-                                               std::to_string(stream_file_chunk_id).c_str() :
-                                               "None",
-                               conn->viewer_session->current_trace_chunk ?
-                                               std::to_string(viewer_session_chunk_id).c_str() :
-                                               "None");
+                   vstream->stream_file.trace_chunk ?
+                           std::to_string(stream_file_chunk_id).c_str() :
+                           "None",
+                   conn->viewer_session->current_trace_chunk ?
+                           std::to_string(viewer_session_chunk_id).c_str() :
+                           "None");
        } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
                DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
-                               vstream->stream_file.trace_chunk ?
-                                               std::to_string(stream_file_chunk_id).c_str() :
-                                               "None",
-                               conn->viewer_session->current_trace_chunk ?
-                                               std::to_string(viewer_session_chunk_id).c_str() :
-                                               "None");
+                   vstream->stream_file.trace_chunk ?
+                           std::to_string(stream_file_chunk_id).c_str() :
+                           "None",
+                   conn->viewer_session->current_trace_chunk ?
+                           std::to_string(viewer_session_chunk_id).c_str() :
+                           "None");
        } else {
                DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
-                               vstream->stream_file.trace_chunk ?
-                                               std::to_string(stream_file_chunk_id).c_str() :
-                                               "None",
-                               conn->viewer_session->current_trace_chunk ?
-                                               std::to_string(viewer_session_chunk_id).c_str() :
-                                               "None");
+                   vstream->stream_file.trace_chunk ?
+                           std::to_string(stream_file_chunk_id).c_str() :
+                           "None",
+                   conn->viewer_session->current_trace_chunk ?
+                           std::to_string(viewer_session_chunk_id).c_str() :
+                           "None");
 
                viewer_stream_rotate_to_trace_chunk(vstream,
-                               conn->viewer_session->current_trace_chunk);
-               vstream->last_seen_rotation_count =
-                               rstream->completed_rotation_count;
+                                                   conn->viewer_session->current_trace_chunk);
+               vstream->last_seen_rotation_count = rstream->completed_rotation_count;
        }
 
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
@@ -1892,37 +1838,36 @@ int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        LTTNG_ASSERT(!ret);
 
        /* Try to open an index if one is needed for that stream. */
        ret = try_open_index(vstream, rstream);
        if (ret == -ENOENT) {
-              if (rstream->closed) {
+               if (rstream->closed) {
                        viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
                        DBG("Cannot open index for stream id %" PRIu64
-                               "stream is closed, returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+                           "stream is closed, returning status=%s",
+                           (uint64_t) be64toh(request_index.stream_id),
+                           lttng_viewer_next_index_return_code_str(
+                                   (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
-              } else {
+               } else {
                        viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
-                       DBG("Cannot open index for stream id %" PRIu64
-                               ", returning status=%s",
-                               (uint64_t) be64toh(request_index.stream_id),
-                               lttng_viewer_next_index_return_code_str(
-                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
+                       DBG("Cannot open index for stream id %" PRIu64 ", returning status=%s",
+                           (uint64_t) be64toh(request_index.stream_id),
+                           lttng_viewer_next_index_return_code_str(
+                                   (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
-              }
+               }
        }
        if (ret < 0) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Error opening index for stream id %" PRIu64
-                       ", returning status=%s",
-                       (uint64_t) be64toh(request_index.stream_id),
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               ERR("Error opening index for stream id %" PRIu64 ", returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
@@ -1938,9 +1883,12 @@ int viewer_get_next_index(struct relay_connection *conn)
                struct fs_handle *fs_handle;
 
                ret = utils_stream_file_path(rstream->path_name,
-                               rstream->channel_name, rstream->tracefile_size,
-                               vstream->current_tracefile_id, NULL, file_path,
-                               sizeof(file_path));
+                                            rstream->channel_name,
+                                            rstream->tracefile_size,
+                                            vstream->current_tracefile_id,
+                                            nullptr,
+                                            file_path,
+                                            sizeof(file_path));
                if (ret < 0) {
                        goto error_put;
                }
@@ -1951,17 +1899,16 @@ int viewer_get_next_index(struct relay_connection *conn)
                 * per-pid buffers) and a clear command has been performed.
                 */
                status = lttng_trace_chunk_open_fs_handle(
-                               vstream->stream_file.trace_chunk,
-                               file_path, O_RDONLY, 0, &fs_handle, true);
+                       vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
-                                       rstream->closed) {
+                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && rstream->closed) {
                                viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
                                DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64
-                                       ", returning status=%s",
-                                       (uint64_t) be64toh(request_index.stream_id),
-                                       lttng_viewer_next_index_return_code_str(
-                                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
+                                   ", returning status=%s",
+                                   (uint64_t) be64toh(request_index.stream_id),
+                                   lttng_viewer_next_index_return_code_str(
+                                           (enum lttng_viewer_next_index_return_code)
+                                                   viewer_index.status));
                                goto send_reply;
                        }
                        PERROR("Failed to open trace file for viewer stream");
@@ -1970,35 +1917,20 @@ int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Error checking for new streams before sending new index to stream id %" PRIu64
-                       ", returning status=%s",
-                       (uint64_t) be64toh(request_index.stream_id),
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Relay error reading index file for stream id %" PRIu64
-                       ", returning status=%s",
-                       (uint64_t) be64toh(request_index.stream_id),
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               ERR("Relay error reading index file for stream id %" PRIu64 ", returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        } else {
                viewer_index.status = LTTNG_VIEWER_INDEX_OK;
-               DBG("Read index file for stream id %" PRIu64
-                       ", returning status=%s",
-                       (uint64_t) be64toh(request_index.stream_id),
-                       lttng_viewer_next_index_return_code_str(
-                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               DBG("Read index file for stream id %" PRIu64 ", returning status=%s",
+                   (uint64_t) be64toh(request_index.stream_id),
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
                vstream->index_sent_seqcount++;
        }
 
@@ -2006,8 +1938,8 @@ int viewer_get_next_index(struct relay_connection *conn)
         * Indexes are stored in big endian, no need to switch before sending.
         */
        DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
-               rstream->stream_handle,
-               (uint64_t) be64toh(packet_index.offset));
+           rstream->stream_handle,
+           (uint64_t) be64toh(packet_index.offset));
        viewer_index.offset = packet_index.offset;
        viewer_index.packet_size = packet_index.packet_size;
        viewer_index.content_size = packet_index.content_size;
@@ -2024,18 +1956,21 @@ send_reply:
 
        if (metadata_viewer_stream) {
                pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
-               DBG("get next index metadata check: recv %" PRIu64
-                               " sent %" PRIu64,
-                       metadata_viewer_stream->stream->metadata_received,
-                       metadata_viewer_stream->metadata_sent);
+               DBG("get next index metadata check: recv %" PRIu64 " sent %" PRIu64,
+                   metadata_viewer_stream->stream->metadata_received,
+                   metadata_viewer_stream->metadata_sent);
                if (!metadata_viewer_stream->stream->metadata_received ||
-                               metadata_viewer_stream->stream->metadata_received >
-                                       metadata_viewer_stream->metadata_sent) {
+                   metadata_viewer_stream->stream->metadata_received >
+                           metadata_viewer_stream->metadata_sent) {
                        viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
                }
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        viewer_index.status = htobe32(viewer_index.status);
        health_code_update();
@@ -2048,8 +1983,8 @@ send_reply:
 
        if (vstream) {
                DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-                               vstream->index_sent_seqcount,
-                               vstream->stream->stream_handle);
+                   vstream->index_sent_seqcount,
+                   vstream->stream->stream_handle);
        }
 end:
        if (metadata_viewer_stream) {
@@ -2075,15 +2010,14 @@ error_put:
  *
  * Return 0 on success or else a negative value.
  */
-static
-int viewer_get_packet(struct relay_connection *conn)
+static int viewer_get_packet(struct relay_connection *conn)
 {
        int ret;
        off_t lseek_ret;
-       char *reply = NULL;
+       char *reply = nullptr;
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply_header;
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
        uint32_t reply_size = sizeof(reply_header);
        uint32_t packet_data_len = 0;
        ssize_t read_len;
@@ -2092,8 +2026,7 @@ int viewer_get_packet(struct relay_connection *conn)
 
        health_code_update();
 
-       ret = recv_request(conn->sock, &get_packet_info,
-                       sizeof(get_packet_info));
+       ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info));
        if (ret < 0) {
                goto end;
        }
@@ -2106,42 +2039,44 @@ int viewer_get_packet(struct relay_connection *conn)
        vstream = viewer_stream_get_by_id(stream_id);
        if (!vstream) {
                get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
-               DBG("Client requested packet of unknown stream id %" PRIu64
-                       ", returning status=%s", stream_id,
-                       lttng_viewer_get_packet_return_code_str(get_packet_status));
+               DBG("Client requested packet of unknown stream id %" PRIu64 ", returning status=%s",
+                   stream_id,
+                   lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto send_reply_nolock;
        } else {
                packet_data_len = be32toh(get_packet_info.len);
                reply_size += packet_data_len;
        }
 
-       reply = (char *) zmalloc(reply_size);
+       reply = zmalloc<char>(reply_size);
        if (!reply) {
                get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Falled to allocate reply, returning status=%s",
-                       lttng_viewer_get_packet_return_code_str(get_packet_status));
+                      lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = fs_handle_seek(vstream->stream_file.handle,
-                       be64toh(get_packet_info.offset), SEEK_SET);
+       lseek_ret = fs_handle_seek(
+               vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
                get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to seek file system handle of viewer stream %" PRIu64
-                      " to offset %" PRIu64", returning status=%s", stream_id,
-                       (uint64_t) be64toh(get_packet_info.offset),
-                       lttng_viewer_get_packet_return_code_str(get_packet_status));
+                      " to offset %" PRIu64 ", returning status=%s",
+                      stream_id,
+                      (uint64_t) be64toh(get_packet_info.offset),
+                      lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
-       read_len = fs_handle_read(vstream->stream_file.handle,
-                       reply + sizeof(reply_header), packet_data_len);
+       read_len = fs_handle_read(
+               vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len);
        if (read_len < packet_data_len) {
                get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
-                      ", offset: %" PRIu64 ", returning status=%s", stream_id,
+                      ", offset: %" PRIu64 ", returning status=%s",
+                      stream_id,
                       (uint64_t) be64toh(get_packet_info.offset),
-                       lttng_viewer_get_packet_return_code_str(get_packet_status));
+                      lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
 
@@ -2167,8 +2102,7 @@ send_reply_nolock:
                ret = send_response(conn->sock, reply, reply_size);
        } else {
                /* No reply to send. */
-               ret = send_response(conn->sock, &reply_header,
-                               reply_size);
+               ret = send_response(conn->sock, &reply_header, reply_size);
        }
 
        health_code_update();
@@ -2193,17 +2127,17 @@ end:
  *
  * Return 0 on success else a negative value.
  */
-static
-int viewer_get_metadata(struct relay_connection *conn)
+static int viewer_get_metadata(struct relay_connection *conn)
 {
        int ret = 0;
        int fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
-       char *data = NULL;
+       char *data = nullptr;
        struct lttng_viewer_get_metadata request;
        struct lttng_viewer_metadata_packet reply;
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
+       bool dispose_of_stream = false;
 
        LTTNG_ASSERT(conn);
 
@@ -2228,10 +2162,13 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * find it.
                 */
                DBG("Client requested metadata of unknown stream id %" PRIu64,
-                               (uint64_t) be64toh(request.stream_id));
+                   (uint64_t) be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
+
+       pthread_mutex_lock(&vstream->stream->trace->session->lock);
+       pthread_mutex_lock(&vstream->stream->trace->lock);
        pthread_mutex_lock(&vstream->stream->lock);
        if (!vstream->stream->is_metadata) {
                ERR("Invalid metadata stream");
@@ -2240,11 +2177,7 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        if (vstream->metadata_sent >= vstream->stream->metadata_received) {
                /*
-                * The live viewers expect to receive a NO_NEW_METADATA
-                * status before a stream disappears, otherwise they abort the
-                * entire live connection when receiving an error status.
-                *
-                * Clear feature resets the metadata_sent to 0 until the
+                * Clear feature resets the metadata_received to 0 until the
                 * same metadata is received again.
                 */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
@@ -2252,33 +2185,18 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * The live viewer considers a closed 0 byte metadata stream as
                 * an error.
                 */
-               if (vstream->metadata_sent > 0) {
-                       if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
-                               /*
-                                * Release ownership for the viewer metadata
-                                * stream. Note that this reference is the
-                                * viewer's reference. The vstream still exists
-                                * until the end of the function as
-                                * viewer_stream_get_by_id() took a reference.
-                                */
-                               viewer_stream_put(vstream);
-                       }
-
-                       vstream->stream->no_new_metadata_notified = true;
-               }
+               dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed;
                goto send_reply;
        }
 
        if (vstream->stream->trace_chunk &&
-                       !lttng_trace_chunk_ids_equal(
-                               conn->viewer_session->current_trace_chunk,
-                               vstream->stream->trace_chunk)) {
+           !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                        vstream->stream->trace_chunk)) {
                /* A rotation has occurred on the relay stream. */
                DBG("Metadata relay stream and viewer chunk ids differ");
 
-               ret = viewer_session_set_trace_chunk_copy(
-                               conn->viewer_session,
-                               vstream->stream->trace_chunk);
+               ret = viewer_session_set_trace_chunk_copy(conn->viewer_session,
+                                                         vstream->stream->trace_chunk);
                if (ret) {
                        reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                        goto send_reply;
@@ -2286,30 +2204,42 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        if (conn->viewer_session->current_trace_chunk &&
-                       !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
-                                       vstream->stream_file.trace_chunk)) {
+           !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                        vstream->stream_file.trace_chunk)) {
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
-                               "vsession chunk %p vstream chunk %p",
-                               conn->viewer_session->current_trace_chunk,
-                               vstream->stream_file.trace_chunk);
+                   "vsession chunk %p vstream chunk %p",
+                   conn->viewer_session->current_trace_chunk,
+                   vstream->stream_file.trace_chunk);
                lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
-               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+               acquired_reference =
+                       lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
                LTTNG_ASSERT(acquired_reference);
-               vstream->stream_file.trace_chunk =
-                       conn->viewer_session->current_trace_chunk;
+               vstream->stream_file.trace_chunk = conn->viewer_session->current_trace_chunk;
                viewer_stream_close_files(vstream);
        }
 
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
        if (!vstream->stream_file.trace_chunk) {
+               if (vstream->stream->trace->session->connection_closed) {
+                       /*
+                        * If the connection is closed, there is no way for the metadata stream
+                        * to ever transition back to an active chunk. As such, signal to the viewer
+                        * that there is no new metadata available.
+                        *
+                        * The stream can be disposed-of. On the next execution of this command,
+                        * the relay daemon will reply with an error status since the stream can't
+                        * be found.
+                        */
+                       dispose_of_stream = true;
+               }
+
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                len = 0;
                goto send_reply;
-       } else if (vstream->stream_file.trace_chunk &&
-                       !vstream->stream_file.handle && len > 0) {
+       } else if (vstream->stream_file.trace_chunk && !vstream->stream_file.handle && len > 0) {
                /*
                 * Either this is the first time the metadata file is read, or a
                 * rotation of the corresponding relay stream has occurred.
@@ -2320,9 +2250,12 @@ int viewer_get_metadata(struct relay_connection *conn)
                struct relay_stream *rstream = vstream->stream;
 
                ret = utils_stream_file_path(rstream->path_name,
-                               rstream->channel_name, rstream->tracefile_size,
-                               vstream->current_tracefile_id, NULL, file_path,
-                               sizeof(file_path));
+                                            rstream->channel_name,
+                                            rstream->tracefile_size,
+                                            vstream->current_tracefile_id,
+                                            nullptr,
+                                            file_path,
+                                            sizeof(file_path));
                if (ret < 0) {
                        goto error;
                }
@@ -2333,8 +2266,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * per-pid buffers) and a clear command has been performed.
                 */
                status = lttng_trace_chunk_open_fs_handle(
-                               vstream->stream_file.trace_chunk,
-                               file_path, O_RDONLY, 0, &fs_handle, true);
+                       vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
                                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
@@ -2365,12 +2297,12 @@ int viewer_get_metadata(struct relay_connection *conn)
                         * safe to assume that
                         * `metadata_received` > `metadata_sent`.
                         */
-                       const off_t seek_ret = fs_handle_seek(fs_handle,
-                                       vstream->metadata_sent, SEEK_SET);
+                       const off_t seek_ret =
+                               fs_handle_seek(fs_handle, vstream->metadata_sent, SEEK_SET);
 
                        if (seek_ret < 0) {
                                PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
-                                               vstream->metadata_sent);
+                                      vstream->metadata_sent);
                                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                                goto send_reply;
                        }
@@ -2378,7 +2310,7 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        reply.len = htobe64(len);
-       data = (char *) zmalloc(len);
+       data = zmalloc<char>(len);
        if (!data) {
                PERROR("viewer metadata zmalloc");
                goto error;
@@ -2407,12 +2339,12 @@ int viewer_get_metadata(struct relay_connection *conn)
                         * attempt to parse an incomplete (incoherent) metadata
                         * stream, which would result in an error.
                         */
-                       const off_t seek_ret = fs_handle_seek(
-                                       vstream->stream_file.handle, -read_len,
-                                       SEEK_CUR);
+                       const off_t seek_ret =
+                               fs_handle_seek(vstream->stream_file.handle, -read_len, SEEK_CUR);
 
                        DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd",
-                                       len, read_len);
+                           len,
+                           read_len);
                        read_len = 0;
                        len = 0;
                        if (seek_ret < 0) {
@@ -2434,6 +2366,8 @@ send_reply:
        health_code_update();
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->session->lock);
        }
        ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
@@ -2448,8 +2382,9 @@ send_reply:
                }
        }
 
-       DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
-                       (uint64_t) be64toh(request.stream_id));
+       DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64,
+           len,
+           (uint64_t) be64toh(request.stream_id));
 
        DBG("Metadata sent");
 
@@ -2458,7 +2393,22 @@ end_free:
 end:
        if (vstream) {
                viewer_stream_put(vstream);
+               if (dispose_of_stream) {
+                       /*
+                        * Trigger the destruction of the viewer stream
+                        * by releasing its global reference.
+                        *
+                        * The live viewers expect to receive a NO_NEW_METADATA
+                        * status before a stream disappears, otherwise they abort the
+                        * entire live connection when receiving an error status.
+                        *
+                        * On the next query for this stream, an error will be reported to the
+                        * client.
+                        */
+                       viewer_stream_put(vstream);
+               }
        }
+
        return ret;
 }
 
@@ -2467,8 +2417,7 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static
-int viewer_create_session(struct relay_connection *conn)
+static int viewer_create_session(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_create_session_response resp;
@@ -2500,13 +2449,12 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static
-int viewer_detach_session(struct relay_connection *conn)
+static int viewer_detach_session(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_detach_session_response response;
        struct lttng_viewer_detach_session_request request;
-       struct relay_session *session = NULL;
+       struct relay_session *session = nullptr;
        uint64_t viewer_session_to_close;
 
        LTTNG_ASSERT(conn);
@@ -2533,8 +2481,7 @@ int viewer_detach_session(struct relay_connection *conn)
 
        session = session_get_by_id(be64toh(request.session_id));
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found",
-                               (uint64_t) be64toh(request.session_id));
+               DBG("Relay session %" PRIu64 " not found", (uint64_t) be64toh(request.session_id));
                response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
                goto send_reply;
        }
@@ -2569,8 +2516,7 @@ end:
 /*
  * live_relay_unknown_command: send -1 if received unknown command
  */
-static
-void live_relay_unknown_command(struct relay_connection *conn)
+static void live_relay_unknown_command(struct relay_connection *conn)
 {
        struct lttcomm_relayd_generic_reply reply;
 
@@ -2582,13 +2528,10 @@ void live_relay_unknown_command(struct relay_connection *conn)
 /*
  * Process the commands received on the control socket
  */
-static
-int process_control(struct lttng_viewer_cmd *recv_hdr,
-               struct relay_connection *conn)
+static int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn)
 {
        int ret = 0;
-       lttng_viewer_command cmd =
-                       (lttng_viewer_command) be32toh(recv_hdr->cmd);
+       lttng_viewer_command cmd = (lttng_viewer_command) be32toh(recv_hdr->cmd);
 
        /*
         * Make sure we've done the version check before any command other then
@@ -2596,13 +2539,15 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
         */
        if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
                ERR("Viewer on connection %d requested %s command before version check",
-                       conn->sock->fd, lttng_viewer_command_str(cmd));
+                   conn->sock->fd,
+                   lttng_viewer_command_str(cmd));
                ret = -1;
                goto end;
        }
 
        DBG("Processing %s viewer command from connection %d",
-                       lttng_viewer_command_str(cmd), conn->sock->fd);
+           lttng_viewer_command_str(cmd),
+           conn->sock->fd);
 
        switch (cmd) {
        case LTTNG_VIEWER_CONNECT:
@@ -2633,8 +2578,7 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
                ret = viewer_detach_session(conn);
                break;
        default:
-               ERR("Received unknown viewer command (%u)",
-                               be32toh(recv_hdr->cmd));
+               ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
                live_relay_unknown_command(conn);
                ret = -1;
                goto end;
@@ -2644,15 +2588,14 @@ end:
        return ret;
 }
 
-static
-void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 {
        int ret;
 
        (void) lttng_poll_del(events, pollfd);
 
-       ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
-                       fd_tracker_util_close_fd, NULL);
+       ret = fd_tracker_close_unsuspendable_fd(
+               the_fd_tracker, &pollfd, 1, fd_tracker_util_close_fd, nullptr);
        if (ret < 0) {
                ERR("Closing pollfd %d", pollfd);
        }
@@ -2661,8 +2604,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 /*
  * This thread does the actual work
  */
-static
-void *thread_worker(void *data __attribute__((unused)))
+static void *thread_worker(void *data __attribute__((unused)))
 {
        int ret, err = -1;
        uint32_t nb_fd;
@@ -2688,8 +2630,7 @@ void *thread_worker(void *data __attribute__((unused)))
                goto viewer_connections_ht_error;
        }
 
-       ret = create_named_thread_poll_set(&events, 2,
-                       "Live viewer worker thread epoll");
+       ret = create_named_thread_poll_set(&events, 2, "Live viewer worker thread epoll");
        if (ret < 0) {
                goto error_poll_create;
        }
@@ -2700,7 +2641,7 @@ void *thread_worker(void *data __attribute__((unused)))
        }
 
 restart:
-       while (1) {
+       while (true) {
                int i;
 
                health_code_update();
@@ -2729,14 +2670,14 @@ restart:
                 */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -2747,13 +2688,14 @@ restart:
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(live_conn_pipe[0],
-                                                       &conn, sizeof(conn));
+                                                        &conn,
+                                                        sizeof(conn)); /* NOLINT sizeof used on a
+                                                                          pointer. */
                                        if (ret < 0) {
                                                goto error;
                                        }
-                                       ret = lttng_poll_add(&events,
-                                                       conn->sock->fd,
-                                                       LPOLLIN | LPOLLRDHUP);
+                                       ret = lttng_poll_add(
+                                               &events, conn->sock->fd, LPOLLIN | LPOLLRDHUP);
                                        if (ret) {
                                                ERR("Failed to add new live connection file descriptor to poll set");
                                                goto error;
@@ -2764,7 +2706,9 @@ restart:
                                        ERR("Relay live pipe error");
                                        goto error;
                                } else {
-                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       ERR("Unexpected poll events %u for sock %d",
+                                           revents,
+                                           pollfd);
                                        goto error;
                                }
                        } else {
@@ -2777,8 +2721,8 @@ restart:
                                }
 
                                if (revents & LPOLLIN) {
-                                       ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
-                                                       sizeof(recv_hdr), 0);
+                                       ret = conn->sock->ops->recvmsg(
+                                               conn->sock, &recv_hdr, sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
                                                /* Connection closed. */
                                                cleanup_connection_pollfd(&events, pollfd);
@@ -2792,7 +2736,8 @@ restart:
                                                        cleanup_connection_pollfd(&events, pollfd);
                                                        /* Put "create" ownership reference. */
                                                        connection_put(conn);
-                                                       DBG("Viewer connection closed with %d", pollfd);
+                                                       DBG("Viewer connection closed with %d",
+                                                           pollfd);
                                                }
                                        }
                                } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
@@ -2800,7 +2745,9 @@ restart:
                                        /* Put "create" ownership reference. */
                                        connection_put(conn);
                                } else {
-                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       ERR("Unexpected poll events %u for sock %d",
+                                           revents,
+                                           pollfd);
                                        connection_put(conn);
                                        goto error;
                                }
@@ -2815,14 +2762,15 @@ error:
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter,
-                       destroy_conn,
-                       sock_n.node) {
-               health_code_update();
-               connection_put(destroy_conn);
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (
+                       viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+                       health_code_update();
+                       connection_put(destroy_conn);
+               }
        }
-       rcu_read_unlock();
 error_poll_create:
        lttng_ht_destroy(viewer_connections_ht);
 viewer_connections_ht_error:
@@ -2842,20 +2790,20 @@ error_testpoint:
                ERR("Error stopping threads");
        }
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 /*
  * Create the relay command pipe to wake thread_manage_apps.
  * Closed in cleanup().
  */
-static int create_conn_pipe(void)
+static int create_conn_pipe()
 {
-       return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
-                       "Live connection pipe", live_conn_pipe);
+       return fd_tracker_util_pipe_open_cloexec(
+               the_fd_tracker, "Live connection pipe", live_conn_pipe);
 }
 
-int relayd_live_join(void)
+int relayd_live_join()
 {
        int ret, retval = 0;
        void *status;
@@ -2928,8 +2876,10 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the dispatcher thread */
-       ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
-                       thread_dispatcher, (void *) NULL);
+       ret = pthread_create(&live_dispatcher_thread,
+                            default_pthread_attr(),
+                            thread_dispatcher,
+                            (void *) nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer dispatcher");
@@ -2938,8 +2888,7 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the worker thread */
-       ret = pthread_create(&live_worker_thread, default_pthread_attr(),
-                       thread_worker, NULL);
+       ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer worker");
@@ -2948,8 +2897,8 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the listener thread */
-       ret = pthread_create(&live_listener_thread, default_pthread_attr(),
-                       thread_listener, (void *) NULL);
+       ret = pthread_create(
+               &live_listener_thread, default_pthread_attr(), thread_listener, (void *) nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_create viewer listener");
This page took 0.095539 seconds and 4 git commands to generate.