Fix: relayd: missing session unlock on error path
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 1a30366606065d248f0aaf05f915c18701683322..37bdfc2e5a2c5326ae0104523d6c4d3c2994a98a 100644 (file)
@@ -374,7 +374,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * chunk can be used safely.
                                 */
                                if ((relay_stream->ongoing_rotation.is_set ||
-                                                   relay_session->ongoing_rotation) &&
+                                               session_has_ongoing_rotation(relay_session)) &&
                                                relay_stream->trace_chunk) {
                                        viewer_stream_trace_chunk = lttng_trace_chunk_copy(
                                                        relay_stream->trace_chunk);
@@ -384,8 +384,6 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                                goto error_unlock;
                                        }
                                } else {
-                                       bool reference_acquired;
-
                                        /*
                                         * Transition the viewer session into the newest trace chunk available.
                                         */
@@ -402,11 +400,26 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                                }
                                        }
 
-                                       reference_acquired = lttng_trace_chunk_get(
-                                                       viewer_session->current_trace_chunk);
-                                       assert(reference_acquired);
-                                       viewer_stream_trace_chunk =
-                                                       viewer_session->current_trace_chunk;
+                                       if (relay_stream->trace_chunk) {
+                                               /*
+                                                * If the corresponding relay
+                                                * stream's trace chunk is set,
+                                                * the viewer stream will be
+                                                * created under it.
+                                                *
+                                                * Note that a relay stream can
+                                                * have a NULL output trace
+                                                * chunk (for instance, after a
+                                                * clear against a stopped
+                                                * session).
+                                                */
+                                               const bool reference_acquired = lttng_trace_chunk_get(
+                                                               viewer_session->current_trace_chunk);
+
+                                               assert(reference_acquired);
+                                               viewer_stream_trace_chunk =
+                                                               viewer_session->current_trace_chunk;
+                                       }
                                }
 
                                viewer_stream = viewer_stream_create(
@@ -1152,12 +1165,27 @@ int viewer_get_new_streams(struct relay_connection *conn)
         * the viewer's point of view.
         */
        pthread_mutex_lock(&session->lock);
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+               goto send_reply_unlock;
+       }
        ret = make_viewer_streams(session,
                        conn->viewer_session,
                        LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
-               goto error_unlock_session;
+               /*
+                * This is caused by an internal error; propagate the negative
+                * 'ret' to close the connection.
+                */
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+               goto send_reply_unlock;
        }
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
@@ -1212,10 +1240,6 @@ end_put_session:
        }
 error:
        return ret;
-error_unlock_session:
-       pthread_mutex_unlock(&session->lock);
-       session_put(session);
-       return ret;
 }
 
 /*
@@ -1292,6 +1316,17 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               send_streams = 0;
+               goto send_reply;
+       }
+
        ret = make_viewer_streams(session,
                        conn->viewer_session, seek_type,
                        &nb_streams, NULL, NULL, &closed);
@@ -1596,6 +1631,13 @@ int viewer_get_next_index(struct relay_connection *conn)
        metadata_viewer_stream =
                        ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
+       /*
+        * Hold the session lock to protect against concurrent changes
+        * to the chunk files (e.g. rename done by clear), which are
+        * protected by the session ongoing rotation state. Those are
+        * synchronized with the session lock.
+        */
+       pthread_mutex_lock(&rstream->trace->session->lock);
        pthread_mutex_lock(&rstream->lock);
 
        /*
@@ -1612,7 +1654,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       if (rstream->trace->session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(rstream->trace->session)) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto send_reply;
@@ -1646,9 +1688,9 @@ int viewer_get_next_index(struct relay_connection *conn)
         * This allows clients to consume all the packets of a trace chunk
         * after a session's destruction.
         */
-       if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
+       if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) &&
                        !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
-               DBG("Viewer session and viewer stream chunk differ: "
+               DBG("Viewer session and viewer stream chunk IDs differ: "
                                "vsession chunk %p vstream chunk %p",
                                conn->viewer_session->current_trace_chunk,
                                vstream->stream_file.trace_chunk);
@@ -1762,6 +1804,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 send_reply:
        if (rstream) {
                pthread_mutex_unlock(&rstream->lock);
+               pthread_mutex_unlock(&rstream->trace->session->lock);
        }
 
        if (metadata_viewer_stream) {
@@ -1803,6 +1846,7 @@ end:
 
 error_put:
        pthread_mutex_unlock(&rstream->lock);
+       pthread_mutex_unlock(&rstream->trace->session->lock);
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
        }
@@ -1991,11 +2035,11 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * an error.
                 */
                if (vstream->metadata_sent > 0) {
-                       vstream->stream->no_new_metadata_notified = true;
-                       if (vstream->stream->closed) {
+                       if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
                                /* Release ownership for the viewer metadata stream. */
                                viewer_stream_put(vstream);
                        }
+                       vstream->stream->no_new_metadata_notified = true;
                }
                goto send_reply;
        }
@@ -2016,8 +2060,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                }
        }
 
-       if (conn->viewer_session->current_trace_chunk !=
-                       vstream->stream_file.trace_chunk) {
+       if (conn->viewer_session->current_trace_chunk &&
+                       !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                       vstream->stream_file.trace_chunk)) {
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
@@ -2034,11 +2079,16 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
-       /*
-        * Either this is the first time the metadata file is read, or a
-        * rotation of the corresponding relay stream has occurred.
-        */
-       if (!vstream->stream_file.handle && len > 0) {
+       if (!vstream->stream_file.trace_chunk) {
+               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+               len = 0;
+               goto send_reply;
+       } else if (vstream->stream_file.trace_chunk &&
+                       !vstream->stream_file.handle && len > 0) {
+               /*
+                * Either this is the first time the metadata file is read, or a
+                * rotation of the corresponding relay stream has occurred.
+                */
                struct fs_handle *fs_handle;
                char file_path[LTTNG_PATH_MAX];
                enum lttng_trace_chunk_status status;
This page took 0.02611 seconds and 4 git commands to generate.