Fix: relayd: connection abruptly closed on viewer stream creation failure
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
index 92c3bf4a7831d8ac6195009e08af8dc5e6fceee0..e0d446b2a1a4ee25cdc25539c870a87f52a1ef91 100644 (file)
@@ -29,6 +29,7 @@
 #include <urcu/futex.h>
 #include <urcu/rculist.h>
 #include <urcu/uatomic.h>
+#include <string>
 
 #include <common/common.h>
 #include <common/compat/endian.h>
@@ -88,6 +89,95 @@ static uint64_t last_relay_viewer_session_id;
 static pthread_mutex_t last_relay_viewer_session_id_lock =
                PTHREAD_MUTEX_INITIALIZER;
 
+static
+const char *lttng_viewer_command_str(lttng_viewer_command cmd)
+{
+       switch (cmd) {
+       case LTTNG_VIEWER_CONNECT:
+               return "CONNECT";
+       case LTTNG_VIEWER_LIST_SESSIONS:
+               return "LIST_SESSIONS";
+       case LTTNG_VIEWER_ATTACH_SESSION:
+               return "ATTACH_SESSION";
+       case LTTNG_VIEWER_GET_NEXT_INDEX:
+               return "GET_NEXT_INDEX";
+       case LTTNG_VIEWER_GET_PACKET:
+               return "GET_PACKET";
+       case LTTNG_VIEWER_GET_METADATA:
+               return "GET_METADATA";
+       case LTTNG_VIEWER_GET_NEW_STREAMS:
+               return "GET_NEW_STREAMS";
+       case LTTNG_VIEWER_CREATE_SESSION:
+               return "CREATE_SESSION";
+       case LTTNG_VIEWER_DETACH_SESSION:
+               return "DETACH_SESSION";
+       default:
+               abort();
+       }
+}
+
+static
+const char *lttng_viewer_next_index_return_code_str(
+               enum lttng_viewer_next_index_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_INDEX_OK:
+               return "INDEX_OK";
+       case LTTNG_VIEWER_INDEX_RETRY:
+               return "INDEX_RETRY";
+       case LTTNG_VIEWER_INDEX_HUP:
+               return "INDEX_HUP";
+       case LTTNG_VIEWER_INDEX_ERR:
+               return "INDEX_ERR";
+       case LTTNG_VIEWER_INDEX_INACTIVE:
+               return "INDEX_INACTIVE";
+       case LTTNG_VIEWER_INDEX_EOF:
+               return "INDEX_EOF";
+       default:
+               abort();
+       }
+}
+
+static
+const char *lttng_viewer_attach_return_code_str(
+               enum lttng_viewer_attach_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_ATTACH_OK:
+               return "ATTACH_OK";
+       case LTTNG_VIEWER_ATTACH_ALREADY:
+               return "ATTACH_ALREADY";
+       case LTTNG_VIEWER_ATTACH_UNK:
+               return "ATTACH_UNK";
+       case LTTNG_VIEWER_ATTACH_NOT_LIVE:
+               return "ATTACH_NOT_LIVE";
+       case LTTNG_VIEWER_ATTACH_SEEK_ERR:
+               return "ATTACH_SEEK_ERR";
+       case LTTNG_VIEWER_ATTACH_NO_SESSION:
+               return "ATTACH_NO_SESSION";
+       default:
+               abort();
+       }
+};
+
+static
+const char *lttng_viewer_get_packet_return_code_str(
+               enum lttng_viewer_get_packet_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_GET_PACKET_OK:
+               return "GET_PACKET_OK";
+       case LTTNG_VIEWER_GET_PACKET_RETRY:
+               return "GET_PACKET_RETRY";
+       case LTTNG_VIEWER_GET_PACKET_ERR:
+               return "GET_PACKET_ERR";
+       case LTTNG_VIEWER_GET_PACKET_EOF:
+               return "GET_PACKET_EOF";
+       default:
+               abort();
+       }
+};
+
 /*
  * Cleanup the daemon
  */
