Docs: relayd: live: clarify ownership of vstream after viewer release
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index c169be918f0b87891b8bc61785ff6610aa952f1a..d963a69b4c1870d901f5fa6fb9b2ba58efa8a811 100644 (file)
@@ -29,6 +29,7 @@
 #include <urcu/futex.h>
 #include <urcu/rculist.h>
 #include <urcu/uatomic.h>
+#include <string>
 
 #include <common/common.h>
 #include <common/compat/endian.h>
@@ -88,6 +89,33 @@ static uint64_t last_relay_viewer_session_id;
 static pthread_mutex_t last_relay_viewer_session_id_lock =
                PTHREAD_MUTEX_INITIALIZER;
 
+/* Name of a viewer command for log messages; "UNKNOWN" if unrecognized. */
+static const char *lttng_viewer_command_str(lttng_viewer_command cmd)
+{
+       switch (cmd) {
+       case LTTNG_VIEWER_CONNECT:
+               return "CONNECT";
+       case LTTNG_VIEWER_LIST_SESSIONS:
+               return "LIST_SESSIONS";
+       case LTTNG_VIEWER_ATTACH_SESSION:
+               return "ATTACH_SESSION";
+       case LTTNG_VIEWER_GET_NEXT_INDEX:
+               return "GET_NEXT_INDEX";
+       case LTTNG_VIEWER_GET_PACKET:
+               return "GET_PACKET";
+       case LTTNG_VIEWER_GET_METADATA:
+               return "GET_METADATA";
+       case LTTNG_VIEWER_GET_NEW_STREAMS:
+               return "GET_NEW_STREAMS";
+       case LTTNG_VIEWER_CREATE_SESSION:
+               return "CREATE_SESSION";
+       case LTTNG_VIEWER_DETACH_SESSION:
+               return "DETACH_SESSION";
+       default:
+               return "UNKNOWN"; /* cmd is decoded from the wire; never abort() */
+       }
+}
+
 /*
  * Cleanup the daemon
  */
@@ -936,8 +964,6 @@ int viewer_connect(struct relay_connection *conn)
 
        health_code_update();
 
-       DBG("Viewer is establishing a connection to the relayd.");
-
        ret = recv_request(conn->sock, &msg, sizeof(msg));
        if (ret < 0) {
                goto end;
@@ -1024,8 +1050,6 @@ int viewer_list_sessions(struct relay_connection *conn)
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
 
-       DBG("List sessions received");
-
        send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
        if (!send_session_buf) {
                return -1;
@@ -1130,8 +1154,6 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
        LTTNG_ASSERT(conn);
 
-       DBG("Get new streams received");
-
        health_code_update();
 
        /* Receive the request from the connected client. */
@@ -1167,6 +1189,16 @@ int viewer_get_new_streams(struct relay_connection *conn)
         * the viewer's point of view.
         */
        pthread_mutex_lock(&session->lock);
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session->ongoing_rotation) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+               goto send_reply_unlock;
+       }
        ret = make_viewer_streams(session,
                        conn->viewer_session,
                        LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
@@ -1307,6 +1339,17 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session->ongoing_rotation) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               send_streams = 0;
+               goto send_reply;
+       }
+
        ret = make_viewer_streams(session,
                        conn->viewer_session, seek_type,
                        &nb_streams, NULL, NULL, &closed);
@@ -1581,11 +1624,12 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct relay_stream *rstream = NULL;
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
+       uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
+       enum lttng_trace_chunk_status status;
 
        LTTNG_ASSERT(conn);
 
-       DBG("Viewer get next index");
-
        memset(&viewer_index, 0, sizeof(viewer_index));
        health_code_update();
 
@@ -1611,6 +1655,13 @@ int viewer_get_next_index(struct relay_connection *conn)
        metadata_viewer_stream =
                        ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
+       /*
+        * Hold the session lock to protect against concurrent changes
+        * to the chunk files (e.g. rename done by clear), which are
+        * protected by the session ongoing rotation state. Those are
+        * synchronized with the session lock.
+        */
+       pthread_mutex_lock(&rstream->trace->session->lock);
        pthread_mutex_lock(&rstream->lock);
 
        /*
@@ -1661,12 +1712,50 @@ int viewer_get_next_index(struct relay_connection *conn)
         * This allows clients to consume all the packets of a trace chunk
         * after a session's destruction.
         */
