Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 786feaae83651acf86f3115b0cb0f3530552cf2a..079ac6b0b2a6e4d2e5a5ddb55c6244eb1c3d6c01 100644 (file)
 #include <common/fs-handle.hpp>
 #include <common/futex.hpp>
 #include <common/index/index.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/sessiond-comm/inet.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 #include <common/utils.hpp>
 
@@ -236,29 +239,77 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
  */
 static int check_new_streams(struct relay_connection *conn)
 {
-       struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
        if (!conn->viewer_session) {
                goto end;
        }
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(
-               session, &conn->viewer_session->session_list, viewer_session_node)
-       {
+
+       for (auto *session :
+            lttng::urcu::rcu_list_iteration_adapter<relay_session,
+                                                    &relay_session::viewer_session_node>(
+                    conn->viewer_session->session_list)) {
                if (!session_get(session)) {
                        continue;
                }
-               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-               ret = current_val;
+
+               ret = uatomic_read(&session->new_streams);
                session_put(session);
                if (ret == 1) {
                        goto end;
                }
        }
+
+end:
+       DBG("Viewer connection has%s new streams: socket_fd = %d",
+           ret == 0 ? " no" : "",
+           conn->sock->fd);
+       return ret;
+}
+
+/*
+ * Sends one viewer stream to the given socket.
+ *
+ * This function needs to be called with the stream locked.
+ *
+ * Return 0 on success, or else a negative value.
+ */
+static ssize_t send_one_viewer_stream(struct lttcomm_sock *sock,
+                                     struct relay_viewer_stream *vstream)
+{
+       struct ctf_trace *ctf_trace;
+       struct lttng_viewer_stream send_stream = {};
+       ssize_t ret = -1;
+
+       ASSERT_LOCKED(vstream->stream->lock);
+
+       ctf_trace = vstream->stream->trace;
+       send_stream.id = htobe64(vstream->stream->stream_handle);
+       send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+       send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+       if (lttng_strncpy(
+                   send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name))) {
+               ret = -1; /* Error. */
+               goto end;
+       }
+       if (lttng_strncpy(send_stream.channel_name,
+                         vstream->channel_name,
+                         sizeof(send_stream.channel_name))) {
+               ret = -1; /* Error. */
+               goto end;
+       }
+
+       DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+       vstream->sent_flag = true;
+
+       ret = send_response(sock, &send_stream, sizeof(send_stream));
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = 0;
+
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -268,19 +319,18 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static ssize_t
-send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
+static ssize_t send_viewer_streams(struct lttcomm_sock *sock,
+                                  uint64_t session_id,
+                                  unsigned int ignore_sent_flag,
+                                  struct relay_viewer_session *viewer_session)
 {
        ssize_t ret;
-       struct lttng_ht_iter iter;
-       struct relay_viewer_stream *vstream;
-
-       rcu_read_lock();
-
-       cds_lfht_for_each_entry (viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
-               struct ctf_trace *ctf_trace;
-               struct lttng_viewer_stream send_stream = {};
 
+       for (auto *vstream :
+            lttng::urcu::lfht_iteration_adapter<relay_viewer_stream,
+                                                decltype(relay_viewer_stream::stream_n),
+                                                &relay_viewer_stream::stream_n>(
+                    *viewer_streams_ht->ht)) {
                health_code_update();
 
                if (!viewer_stream_get(vstream)) {
@@ -296,42 +346,56 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
                        continue;
                }
 
-               ctf_trace = vstream->stream->trace;
-               send_stream.id = htobe64(vstream->stream->stream_handle);
-               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-               send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
-               if (lttng_strncpy(send_stream.path_name,
-                                 vstream->path_name,
-                                 sizeof(send_stream.path_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
+               ret = send_one_viewer_stream(sock, vstream);
+               pthread_mutex_unlock(&vstream->stream->lock);
+               if (ret < 0) {
                        viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
-                       goto end_unlock;
+                       goto end;
                }
-               if (lttng_strncpy(send_stream.channel_name,
-                                 vstream->channel_name,
-                                 sizeof(send_stream.channel_name))) {
+
+               pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+               cds_list_del_rcu(&vstream->viewer_stream_node);
+               pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+               viewer_stream_put(vstream);
+       }
+
+       /*
+        * Any remaining streams that have been seen, but are perhaps unpublished
+        * due to a session being destroyed in between attach and get_new_streams.
+        */
+       for (auto *vstream :
+            lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+                                                    &relay_viewer_stream::viewer_stream_node>(
+                    viewer_session->unannounced_stream_list)) {
+               health_code_update();
+               if (!viewer_stream_get(vstream)) {
+                       continue;
+               }
+
+               pthread_mutex_lock(&vstream->stream->lock);
+               if (vstream->stream->trace->session->id != session_id) {
                        pthread_mutex_unlock(&vstream->stream->lock);
                        viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
-                       goto end_unlock;
+                       continue;
                }
 
-               DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
-               vstream->sent_flag = true;
+               ret = send_one_viewer_stream(sock, vstream);
                pthread_mutex_unlock(&vstream->stream->lock);
-
-               ret = send_response(sock, &send_stream, sizeof(send_stream));
-               viewer_stream_put(vstream);
                if (ret < 0) {
-                       goto end_unlock;
+                       viewer_stream_put(vstream);
+                       goto end;
                }
+
+               pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+               cds_list_del_rcu(&vstream->viewer_stream_node);
+               viewer_stream_put(vstream);
+               pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+               viewer_stream_put(vstream);
        }
 
        ret = 0;
 
-end_unlock:
-       rcu_read_unlock();
+end:
        return ret;
 }
 
@@ -346,18 +410,15 @@ end_unlock:
  *
  * Return 0 on success or else a negative value.
  */
-static int make_viewer_streams(struct relay_session *relay_session,
-                              struct relay_viewer_session *viewer_session,
-                              enum lttng_viewer_seek seek_t,
-                              uint32_t *nb_total,
-                              uint32_t *nb_unsent,
-                              uint32_t *nb_created,
-                              bool *closed)
+int make_viewer_streams(struct relay_session *relay_session,
+                       struct relay_viewer_session *viewer_session,
+                       enum lttng_viewer_seek seek_t,
+                       unsigned int *nb_total,
+                       unsigned int *nb_unsent,
+                       unsigned int *nb_created,
+                       bool *closed)
 {
        int ret;
-       struct lttng_ht_iter iter;
-       struct ctf_trace *ctf_trace;
-       struct relay_stream *relay_stream = nullptr;
 
        LTTNG_ASSERT(relay_session);
        ASSERT_LOCKED(relay_session->lock);
@@ -366,32 +427,85 @@ static int make_viewer_streams(struct relay_session *relay_session,
                *closed = true;
        }
 
+       /*
+        * Check unannounced viewer streams for any that have been seen but are no longer published.
+        */
+       for (auto *viewer_stream :
+            lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+                                                    &relay_viewer_stream::viewer_stream_node>(
+                    viewer_session->unannounced_stream_list)) {
+               if (!viewer_stream_get(viewer_stream)) {
+                       DBG("Couldn't get reference for viewer_stream");
+                       continue;
+               }
+
+               if (viewer_stream->sent_flag) {
+                       ERR("logic error -> viewer stream %ld is in unannounced_stream_list is marked as sent",
+                           viewer_stream->stream->stream_handle);
+                       abort();
+               }
+
+               if (viewer_stream->stream->published) {
+                       /*
+                        * This stream should be handled later when iterating via the
+                        * ctf_traces
+                        */
+                       viewer_stream_put(viewer_stream);
+                       continue;
+               }
+
+               if (viewer_stream->stream->trace->session->id != relay_session->id) {
+                       viewer_stream_put(viewer_stream);
+                       continue;
+               }
+
+               if (nb_unsent) {
+                       (*nb_unsent)++;
+               }
+
+               if (nb_total) {
+                       (*nb_total)++;
+               }
+
+               viewer_stream_put(viewer_stream);
+       }
+
        /*
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (
-               relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) {
+       for (auto *raw_ctf_trace : lttng::urcu::
+                    lfht_iteration_adapter<ctf_trace, decltype(ctf_trace::node), &ctf_trace::node>(
+                            *relay_session->ctf_traces_ht->ht)) {
                bool trace_has_metadata_stream = false;
 
                health_code_update();
 
-               if (!ctf_trace_get(ctf_trace)) {
+               if (!ctf_trace_get(raw_ctf_trace)) {
                        continue;
                }
 
+               auto ctf_trace =
+                       lttng::make_unique_wrapper<struct ctf_trace, ctf_trace_put>(raw_ctf_trace);
+               /*
+                * The trace metadata state may be updated while iterating over all the
+                * relay streams associated with the trace, so the lock is required.
+                */
+               const lttng::pthread::lock_guard ctf_trace_lock(ctf_trace->lock);
+
                /*
                 * Iterate over all the streams of the trace to see if we have a
                 * metadata stream.
                 */
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
+               for (auto *stream :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+                                                            &relay_stream::stream_node>(
+                            ctf_trace->stream_list)) {
                        bool is_metadata_stream;
 
-                       pthread_mutex_lock(&relay_stream->lock);
-                       is_metadata_stream = relay_stream->is_metadata;
-                       pthread_mutex_unlock(&relay_stream->lock);
+                       pthread_mutex_lock(&stream->lock);
+                       is_metadata_stream = stream->is_metadata;
+                       pthread_mutex_unlock(&stream->lock);
 
                        if (is_metadata_stream) {
                                trace_has_metadata_stream = true;
@@ -399,34 +513,37 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        }
                }
 
-               relay_stream = nullptr;
-
                /*
                 * If there is no metadata stream in this trace at the moment
                 * and we never sent one to the viewer, skip the trace. We
                 * accept that the viewer will not see this trace at all.
                 */
                if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) {
-                       ctf_trace_put(ctf_trace);
                        continue;
                }
 
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
+               for (auto *raw_stream :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+                                                            &relay_stream::stream_node>(
+                            ctf_trace->stream_list)) {
                        struct relay_viewer_stream *viewer_stream;
 
-                       if (!stream_get(relay_stream)) {
+                       if (!stream_get(raw_stream)) {
                                continue;
                        }
 
-                       pthread_mutex_lock(&relay_stream->lock);
+                       auto stream =
+                               lttng::make_unique_wrapper<relay_stream, stream_put>(raw_stream);
+                       raw_stream = nullptr;
+
+                       const lttng::pthread::lock_guard stream_lock(stream->lock);
                        /*
                         * stream published is protected by the session lock.
                         */
-                       if (!relay_stream->published) {
-                               goto next;
+                       if (!stream->published) {
+                               continue;
                        }
-                       viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle);
+                       viewer_stream = viewer_stream_get_by_id(stream->stream_handle);
                        if (!viewer_stream) {
                                struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr;
 
@@ -435,7 +552,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * viewer. So that we know what trace the viewer
                                 * is aware of.
                                 */
-                               if (relay_stream->is_metadata) {
+                               if (stream->is_metadata) {
                                        ctf_trace->metadata_stream_sent_to_viewer = true;
                                }
 
@@ -447,34 +564,32 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * Otherwise, the viewer session's current trace
                                 * chunk can be used safely.
                                 */
-                               if ((relay_stream->ongoing_rotation.is_set ||
+                               if ((stream->ongoing_rotation.is_set ||
                                     session_has_ongoing_rotation(relay_session)) &&
-                                   relay_stream->trace_chunk) {
+                                   stream->trace_chunk) {
                                        viewer_stream_trace_chunk =
-                                               lttng_trace_chunk_copy(relay_stream->trace_chunk);
+                                               lttng_trace_chunk_copy(stream->trace_chunk);
                                        if (!viewer_stream_trace_chunk) {
                                                ret = -1;
-                                               ctf_trace_put(ctf_trace);
-                                               goto error_unlock;
+                                               goto end;
                                        }
                                } else {
                                        /*
-                                        * Transition the viewer session into the newest trace chunk
-                                        * available.
+                                        * Transition the viewer session into the newest
+                                        * trace chunk available.
                                         */
                                        if (!lttng_trace_chunk_ids_equal(
                                                    viewer_session->current_trace_chunk,
-                                                   relay_stream->trace_chunk)) {
+                                                   stream->trace_chunk)) {
                                                ret = viewer_session_set_trace_chunk_copy(
-                                                       viewer_session, relay_stream->trace_chunk);
+                                                       viewer_session, stream->trace_chunk);
                                                if (ret) {
                                                        ret = -1;
-                                                       ctf_trace_put(ctf_trace);
-                                                       goto error_unlock;
+                                                       goto end;
                                                }
                                        }
 
-                                       if (relay_stream->trace_chunk) {
+                                       if (stream->trace_chunk) {
                                                /*
                                                 * If the corresponding relay
                                                 * stream's trace chunk is set,
@@ -498,13 +613,29 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                }
 
                                viewer_stream = viewer_stream_create(
-                                       relay_stream, viewer_stream_trace_chunk, seek_t);
+                                       stream.get(), viewer_stream_trace_chunk, seek_t);
                                lttng_trace_chunk_put(viewer_stream_trace_chunk);
                                viewer_stream_trace_chunk = nullptr;
                                if (!viewer_stream) {
                                        ret = -1;
-                                       ctf_trace_put(ctf_trace);
-                                       goto error_unlock;
+                                       goto end;
+                               }
+
+                               /*
+                                * Add the new stream to the list of streams to publish for
+                                * this session.
+                                */
+                               pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+                               cds_list_add_rcu(&viewer_stream->viewer_stream_node,
+                                                &viewer_session->unannounced_stream_list);
+                               pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+                               /*
+                                * Get for the unannounced stream list, this should be
+                                * put when the unannounced stream is sent.
+                                */
+                               if (!viewer_stream_get(viewer_stream)) {
+                                       ERR("Unable to get self-reference on viewer stream");
+                                       abort();
                                }
 
                                if (nb_created) {
@@ -527,40 +658,28 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               if (relay_stream->is_metadata) {
-                                       if (!relay_stream->closed ||
-                                           relay_stream->metadata_received >
+                               if (stream->is_metadata) {
+                                       if (!stream->closed ||
+                                           stream->metadata_received >
                                                    viewer_stream->metadata_sent) {
                                                (*nb_total)++;
                                        }
                                } else {
-                                       if (!relay_stream->closed ||
-                                           !(((int64_t) (relay_stream->prev_data_seq -
-                                                         relay_stream->last_net_seq_num)) >= 0)) {
+                                       if (!stream->closed ||
+                                           !(((int64_t) (stream->prev_data_seq -
+                                                         stream->last_net_seq_num)) >= 0)) {
                                                (*nb_total)++;
                                        }
                                }
                        }
                        /* Put local reference. */
                        viewer_stream_put(viewer_stream);
-               next:
-                       pthread_mutex_unlock(&relay_stream->lock);
-                       stream_put(relay_stream);
                }
-               relay_stream = nullptr;
-               ctf_trace_put(ctf_trace);
        }
 
        ret = 0;
 
-error_unlock:
-       rcu_read_unlock();
-
-       if (relay_stream) {
-               pthread_mutex_unlock(&relay_stream->lock);
-               stream_put(relay_stream);
-       }
-
+end:
        return ret;
 }
 
@@ -1031,8 +1150,6 @@ static int viewer_list_sessions(struct relay_connection *conn)
 {
        int ret = 0;
        struct lttng_viewer_list_sessions session_list;
-       struct lttng_ht_iter iter;
-       struct relay_session *session;
        struct lttng_viewer_session *send_session_buf = nullptr;
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
@@ -1042,8 +1159,10 @@ static int viewer_list_sessions(struct relay_connection *conn)
                return -1;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) {
+       for (auto *session :
+            lttng::urcu::lfht_iteration_adapter<relay_session,
+                                                decltype(relay_session::session_n),
+                                                &relay_session::session_n>(*sessions_ht->ht)) {
                struct lttng_viewer_session *send_session;
 
                health_code_update();
@@ -1056,7 +1175,7 @@ static int viewer_list_sessions(struct relay_connection *conn)
 
                if (count >= buf_count) {
                        struct lttng_viewer_session *newbuf;
-                       uint32_t new_buf_count = buf_count << 1;
+                       const uint32_t new_buf_count = buf_count << 1;
 
                        newbuf = (lttng_viewer_session *) realloc(
                                send_session_buf, new_buf_count * sizeof(*send_session_buf));
@@ -1096,7 +1215,7 @@ static int viewer_list_sessions(struct relay_connection *conn)
                pthread_mutex_unlock(&session->lock);
                break;
        }
-       rcu_read_unlock();
+
        if (ret < 0) {
                goto end_free;
        }
@@ -1130,7 +1249,7 @@ end_free:
 static int viewer_get_new_streams(struct relay_connection *conn)
 {
        int ret, send_streams = 0;
-       uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
+       unsigned int nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session = nullptr;
@@ -1199,11 +1318,13 @@ static int viewer_get_new_streams(struct relay_connection *conn)
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
        /* Only send back the newly created streams with the unsent ones. */
-       nb_streams = nb_created + nb_unsent;
+       nb_streams = nb_unsent + nb_created;
        response.streams_count = htobe32(nb_streams);
 
        /*
@@ -1241,7 +1362,7 @@ send_reply:
         * streams that were not sent from that point will be sent to
         * the viewer.
         */
-       ret = send_viewer_streams(conn->sock, session_id, 0);
+       ret = send_viewer_streams(conn->sock, session_id, 0, conn->viewer_session);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1261,7 +1382,7 @@ static int viewer_attach_session(struct relay_connection *conn)
 {
        int send_streams = 0;
        ssize_t ret;
-       uint32_t nb_streams = 0;
+       unsigned int nb_streams = 0;
        enum lttng_viewer_seek seek_type;
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
@@ -1394,7 +1515,7 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(conn->sock, session_id, 1);
+       ret = send_viewer_streams(conn->sock, session_id, 1, conn->viewer_session);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1642,6 +1763,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
        bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
        uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
        enum lttng_trace_chunk_status status;
+       bool attached_sessions_have_new_streams = false;
 
        LTTNG_ASSERT(conn);
 
@@ -1693,6 +1815,17 @@ static int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
+       }
+
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
@@ -1778,6 +1911,22 @@ static int viewer_get_next_index(struct relay_connection *conn)
                    conn->viewer_session->current_trace_chunk ?
                            std::to_string(viewer_session_chunk_id).c_str() :
                            "None");
+       } else if (vstream->stream_file.trace_chunk &&
+                  rstream->completed_rotation_count == vstream->last_seen_rotation_count &&
+                  !rstream->trace_chunk) {
+               /*
+                * When a relay stream is closed, there is a window before the rotation of the
+                * streams happens, during which the next index may be fetched. If the seen
+                * rotations are the same and the relay stream trace chunk is null, don't rotate.
+                * When the close finishes, the rotation count on the relay stream will go up.
+                */
+               DBG("Transition to latest chunk check (%s -> %s): relay stream chunk is null, but viewer stream knows a chunk and isn't yet behind a rotation",
+                   vstream->stream_file.trace_chunk ?
+                           std::to_string(stream_file_chunk_id).c_str() :
+                           "None",
+                   conn->viewer_session->current_trace_chunk ?
+                           std::to_string(viewer_session_chunk_id).c_str() :
+                           "None");
        } else {
                DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
                    vstream->stream_file.trace_chunk ?
@@ -1802,6 +1951,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        LTTNG_ASSERT(!ret);
 
@@ -1811,7 +1961,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                if (rstream->closed) {
                        viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
                        DBG("Cannot open index for stream id %" PRIu64
-                           "stream is closed, returning status=%s",
+                           " stream is closed, returning status=%s",
                            (uint64_t) be64toh(request_index.stream_id),
                            lttng_viewer_next_index_return_code_str(
                                    (enum lttng_viewer_next_index_return_code) viewer_index.status));
@@ -1880,19 +2030,6 @@ static int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Error checking for new streams before sending new index to stream id %" PRIu64
-                   ", returning status=%s",
-                   (uint64_t) be64toh(request_index.stream_id),
-                   lttng_viewer_next_index_return_code_str(
-                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
@@ -1943,6 +2080,10 @@ send_reply:
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        viewer_index.status = htobe32(viewer_index.status);
        health_code_update();
@@ -2109,6 +2250,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
        struct lttng_viewer_get_metadata request;
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *vstream = nullptr;
+       bool dispose_of_stream = false;
 
        LTTNG_ASSERT(conn);
 
@@ -2137,6 +2279,9 @@ static int viewer_get_metadata(struct relay_connection *conn)
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
+
+       pthread_mutex_lock(&vstream->stream->trace->session->lock);
+       pthread_mutex_lock(&vstream->stream->trace->lock);
        pthread_mutex_lock(&vstream->stream->lock);
        if (!vstream->stream->is_metadata) {
                ERR("Invalid metadata stream");
@@ -2145,11 +2290,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
 
        if (vstream->metadata_sent >= vstream->stream->metadata_received) {
                /*
-                * The live viewers expect to receive a NO_NEW_METADATA
-                * status before a stream disappears, otherwise they abort the
-                * entire live connection when receiving an error status.
-                *
-                * Clear feature resets the metadata_sent to 0 until the
+                * Clear feature resets the metadata_received to 0 until the
                 * same metadata is received again.
                 */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
@@ -2157,20 +2298,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
                 * The live viewer considers a closed 0 byte metadata stream as
                 * an error.
                 */
-               if (vstream->metadata_sent > 0) {
-                       if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
-                               /*
-                                * Release ownership for the viewer metadata
-                                * stream. Note that this reference is the
-                                * viewer's reference. The vstream still exists
-                                * until the end of the function as
-                                * viewer_stream_get_by_id() took a reference.
-                                */
-                               viewer_stream_put(vstream);
-                       }
-
-                       vstream->stream->no_new_metadata_notified = true;
-               }
+               dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed;
                goto send_reply;
        }
 
@@ -2194,7 +2322,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
-                   "vsession chunk %p vstream chunk %p",
+                   "vsession chunk %p vstream chunk=%p",
                    conn->viewer_session->current_trace_chunk,
                    vstream->stream_file.trace_chunk);
                lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
@@ -2208,6 +2336,19 @@ static int viewer_get_metadata(struct relay_connection *conn)
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
        if (!vstream->stream_file.trace_chunk) {
+               if (vstream->stream->trace->session->connection_closed) {
+                       /*
+                        * If the connection is closed, there is no way for the metadata stream
+                        * to ever transition back to an active chunk. As such, signal to the viewer
+                        * that there is no new metadata available.
+                        *
+                        * The stream can be disposed-of. On the next execution of this command,
+                        * the relay daemon will reply with an error status since the stream can't
+                        * be found.
+                        */
+                       dispose_of_stream = true;
+               }
+
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                len = 0;
                goto send_reply;
@@ -2338,6 +2479,8 @@ send_reply:
        health_code_update();
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->session->lock);
        }
        ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
@@ -2363,7 +2506,22 @@ end_free:
 end:
        if (vstream) {
                viewer_stream_put(vstream);
+               if (dispose_of_stream) {
+                       /*
+                        * Trigger the destruction of the viewer stream
+                        * by releasing its global reference.
+                        *
+                        * The live viewers expect to receive a NO_NEW_METADATA
+                        * status before a stream disappears, otherwise they abort the
+                        * entire live connection when receiving an error status.
+                        *
+                        * On the next query for this stream, an error will be reported to the
+                        * client.
+                        */
+                       viewer_stream_put(vstream);
+               }
        }
+
        return ret;
 }
 
@@ -2486,7 +2644,7 @@ static void live_relay_unknown_command(struct relay_connection *conn)
 static int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn)
 {
        int ret = 0;
-       lttng_viewer_command cmd = (lttng_viewer_command) be32toh(recv_hdr->cmd);
+       const lttng_viewer_command cmd = (lttng_viewer_command) be32toh(recv_hdr->cmd);
 
        /*
         * Make sure we've done the version check before any command other then
@@ -2565,9 +2723,7 @@ static void *thread_worker(void *data __attribute__((unused)))
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *viewer_connections_ht;
-       struct lttng_ht_iter iter;
        struct lttng_viewer_cmd recv_hdr;
-       struct relay_connection *destroy_conn;
 
        DBG("[thread] Live viewer relay worker started");
 
@@ -2717,12 +2873,15 @@ error:
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+       for (auto *destroy_conn :
+            lttng::urcu::lfht_iteration_adapter<relay_connection,
+                                                decltype(relay_connection::sock_n),
+                                                &relay_connection::sock_n>(
+                    *viewer_connections_ht->ht)) {
                health_code_update();
                connection_put(destroy_conn);
        }
-       rcu_read_unlock();
+
 error_poll_create:
        lttng_ht_destroy(viewer_connections_ht);
 viewer_connections_ht_error:
This page took 0.035511 seconds and 4 git commands to generate.