Fix: relayd: live: mishandled initial null trace chunk
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index ccba8d70594c278aaabfee2a6d4b6687cd0adc34..962fe0fd14a340c9e903f6410d09b1b6f386a3de 100644 (file)
@@ -281,16 +281,9 @@ static int make_viewer_streams(struct relay_session *relay_session,
        struct ctf_trace *ctf_trace;
        struct relay_stream *relay_stream = NULL;
 
-       assert(relay_session);
+       LTTNG_ASSERT(relay_session);
        ASSERT_LOCKED(relay_session->lock);
 
-       if (!viewer_session->current_trace_chunk) {
-               ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk",
-                               relay_session->session_name);
-               ret = -1;
-               goto error;
-       }
-
        if (relay_session->connection_closed) {
                *closed = true;
        }
@@ -361,7 +354,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        viewer_stream = viewer_stream_get_by_id(
                                        relay_stream->stream_handle);
                        if (!viewer_stream) {
-                               struct lttng_trace_chunk *viewer_stream_trace_chunk;
+                               struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
 
                                /*
                                 * Save that we sent the metadata stream to the
@@ -391,12 +384,42 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                                goto error_unlock;
                                        }
                                } else {
-                                       const bool reference_acquired = lttng_trace_chunk_get(
-                                                       viewer_session->current_trace_chunk);
+                                       /*
+                                        * Transition the viewer session into the newest trace chunk available.
+                                        */
+                                       if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
+                                                       relay_stream->trace_chunk)) {
+
+                                               ret = viewer_session_set_trace_chunk_copy(
+                                                               viewer_session,
+                                                               relay_stream->trace_chunk);
+                                               if (ret) {
+                                                       ret = -1;
+                                                       ctf_trace_put(ctf_trace);
+                                                       goto error_unlock;
+                                               }
+                                       }
 
-                                       assert(reference_acquired);
-                                       viewer_stream_trace_chunk =
-                                                       viewer_session->current_trace_chunk;
+                                       if (relay_stream->trace_chunk) {
+                                               /*
+                                                * If the corresponding relay
+                                                * stream's trace chunk is set,
+                                                * the viewer stream will be
+                                                * created under it.
+                                                *
+                                                * Note that a relay stream can
+                                                * have a NULL output trace
+                                                * chunk (for instance, after a
+                                                * clear against a stopped
+                                                * session).
+                                                */
+                                               const bool reference_acquired = lttng_trace_chunk_get(
+                                                               viewer_session->current_trace_chunk);
+
+                                               LTTNG_ASSERT(reference_acquired);
+                                               viewer_stream_trace_chunk =
+                                                               viewer_session->current_trace_chunk;
+                                       }
                                }
 
                                viewer_stream = viewer_stream_create(
@@ -460,7 +483,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
 
 error_unlock:
        rcu_read_unlock();
-error:
+
        if (relay_stream) {
                pthread_mutex_unlock(&relay_stream->lock);
                stream_put(relay_stream);
@@ -1018,14 +1041,6 @@ int viewer_list_sessions(struct relay_connection *conn)
                        /* Skip closed session */
                        goto next_session;
                }
-               if (!session->current_trace_chunk) {
-                       /*
-                        * Skip un-attachable session. It is either
-                        * being destroyed or has not had a trace
-                        * chunk created against it yet.
-                        */
-                       goto next_session;
-               }
 
                if (count >= buf_count) {
                        struct lttng_viewer_session *newbuf;
@@ -1111,7 +1126,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
        uint64_t session_id;
        bool closed = false;
 
-       assert(conn);
+       LTTNG_ASSERT(conn);
 
        DBG("Get new streams received");
 
@@ -1140,10 +1155,19 @@ int viewer_get_new_streams(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since
+        * that at this point the client is already attached to the session.Aany
+        * initial stream will have been created with the seek type at attach
+        * time (for now most readers use the LTTNG_VIEWER_SEEK_LAST on attach).
+        * Otherwise any event happening in a new stream between the attach and
+        * a call to viewer_get_new_streams will be "lost" (never received) from
+        * the viewer's point of view.
+        */
        pthread_mutex_lock(&session->lock);
        ret = make_viewer_streams(session,
                        conn->viewer_session,
-                       LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
+                       LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
                goto error_unlock_session;
@@ -1224,7 +1248,7 @@ int viewer_attach_session(struct relay_connection *conn)
        bool closed = false;
        uint64_t session_id;
 
-       assert(conn);
+       LTTNG_ASSERT(conn);
 
        health_code_update();
 
@@ -1254,15 +1278,6 @@ int viewer_attach_session(struct relay_connection *conn)
        DBG("Attach session ID %" PRIu64 " received", session_id);
 
        pthread_mutex_lock(&session->lock);
-       if (!session->current_trace_chunk) {
-               /*
-                * Session is either being destroyed or it never had a trace
-                * chunk created against it.
-                */
-               DBG("Session requested by live client has no current trace chunk, returning unknown session");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
-               goto send_reply;
-       }
        if (session->live_timer == 0) {
                DBG("Not live session");
                response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
@@ -1371,10 +1386,12 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->index_received_seqcount == 0) {
+       if (rstream->index_received_seqcount == 0 ||
+                       !vstream->stream_file.trace_chunk) {
                ret = -ENOENT;
                goto end;
        }
+
        chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
                        vstream->stream_file.trace_chunk, rstream->path_name,
                        rstream->channel_name, rstream->tracefile_size,
@@ -1514,7 +1531,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                vstream->stream->stream_handle);
                        goto index_ready;
                }
-               assert(tracefile_array_seq_in_file(rstream->tfa,
+               LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
                                vstream->current_tracefile_id,
                                vstream->index_sent_seqcount));
        }
@@ -1528,6 +1545,24 @@ index_ready:
        return 1;
 }
 
+static
+void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
+                struct lttng_trace_chunk *new_trace_chunk)
+{
+       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+
+       if (new_trace_chunk) {
+               const bool acquired_reference = lttng_trace_chunk_get(
+                               new_trace_chunk);
+
+               LTTNG_ASSERT(acquired_reference);
+       }
+
+       vstream->stream_file.trace_chunk = new_trace_chunk;
+       viewer_stream_sync_tracefile_array_tail(vstream);
+       viewer_stream_close_files(vstream);
+}
+
 /*
  * Send the next index for a stream.
  *
@@ -1545,7 +1580,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
 
-       assert(conn);
+       LTTNG_ASSERT(conn);
 
        DBG("Viewer get next index");
 
@@ -1596,7 +1631,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       if (rstream->trace_chunk && !lttng_trace_chunk_ids_equal(
+       /*
+        * Transition the viewer session into the newest trace chunk available.
+        */
+       if (!lttng_trace_chunk_ids_equal(
                        conn->viewer_session->current_trace_chunk,
                        rstream->trace_chunk)) {
                DBG("Relay stream and viewer chunk ids differ");
@@ -1609,21 +1647,28 @@ int viewer_get_next_index(struct relay_connection *conn)
                        goto send_reply;
                }
        }
