relayd: add `lttcomm_relayd_command_str()`
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 9cdc1c8d751d78b0876d1cd35c88026b72b4c6cf..46faea0310156a53ecb929198c169db342122ad3 100644 (file)
@@ -29,6 +29,7 @@
 #include <urcu/futex.h>
 #include <urcu/rculist.h>
 #include <urcu/uatomic.h>
+#include <string>
 
 #include <common/common.h>
 #include <common/compat/endian.h>
@@ -1602,6 +1603,9 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct relay_stream *rstream = NULL;
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
+       uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
+       enum lttng_trace_chunk_status status;
 
        LTTNG_ASSERT(conn);
 
@@ -1632,6 +1636,13 @@ int viewer_get_next_index(struct relay_connection *conn)
        metadata_viewer_stream =
                        ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
+       /*
+        * Hold the session lock to protect against concurrent changes
+        * to the chunk files (e.g. rename done by clear), which are
+        * protected by the session ongoing rotation state. Those are
+        * synchronized with the session lock.
+        */
+       pthread_mutex_lock(&rstream->trace->session->lock);
        pthread_mutex_lock(&rstream->lock);
 
        /*
@@ -1682,12 +1693,50 @@ int viewer_get_next_index(struct relay_connection *conn)
         * This allows clients to consume all the packets of a trace chunk
         * after a session's destruction.
         */
-       if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) &&
-                       !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
-               DBG("Viewer session and viewer stream chunk IDs differ: "
-                               "vsession chunk %p vstream chunk %p",
+       if (vstream->stream_file.trace_chunk) {
+               status = lttng_trace_chunk_get_id(
+                               vstream->stream_file.trace_chunk,
+                               &stream_file_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+       if (conn->viewer_session->current_trace_chunk) {
+               status = lttng_trace_chunk_get_id(
                                conn->viewer_session->current_trace_chunk,
-                               vstream->stream_file.trace_chunk);
+                               &viewer_session_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+
+       viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
+                       conn->viewer_session->current_trace_chunk,
+                       vstream->stream_file.trace_chunk);
+       viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
+                       vstream->last_seen_rotation_count + 1;
+
+       if (viewer_stream_and_session_in_same_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else {
+               DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+
                viewer_stream_rotate_to_trace_chunk(vstream,
                                conn->viewer_session->current_trace_chunk);
                vstream->last_seen_rotation_count =
@@ -1732,7 +1781,6 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        if (!vstream->stream_file.handle) {
                char file_path[LTTNG_PATH_MAX];
-               enum lttng_trace_chunk_status status;
                struct fs_handle *fs_handle;
 
                ret = utils_stream_file_path(rstream->path_name,
@@ -1798,6 +1846,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 send_reply:
        if (rstream) {
                pthread_mutex_unlock(&rstream->lock);
+               pthread_mutex_unlock(&rstream->trace->session->lock);
        }
 
        if (metadata_viewer_stream) {
@@ -1839,6 +1888,7 @@ end:
 
 error_put:
        pthread_mutex_unlock(&rstream->lock);
+       pthread_mutex_unlock(&rstream->trace->session->lock);
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
        }
This page took 0.024838 seconds and 4 git commands to generate.