@@ -374,7 +464,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * chunk can be used safely.
                                 */
                                if ((relay_stream->ongoing_rotation.is_set ||
-                                                   relay_session->ongoing_rotation) &&
+                                               session_has_ongoing_rotation(relay_session)) &&
                                                relay_stream->trace_chunk) {
                                        viewer_stream_trace_chunk = lttng_trace_chunk_copy(
                                                        relay_stream->trace_chunk);
@@ -936,8 +1026,6 @@ int viewer_connect(struct relay_connection *conn)
 
        health_code_update();
 
-       DBG("Viewer is establishing a connection to the relayd.");
-
        ret = recv_request(conn->sock, &msg, sizeof(msg));
        if (ret < 0) {
                goto end;
@@ -1024,8 +1112,6 @@ int viewer_list_sessions(struct relay_connection *conn)
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
 
-       DBG("List sessions received");
-
        send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
        if (!send_session_buf) {
                return -1;
@@ -1130,8 +1216,6 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
        LTTNG_ASSERT(conn);
 
-       DBG("Get new streams received");
-
        health_code_update();
 
        /* Receive the request from the connected client. */
@@ -1167,12 +1251,27 @@ int viewer_get_new_streams(struct relay_connection *conn)
         * the viewer's point of view.
         */
        pthread_mutex_lock(&session->lock);
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+               goto send_reply_unlock;
+       }
        ret = make_viewer_streams(session,
                        conn->viewer_session,
                        LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
-               goto error_unlock_session;
+               /*
+                * This is caused by an internal error; propagate the negative
+                * 'ret' to close the connection.
+                */
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+               goto send_reply;
        }
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
@@ -1227,10 +1326,6 @@ end_put_session:
        }
 error:
        return ret;
-error_unlock_session:
-       pthread_mutex_unlock(&session->lock);
-       session_put(session);
-       return ret;
 }
 
 /*
@@ -1261,28 +1356,34 @@ int viewer_attach_session(struct relay_connection *conn)
        }
 
        session_id = be64toh(request.session_id);
+
        health_code_update();
 
        memset(&response, 0, sizeof(response));
 
        if (!conn->viewer_session) {
-               DBG("Client trying to attach before creating a live viewer session");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION;
+               DBG("Client trying to attach before creating a live viewer session, returning status=%s",
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
        session = session_get_by_id(session_id);
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found", session_id);
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
+               DBG("Relay session %" PRIu64 " not found, returning status=%s",
+                               session_id,
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
-       DBG("Attach session ID %" PRIu64 " received", session_id);
+       DBG("Attach relay session ID %" PRIu64 " received", session_id);
 
        pthread_mutex_lock(&session->lock);
        if (session->live_timer == 0) {
-               DBG("Not live session");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE;
+               DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s",
+                               session_id,
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
@@ -1290,19 +1391,34 @@ int viewer_attach_session(struct relay_connection *conn)
        viewer_attach_status = viewer_session_attach(conn->viewer_session,
                        session);
        if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
-               response.status = htobe32(viewer_attach_status);
+               DBG("Error attaching to relay session %" PRIu64 ", returning status=%s",
+                               session_id,
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
        switch (be32toh(request.seek)) {
        case LTTNG_VIEWER_SEEK_BEGINNING:
        case LTTNG_VIEWER_SEEK_LAST:
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_OK;
                seek_type = (lttng_viewer_seek) be32toh(request.seek);
                break;
        default:
-               ERR("Wrong seek parameter");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
+               ERR("Wrong seek parameter for relay session %" PRIu64
+                               ", returning status=%s", session_id,
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR;
+               send_streams = 0;
+               goto send_reply;
+       }
+
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
                send_streams = 0;
                goto send_reply;
        }
@@ -1327,12 +1443,18 @@ int viewer_attach_session(struct relay_connection *conn)
        if (closed) {
                send_streams = 0;
                response.streams_count = 0;
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+               viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK;
+               ERR("Session %" PRIu64 " is closed, returning status=%s",
+                               session_id,
+                               lttng_viewer_attach_return_code_str(viewer_attach_status));
                goto send_reply;
        }
 
 send_reply:
        health_code_update();
+
+       response.status = htobe32((uint32_t) viewer_attach_status);
+
        ret = send_response(conn->sock, &response, sizeof(response));
        if (ret < 0) {
                goto end_put_session;
@@ -1443,7 +1565,13 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * Last index sent and session connection or relay
                 * stream are closed.
                 */
-               index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+               index->status = LTTNG_VIEWER_INDEX_HUP;
+               DBG("Check index status: Connection or stream are closed, stream %" PRIu64
+                       ",connection-closed=%d, relay-stream-closed=%d, returning status=%s",
+                       vstream->stream->stream_handle,
+                       trace->session->connection_closed, rstream->closed,
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) index->status));
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
                        (rstream->index_received_seqcount == 0 ||
@@ -1465,11 +1593,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * viewer_stream_sync_tracefile_array_tail) and skip over
                 * packet sequence numbers.
                 */