-       if (conn->viewer_session->current_trace_chunk !=
-                       vstream->stream_file.trace_chunk) {
-               bool acquired_reference;
 
+       /*
+        * Transition the viewer stream into the latest trace chunk available.
+        *
+        * Note that the stream must _not_ rotate in one precise condition:
+        * the relay stream has rotated to a NULL trace chunk and the viewer
+        * stream is consuming the trace chunk that was active just before
+        * that rotation to NULL.
+        *
+        * This allows clients to consume all the packets of a trace chunk
+        * after a session's destruction.
+        */
+       if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
+                       !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
                DBG("Viewer session and viewer stream chunk differ: "
                                "vsession chunk %p vstream chunk %p",
                                conn->viewer_session->current_trace_chunk,
                                vstream->stream_file.trace_chunk);
-               lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
-               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
-               assert(acquired_reference);
-               vstream->stream_file.trace_chunk =
-                       conn->viewer_session->current_trace_chunk;
-               viewer_stream_sync_tracefile_array_tail(vstream);
-               viewer_stream_close_files(vstream);
+               viewer_stream_rotate_to_trace_chunk(vstream,
+                               conn->viewer_session->current_trace_chunk);
+               vstream->last_seen_rotation_count =
+                               rstream->completed_rotation_count;
        }
 
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
@@ -1637,7 +1682,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
        /* At this point, ret is 0 thus we will be able to read the index. */
-       assert(!ret);
+       LTTNG_ASSERT(!ret);
 
        /* Try to open an index if one is needed for that stream. */
        ret = try_open_index(vstream, rstream);
@@ -1909,7 +1954,7 @@ int viewer_get_metadata(struct relay_connection *conn)
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *vstream = NULL;
 
-       assert(conn);
+       LTTNG_ASSERT(conn);
 
        DBG("Relay get metadata");
 
@@ -1984,8 +2029,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                }
        }
 
-       if (conn->viewer_session->current_trace_chunk !=
-                       vstream->stream_file.trace_chunk) {
+       if (conn->viewer_session->current_trace_chunk &&
+                       conn->viewer_session->current_trace_chunk !=
+                                       vstream->stream_file.trace_chunk) {
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
@@ -1994,7 +2040,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                                vstream->stream_file.trace_chunk);
                lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
                acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
-               assert(acquired_reference);
+               LTTNG_ASSERT(acquired_reference);
                vstream->stream_file.trace_chunk =
                        conn->viewer_session->current_trace_chunk;
                viewer_stream_close_files(vstream);
@@ -2002,11 +2048,16 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
-       /*
-        * Either this is the first time the metadata file is read, or a
-        * rotation of the corresponding relay stream has occured.
-        */
-       if (!vstream->stream_file.handle && len > 0) {
+       if (!vstream->stream_file.trace_chunk) {
+               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+               len = 0;
+               goto send_reply;
+       } else if (vstream->stream_file.trace_chunk &&
+                       !vstream->stream_file.handle && len > 0) {
+               /*
+                * Either this is the first time the metadata file is read, or a
+                * rotation of the corresponding relay stream has occurred.
+                */
                struct fs_handle *fs_handle;
                char file_path[LTTNG_PATH_MAX];
                enum lttng_trace_chunk_status status;
@@ -2206,7 +2257,7 @@ int viewer_detach_session(struct relay_connection *conn)
 
        DBG("Viewer detach session received");
 
-       assert(conn);
+       LTTNG_ASSERT(conn);
 
        health_code_update();
 
This page took 0.027957 seconds and 4 git commands to generate.