-       if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) &&
-                       !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
-               DBG("Viewer session and viewer stream chunk IDs differ: "
-                               "vsession chunk %p vstream chunk %p",
+       if (vstream->stream_file.trace_chunk) {
+               status = lttng_trace_chunk_get_id(
+                               vstream->stream_file.trace_chunk,
+                               &stream_file_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+       if (conn->viewer_session->current_trace_chunk) {
+               status = lttng_trace_chunk_get_id(
                                conn->viewer_session->current_trace_chunk,
-                               vstream->stream_file.trace_chunk);
+                               &viewer_session_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+
+       viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
+                       conn->viewer_session->current_trace_chunk,
+                       vstream->stream_file.trace_chunk);
+       viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
+                       vstream->last_seen_rotation_count + 1;
+
+       if (viewer_stream_and_session_in_same_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else {
+               DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+
                viewer_stream_rotate_to_trace_chunk(vstream,
                                conn->viewer_session->current_trace_chunk);
                vstream->last_seen_rotation_count =
@@ -1711,7 +1800,6 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        if (!vstream->stream_file.handle) {
                char file_path[LTTNG_PATH_MAX];
-               enum lttng_trace_chunk_status status;
                struct fs_handle *fs_handle;
 
                ret = utils_stream_file_path(rstream->path_name,
@@ -1777,6 +1865,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 send_reply:
        if (rstream) {
                pthread_mutex_unlock(&rstream->lock);
+               pthread_mutex_unlock(&rstream->trace->session->lock);
        }
 
        if (metadata_viewer_stream) {
@@ -1818,6 +1907,7 @@ end:
 
 error_put:
        pthread_mutex_unlock(&rstream->lock);
+       pthread_mutex_unlock(&rstream->trace->session->lock);
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
        }
@@ -1844,8 +1934,6 @@ int viewer_get_packet(struct relay_connection *conn)
        ssize_t read_len;
        uint64_t stream_id;
 
-       DBG2("Relay get data packet");
-
        health_code_update();
 
        ret = recv_request(conn->sock, &get_packet_info,
@@ -1958,8 +2046,6 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        LTTNG_ASSERT(conn);
 
-       DBG("Relay get metadata");
-
        health_code_update();
 
        ret = recv_request(conn->sock, &request, sizeof(request));
@@ -2007,9 +2093,16 @@ int viewer_get_metadata(struct relay_connection *conn)
                 */
                if (vstream->metadata_sent > 0) {
                        if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
-                               /* Release ownership for the viewer metadata stream. */
+                               /*
+                                * Release ownership for the viewer metadata
+                                * stream. Note that this reference is the
+                                * viewer's reference. The vstream still exists
+                                * until the end of the function as
+                                * viewer_stream_get_by_id() took a reference.
+                                */
                                viewer_stream_put(vstream);
                        }
+
                        vstream->stream->no_new_metadata_notified = true;
                }
                goto send_reply;
@@ -2219,8 +2312,6 @@ int viewer_create_session(struct relay_connection *conn)
        int ret;
        struct lttng_viewer_create_session_response resp;
 
-       DBG("Viewer create session received");
-
        memset(&resp, 0, sizeof(resp));
        resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
        conn->viewer_session = viewer_session_create();
@@ -2257,8 +2348,6 @@ int viewer_detach_session(struct relay_connection *conn)
        struct relay_session *session = NULL;
        uint64_t viewer_session_to_close;
 
-       DBG("Viewer detach session received");
-
        LTTNG_ASSERT(conn);
 
        health_code_update();
@@ -2337,21 +2426,24 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
                struct relay_connection *conn)
 {
        int ret = 0;
-       uint32_t msg_value;
-
-       msg_value = be32toh(recv_hdr->cmd);
+       lttng_viewer_command cmd =
+                       (lttng_viewer_command) be32toh(recv_hdr->cmd);
 
        /*
-        * Make sure we've done the version check before any command other then a
-        * new client connection.
+        * Make sure we've done the version check before any command other than
+        * a new client connection.
         */
-       if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
-               ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
+       if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
+               ERR("Viewer on connection %d requested %s command before version check",
+                       conn->sock->fd, lttng_viewer_command_str(cmd));
                ret = -1;
                goto end;
        }
 
-       switch (msg_value) {
+       DBG("Processing %s viewer command from connection %d",
+                       lttng_viewer_command_str(cmd), conn->sock->fd);
+
+       switch (cmd) {
        case LTTNG_VIEWER_CONNECT:
                ret = viewer_connect(conn);
                break;
This page took 0.026142 seconds and 4 git commands to generate.