-               index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
+               index->status = LTTNG_VIEWER_INDEX_INACTIVE;
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
-               DBG("Check index status: inactive with beacon, for stream %" PRIu64,
-                               vstream->stream->stream_handle);
+               DBG("Check index status: inactive with beacon, for stream %" PRIu64
+                       ", returning status=%s",
+                       vstream->stream->stream_handle,
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) index->status));
                goto index_ready;
        } else if (rstream->index_received_seqcount == 0 ||
                        (vstream->index_sent_seqcount != 0 &&
@@ -1486,9 +1617,13 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * viewer_stream_sync_tracefile_array_tail) and skip over
                 * packet sequence numbers.
                 */
-               index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-               DBG("Check index status: retry for stream %" PRIu64,
-                               vstream->stream->stream_handle);
+               index->status = LTTNG_VIEWER_INDEX_RETRY;
+               DBG("Check index status:"
+                       "did not received beacon for stream %" PRIu64
+                       ", returning status=%s",
+                       vstream->stream->stream_handle,
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) index->status));
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
                        vstream->current_tracefile_id,
@@ -1503,7 +1638,13 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                ret = viewer_stream_rotate(vstream);
                if (ret == 1) {
                        /* EOF across entire stream. */
-                       index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                       index->status = LTTNG_VIEWER_INDEX_HUP;
+                       DBG("Check index status:"
+                               "reached end of file for stream %" PRIu64
+                               ", returning status=%s",
+                               vstream->stream->stream_handle,
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) index->status));
                        goto hup;
                }
                /*
@@ -1525,12 +1666,15 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                        rstream->tfa,
                                        vstream->current_tracefile_id,
                                        vstream->index_sent_seqcount)) {
-                       index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-                       DBG("Check index status: retry: "
+                       index->status = LTTNG_VIEWER_INDEX_RETRY;
+                       DBG("Check index status:"
                                "tracefile array sequence number %" PRIu64
-                               " not in file for stream %" PRIu64,
+                               " not in file for stream %" PRIu64
+                               ", returning status=%s",
                                vstream->index_sent_seqcount,
-                               vstream->stream->stream_handle);
+                               vstream->stream->stream_handle,
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) index->status));
                        goto index_ready;
                }
                LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
@@ -1581,11 +1725,12 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct relay_stream *rstream = NULL;
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
+       uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
+       enum lttng_trace_chunk_status status;
 
        LTTNG_ASSERT(conn);
 
-       DBG("Viewer get next index");
-
        memset(&viewer_index, 0, sizeof(viewer_index));
        health_code_update();
 
@@ -1597,9 +1742,11 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
-               DBG("Client requested index of unknown stream id %" PRIu64,
-                               (uint64_t) be64toh(request_index.stream_id));
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
@@ -1611,25 +1758,44 @@ int viewer_get_next_index(struct relay_connection *conn)
        metadata_viewer_stream =
                        ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
+       /*
+        * Hold the session lock to protect against concurrent changes
+        * to the chunk files (e.g. rename done by clear), which are
+        * protected by the session ongoing rotation state. Those are
+        * synchronized with the session lock.
+        */
+       pthread_mutex_lock(&rstream->trace->session->lock);
        pthread_mutex_lock(&rstream->lock);
 
        /*
         * The viewer should not ask for index on metadata stream.
         */
        if (rstream->is_metadata) {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+               viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+               DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+               DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
-       if (rstream->trace->session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(rstream->trace->session)) {
                /* Rotation is ongoing, try again later. */
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+               DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
@@ -1645,7 +1811,12 @@ int viewer_get_next_index(struct relay_connection *conn)
                                conn->viewer_session,
                                rstream->trace_chunk);
                if (ret) {
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+                       viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+                       ERR("Error copying trace chunk for stream id %" PRIu64
+                               ", returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
                }
        }
@@ -1661,12 +1832,50 @@ int viewer_get_next_index(struct relay_connection *conn)
         * This allows clients to consume all the packets of a trace chunk
         * after a session's destruction.
         */
-       if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
-                       !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
-               DBG("Viewer session and viewer stream chunk differ: "
-                               "vsession chunk %p vstream chunk %p",
+       if (vstream->stream_file.trace_chunk) {
+               status = lttng_trace_chunk_get_id(
+                               vstream->stream_file.trace_chunk,
+                               &stream_file_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+       if (conn->viewer_session->current_trace_chunk) {
+               status = lttng_trace_chunk_get_id(
                                conn->viewer_session->current_trace_chunk,
-                               vstream->stream_file.trace_chunk);
+                               &viewer_session_chunk_id);
+               LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
+       }
+
+       viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
+                       conn->viewer_session->current_trace_chunk,
+                       vstream->stream_file.trace_chunk);
+       viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
+                       vstream->last_seen_rotation_count + 1;
+
+       if (viewer_stream_and_session_in_same_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
+               DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+       } else {
+               DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
+                               vstream->stream_file.trace_chunk ?
+                                               std::to_string(stream_file_chunk_id).c_str() :
+                                               "None",
+                               conn->viewer_session->current_trace_chunk ?
+                                               std::to_string(viewer_session_chunk_id).c_str() :
+                                               "None");
+
                viewer_stream_rotate_to_trace_chunk(vstream,
                                conn->viewer_session->current_trace_chunk);
                vstream->last_seen_rotation_count =
@@ -1690,15 +1899,30 @@ int viewer_get_next_index(struct relay_connection *conn)
        ret = try_open_index(vstream, rstream);
        if (ret == -ENOENT) {
               if (rstream->closed) {
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                       viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+                       DBG("Cannot open index for stream id %" PRIu64
+                               "stream is closed, returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
               } else {
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
+                       DBG("Cannot open index for stream id %" PRIu64
+                               ", returning status=%s",
+                               (uint64_t) be64toh(request_index.stream_id),
+                               lttng_viewer_next_index_return_code_str(
+                                       (enum lttng_viewer_next_index_return_code) viewer_index.status));
                        goto send_reply;
               }
        }
        if (ret < 0) {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error opening index for stream id %" PRIu64
+                       ", returning status=%s",
+                       (uint64_t) be64toh(request_index.stream_id),
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        }
 
@@ -1711,7 +1935,6 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        if (!vstream->stream_file.handle) {
                char file_path[LTTNG_PATH_MAX];
-               enum lttng_trace_chunk_status status;
                struct fs_handle *fs_handle;
 
                ret = utils_stream_file_path(rstream->path_name,
@@ -1733,7 +1956,12 @@ int viewer_get_next_index(struct relay_connection *conn)
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
                                        rstream->closed) {
-                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                               viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
+                               DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64
+                                       ", returning status=%s",
+                                       (uint64_t) be64toh(request_index.stream_id),
+                                       lttng_viewer_next_index_return_code_str(
+                                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
                                goto send_reply;
                        }
                        PERROR("Failed to open trace file for viewer stream");
@@ -1744,7 +1972,12 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        ret = check_new_streams(conn);
        if (ret < 0) {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams before sending new index to stream id %" PRIu64
+                       ", returning status=%s",
+                       (uint64_t) be64toh(request_index.stream_id),
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        } else if (ret == 1) {
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
@@ -1752,11 +1985,20 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
-               ERR("Relay error reading index file");
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Relay error reading index file for stream id %" PRIu64
+                       ", returning status=%s",
+                       (uint64_t) be64toh(request_index.stream_id),
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
                goto send_reply;
        } else {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
+               viewer_index.status = LTTNG_VIEWER_INDEX_OK;
+               DBG("Read index file for stream id %" PRIu64
+                       ", returning status=%s",
+                       (uint64_t) be64toh(request_index.stream_id),
+                       lttng_viewer_next_index_return_code_str(
+                               (enum lttng_viewer_next_index_return_code) viewer_index.status));
                vstream->index_sent_seqcount++;
        }
 
@@ -1777,6 +2019,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 send_reply:
        if (rstream) {
                pthread_mutex_unlock(&rstream->lock);
+               pthread_mutex_unlock(&rstream->trace->session->lock);
        }
 
        if (metadata_viewer_stream) {
@@ -1794,6 +2037,7 @@ send_reply:
        }
 
        viewer_index.flags = htobe32(viewer_index.flags);
+       viewer_index.status = htobe32(viewer_index.status);
        health_code_update();
 
        ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
@@ -1818,6 +2062,7 @@ end:
 
 error_put:
        pthread_mutex_unlock(&rstream->lock);
+       pthread_mutex_unlock(&rstream->trace->session->lock);
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
        }
@@ -1843,8 +2088,7 @@ int viewer_get_packet(struct relay_connection *conn)
        uint32_t packet_data_len = 0;
        ssize_t read_len;
        uint64_t stream_id;
-
-       DBG2("Relay get data packet");
+       enum lttng_viewer_get_packet_return_code get_packet_status;
 
        health_code_update();
 
@@ -1861,9 +2105,10 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(stream_id);
        if (!vstream) {
-               DBG("Client requested packet of unknown stream id %" PRIu64,
-                               stream_id);
-               reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+               DBG("Client requested packet of unknown stream id %" PRIu64
+                       ", returning status=%s", stream_id,
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto send_reply_nolock;
        } else {
                packet_data_len = be32toh(get_packet_info.len);
@@ -1872,8 +2117,9 @@ int viewer_get_packet(struct relay_connection *conn)
 
        reply = (char *) zmalloc(reply_size);
        if (!reply) {
-               PERROR("packet reply zmalloc");
-               reply_size = sizeof(reply_header);
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
+               PERROR("Falled to allocate reply, returning status=%s",
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
 
@@ -1881,29 +2127,31 @@ int viewer_get_packet(struct relay_connection *conn)
        lseek_ret = fs_handle_seek(vstream->stream_file.handle,
                        be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to seek file system handle of viewer stream %" PRIu64
-                      " to offset %" PRIu64,
-                               stream_id,
-                               (uint64_t) be64toh(get_packet_info.offset));
+                      " to offset %" PRIu64", returning status=%s", stream_id,
+                       (uint64_t) be64toh(get_packet_info.offset),
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
        read_len = fs_handle_read(vstream->stream_file.handle,
                        reply + sizeof(reply_header), packet_data_len);
        if (read_len < packet_data_len) {
+               get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR;
                PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
-                      ", offset: %" PRIu64,
-                               stream_id,
-                               (uint64_t) be64toh(get_packet_info.offset));
+                      ", offset: %" PRIu64 ", returning status=%s", stream_id,
+                      (uint64_t) be64toh(get_packet_info.offset),
+                       lttng_viewer_get_packet_return_code_str(get_packet_status));
                goto error;
        }
-       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+
+       get_packet_status = LTTNG_VIEWER_GET_PACKET_OK;
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
 error:
        /* No payload to send on error. */
        reply_size = sizeof(reply_header);
-       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        if (vstream) {
@@ -1913,6 +2161,7 @@ send_reply_nolock:
 
        health_code_update();
 
+       reply_header.status = htobe32(get_packet_status);
        if (reply) {
                memcpy(reply, &reply_header, sizeof(reply_header));
                ret = send_response(conn->sock, reply, reply_size);
@@ -1958,8 +2207,6 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        LTTNG_ASSERT(conn);
 
-       DBG("Relay get metadata");
-
        health_code_update();
 
        ret = recv_request(conn->sock, &request, sizeof(request));
@@ -2006,11 +2253,18 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * an error.
                 */
                if (vstream->metadata_sent > 0) {
-                       vstream->stream->no_new_metadata_notified = true;
-                       if (vstream->stream->closed) {
-                               /* Release ownership for the viewer metadata stream. */
+                       if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
+                               /*
+                                * Release ownership for the viewer metadata
+                                * stream. Note that this reference is the
+                                * viewer's reference. The vstream still exists
+                                * until the end of the function as
+                                * viewer_stream_get_by_id() took a reference.
+                                */
                                viewer_stream_put(vstream);
                        }
+
+                       vstream->stream->no_new_metadata_notified = true;
                }
                goto send_reply;
        }
@@ -2032,8 +2286,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        if (conn->viewer_session->current_trace_chunk &&
-                       conn->viewer_session->current_trace_chunk !=
-                                       vstream->stream_file.trace_chunk) {
+                       !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                       vstream->stream_file.trace_chunk)) {
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
@@ -2219,8 +2473,6 @@ int viewer_create_session(struct relay_connection *conn)
        int ret;
        struct lttng_viewer_create_session_response resp;
 
-       DBG("Viewer create session received");
-
        memset(&resp, 0, sizeof(resp));
        resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
        conn->viewer_session = viewer_session_create();
@@ -2257,8 +2509,6 @@ int viewer_detach_session(struct relay_connection *conn)
        struct relay_session *session = NULL;
        uint64_t viewer_session_to_close;
 
-       DBG("Viewer detach session received");
-
        LTTNG_ASSERT(conn);
 
        health_code_update();
@@ -2337,21 +2587,24 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
                struct relay_connection *conn)
 {
        int ret = 0;
-       uint32_t msg_value;
-
-       msg_value = be32toh(recv_hdr->cmd);
+       lttng_viewer_command cmd =
+                       (lttng_viewer_command) be32toh(recv_hdr->cmd);
 
        /*
-        * Make sure we've done the version check before any command other then a
-        * new client connection.
+        * Make sure we've done the version check before any command other then
+        * new client connection.
         */
-       if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
-               ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
+       if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
+               ERR("Viewer on connection %d requested %s command before version check",
+                       conn->sock->fd, lttng_viewer_command_str(cmd));
                ret = -1;
                goto end;
        }
 
-       switch (msg_value) {
+       DBG("Processing %s viewer command from connection %d",
+                       lttng_viewer_command_str(cmd), conn->sock->fd);
+
+       switch (cmd) {
        case LTTNG_VIEWER_CONNECT:
                ret = viewer_connect(conn);
                break;
This page took 0.034987 seconds and 4 git commands to generate.