Fix: relayd: failure to read index entry or stream packet after clear
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index f28b9c66fa90fb89301287b8af9910d0aac7f5f0..5877467c0d5598f0cbdc7481f08c144a0b7580b1 100644 (file)
@@ -192,7 +192,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
                uint64_t session_id, unsigned int ignore_sent_flag)
 {
        ssize_t ret;
-       struct lttng_viewer_stream send_stream;
        struct lttng_ht_iter iter;
        struct relay_viewer_stream *vstream;
 
@@ -201,6 +200,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
        cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
                        stream_n.node) {
                struct ctf_trace *ctf_trace;
+               struct lttng_viewer_stream send_stream = {};
 
                health_code_update();
 
@@ -268,8 +268,8 @@ end_unlock:
  *
  * Return 0 on success or else a negative value.
  */
-static int make_viewer_streams(struct relay_session *session,
-               struct lttng_trace_chunk *viewer_trace_chunk,
+static int make_viewer_streams(struct relay_session *relay_session,
+               struct relay_viewer_session *viewer_session,
                enum lttng_viewer_seek seek_t,
                uint32_t *nb_total,
                uint32_t *nb_unsent,
@@ -279,18 +279,12 @@ static int make_viewer_streams(struct relay_session *session,
        int ret;
        struct lttng_ht_iter iter;
        struct ctf_trace *ctf_trace;
+       struct relay_stream *relay_stream = NULL;
 
-       assert(session);
-       ASSERT_LOCKED(session->lock);
+       assert(relay_session);
+       ASSERT_LOCKED(relay_session->lock);
 
-       if (!viewer_trace_chunk) {
-               ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk",
-                               session->session_name);
-               ret = -1;
-               goto error;
-       }
-
-       if (session->connection_closed) {
+       if (relay_session->connection_closed) {
                *closed = true;
        }
 
@@ -299,10 +293,9 @@ static int make_viewer_streams(struct relay_session *session,
         * used for a the given session id only.
         */
        rcu_read_lock();
-       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-                       node.node) {
+       cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
+                       ctf_trace, node.node) {
                bool trace_has_metadata_stream = false;
-               struct relay_stream *stream;
 
                health_code_update();
 
@@ -314,15 +307,23 @@ static int make_viewer_streams(struct relay_session *session,
                 * Iterate over all the streams of the trace to see if we have a
                 * metadata stream.
                 */
-               cds_list_for_each_entry_rcu(
-                               stream, &ctf_trace->stream_list, stream_node)
+               cds_list_for_each_entry_rcu(relay_stream,
+                               &ctf_trace->stream_list, stream_node)
                {
-                       if (stream->is_metadata) {
+                       bool is_metadata_stream;
+
+                       pthread_mutex_lock(&relay_stream->lock);
+                       is_metadata_stream = relay_stream->is_metadata;
+                       pthread_mutex_unlock(&relay_stream->lock);
+
+                       if (is_metadata_stream) {
                                trace_has_metadata_stream = true;
                                break;
                        }
                }
 
+               relay_stream = NULL;
+
                /*
                 * If there is no metadata stream in this trace at the moment
                 * and we never sent one to the viewer, skip the trace. We
@@ -334,35 +335,89 @@ static int make_viewer_streams(struct relay_session *session,
                        continue;
                }
 
-               cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
-                       struct relay_viewer_stream *vstream;
+               cds_list_for_each_entry_rcu(relay_stream,
+                               &ctf_trace->stream_list, stream_node)
+               {
+                       struct relay_viewer_stream *viewer_stream;
 
-                       if (!stream_get(stream)) {
+                       if (!stream_get(relay_stream)) {
                                continue;
                        }
+
+                       pthread_mutex_lock(&relay_stream->lock);
                        /*
                         * stream published is protected by the session lock.
                         */
-                       if (!stream->published) {
+                       if (!relay_stream->published) {
                                goto next;
                        }
-                       vstream = viewer_stream_get_by_id(stream->stream_handle);
-                       if (!vstream) {
+                       viewer_stream = viewer_stream_get_by_id(
+                                       relay_stream->stream_handle);
+                       if (!viewer_stream) {
+                               struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
+
                                /*
                                 * Save that we sent the metadata stream to the
                                 * viewer. So that we know what trace the viewer
                                 * is aware of.
                                 */
-                               if (stream->is_metadata) {
-                                       ctf_trace->metadata_stream_sent_to_viewer =
-                                                       true;
+                               if (relay_stream->is_metadata) {
+                                       ctf_trace->metadata_stream_sent_to_viewer = true;
+                               }
+
+                               /*
+                                * If a rotation is ongoing, use a copy of the
+                                * relay stream's chunk to ensure the stream
+                                * files exist.
+                                *
+                                * Otherwise, the viewer session's current trace
+                                * chunk can be used safely.
+                                */
+                               if ((relay_stream->ongoing_rotation.is_set ||
+                                                   relay_session->ongoing_rotation) &&
+                                               relay_stream->trace_chunk) {
+                                       viewer_stream_trace_chunk = lttng_trace_chunk_copy(
+                                                       relay_stream->trace_chunk);
+                                       if (!viewer_stream_trace_chunk) {
+                                               ret = -1;
+                                               ctf_trace_put(ctf_trace);
+                                               goto error_unlock;
+                                       }
+                               } else {
+                                       bool reference_acquired;
+
+                                       /*
+                                        * Transition the viewer session into the newest trace chunk available.
+                                        */
+                                       if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
+                                                       relay_stream->trace_chunk)) {
+
+                                               ret = viewer_session_set_trace_chunk_copy(
+                                                               viewer_session,
+                                                               relay_stream->trace_chunk);
+                                               if (ret) {
+                                                       ret = -1;
+                                                       ctf_trace_put(ctf_trace);
+                                                       goto error_unlock;
+                                               }
+                                       }
+
+                                       reference_acquired = lttng_trace_chunk_get(
+                                                       viewer_session->current_trace_chunk);
+                                       assert(reference_acquired);
+                                       viewer_stream_trace_chunk =
+                                                       viewer_session->current_trace_chunk;
                                }
-                               vstream = viewer_stream_create(stream,
-                                               viewer_trace_chunk, seek_t);
-                               if (!vstream) {
+
+                               viewer_stream = viewer_stream_create(
+                                               relay_stream,
+                                               viewer_stream_trace_chunk,
+                                               seek_t);
+                               lttng_trace_chunk_put(viewer_stream_trace_chunk);
+                               viewer_stream_trace_chunk = NULL;
+                               if (!viewer_stream) {
                                        ret = -1;
                                        ctf_trace_put(ctf_trace);
-                                       stream_put(stream);
                                        goto error_unlock;
                                }
 
@@ -374,36 +429,40 @@ static int make_viewer_streams(struct relay_session *session,
                                 * Ensure a self-reference is preserved even
                                 * after we have put our local reference.
                                 */
-                               if (!viewer_stream_get(vstream)) {
+                               if (!viewer_stream_get(viewer_stream)) {
                                        ERR("Unable to get self-reference on viewer stream, logic error.");
                                        abort();
                                }
                        } else {
-                               if (!vstream->sent_flag && nb_unsent) {
+                               if (!viewer_stream->sent_flag && nb_unsent) {
                                        /* Update number of unsent stream counter. */
                                        (*nb_unsent)++;
                                }
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               if (stream->is_metadata) {
-                                       if (!stream->closed ||
-                                                       stream->metadata_received > vstream->metadata_sent) {
+                               if (relay_stream->is_metadata) {
+                                       if (!relay_stream->closed ||
+                                                       relay_stream->metadata_received >
+                                                                       viewer_stream->metadata_sent) {
                                                (*nb_total)++;
                                        }
                                } else {
-                                       if (!stream->closed ||
-                                               !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) {
-
+                                       if (!relay_stream->closed ||
+                                                       !(((int64_t)(relay_stream->prev_data_seq -
+                                                                         relay_stream->last_net_seq_num)) >=
+                                                                       0)) {
                                                (*nb_total)++;
                                        }
                                }
                        }
                        /* Put local reference. */
-                       viewer_stream_put(vstream);
+                       viewer_stream_put(viewer_stream);
                next:
-                       stream_put(stream);
+                       pthread_mutex_unlock(&relay_stream->lock);
+                       stream_put(relay_stream);
                }
+               relay_stream = NULL;
                ctf_trace_put(ctf_trace);
        }
 
@@ -411,7 +470,12 @@ static int make_viewer_streams(struct relay_session *session,
 
 error_unlock:
        rcu_read_unlock();
-error:
+
+       if (relay_stream) {
+               pthread_mutex_unlock(&relay_stream->lock);
+               stream_put(relay_stream);
+       }
+
        return ret;
 }
 
@@ -439,6 +503,10 @@ int create_named_thread_poll_set(struct lttng_poll_event *events,
 
        ret = fd_tracker_util_poll_create(the_fd_tracker,
                        name, events, 1, LTTNG_CLOEXEC);
+       if (ret) {
+               PERROR("Failed to create \"%s\" poll file descriptor", name);
+               goto error;
+       }
 
        /* Add quit pipe */
        ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
@@ -561,7 +629,11 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
        ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
                        (const char **) (formated_name ? &formated_name : NULL),
                        1, create_sock, sock);
-       free(formated_name);
+       if (ret) {
+               PERROR("Failed to create \"%s\" socket",
+                               formated_name ?: "Unknown");
+               goto error;
+       }
        DBG("Listening on %s socket %d", name, sock->fd);
 
        ret = sock->ops->bind(sock);
@@ -576,12 +648,14 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
 
        }
 
+       free(formated_name);
        return sock;
 
 error:
        if (sock) {
                lttcomm_destroy_sock(sock);
        }
+       free(formated_name);
        return NULL;
 }
 
@@ -954,14 +1028,6 @@ int viewer_list_sessions(struct relay_connection *conn)
                        /* Skip closed session */
                        goto next_session;
                }
-               if (!session->current_trace_chunk) {
-                       /*
-                        * Skip un-attachable session. It is either
-                        * being destroyed or has not had a trace
-                        * chunk created against it yet.
-                        */
-                       goto next_session;
-               }
 
                if (count >= buf_count) {
                        struct lttng_viewer_session *newbuf;
@@ -1078,7 +1144,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
        pthread_mutex_lock(&session->lock);
        ret = make_viewer_streams(session,
-                       conn->viewer_session->current_trace_chunk,
+                       conn->viewer_session,
                        LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
@@ -1190,15 +1256,6 @@ int viewer_attach_session(struct relay_connection *conn)
        DBG("Attach session ID %" PRIu64 " received", session_id);
 
        pthread_mutex_lock(&session->lock);
-       if (!session->current_trace_chunk) {
-               /*
-                * Session is either being destroyed or it never had a trace
-                * chunk created against it.
-                */
-               DBG("Session requested by live client has no current trace chunk, returning unknown session");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
-               goto send_reply;
-       }
        if (session->live_timer == 0) {
                DBG("Not live session");
                response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
@@ -1227,7 +1284,7 @@ int viewer_attach_session(struct relay_connection *conn)
        }
 
        ret = make_viewer_streams(session,
-                       conn->viewer_session->current_trace_chunk, seek_type,
+                       conn->viewer_session, seek_type,
                        &nb_streams, NULL, NULL, &closed);
        if (ret < 0) {
                goto end_put_session;
@@ -1307,10 +1364,12 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->index_received_seqcount == 0) {
+       if (rstream->index_received_seqcount == 0 ||
+                       !vstream->stream_file.trace_chunk) {
                ret = -ENOENT;
                goto end;
        }
+
        chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
                        vstream->stream_file.trace_chunk, rstream->path_name,
                        rstream->channel_name, rstream->tracefile_size,
@@ -1464,6 +1523,24 @@ index_ready:
        return 1;
 }
 
+static
+void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
+                struct lttng_trace_chunk *new_trace_chunk)
+{
+       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+
+       if (new_trace_chunk) {
+               const bool acquired_reference = lttng_trace_chunk_get(
+                               new_trace_chunk);
+
+               assert(acquired_reference);
+       }
+
+       vstream->stream_file.trace_chunk = new_trace_chunk;
+       viewer_stream_sync_tracefile_array_tail(vstream);
+       viewer_stream_close_files(vstream);
+}
+
 /*
  * Send the next index for a stream.
  *
@@ -1532,58 +1609,44 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       if (rstream->trace_chunk) {
-               uint64_t rchunk_id, vchunk_id;
+       /*
+        * Transition the viewer session into the newest trace chunk available.
+        */
+       if (!lttng_trace_chunk_ids_equal(
+                       conn->viewer_session->current_trace_chunk,
+                       rstream->trace_chunk)) {
+               DBG("Relay stream and viewer chunk ids differ");
 
-               /*
-                * If the relay stream is not yet closed, ensure the viewer
-                * chunk matches the relay chunk after clear.
-                */
-               if (lttng_trace_chunk_get_id(rstream->trace_chunk,
-                               &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
-                       goto send_reply;
-               }
-               if (lttng_trace_chunk_get_id(
-                               conn->viewer_session->current_trace_chunk,
-                               &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = viewer_session_set_trace_chunk_copy(
+                               conn->viewer_session,
+                               rstream->trace_chunk);
+               if (ret) {
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                        goto send_reply;
                }
-
-               if (rchunk_id != vchunk_id) {
-                       DBG("Relay and viewer chunk ids differ: "
-                               "rchunk_id %" PRIu64 " vchunk_id %" PRIu64,
-                               rchunk_id, vchunk_id);
-
-                       lttng_trace_chunk_put(
-                               conn->viewer_session->current_trace_chunk);
-                       conn->viewer_session->current_trace_chunk = NULL;
-                       ret = viewer_session_set_trace_chunk_copy(
-                                       conn->viewer_session,
-                                       rstream->trace_chunk);
-                       if (ret) {
-                               viewer_index.status =
-                                       htobe32(LTTNG_VIEWER_INDEX_ERR);
-                               goto send_reply;
-                       }
-               }
        }
-       if (conn->viewer_session->current_trace_chunk !=
-                       vstream->stream_file.trace_chunk) {
-               bool acquired_reference;
 
+       /*
+        * Transition the viewer stream into the latest trace chunk available.
+        *
+        * Note that the stream must _not_ rotate in one precise condition:
+        * the relay stream has rotated to a NULL trace chunk and the viewer
+        * stream is consuming the trace chunk that was active just before
+        * that rotation to NULL.
+        *
+        * This allows clients to consume all the packets of a trace chunk
+        * after a session's destruction.
+        */
+       if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
+                       !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
                DBG("Viewer session and viewer stream chunk differ: "
                                "vsession chunk %p vstream chunk %p",
                                conn->viewer_session->current_trace_chunk,
                                vstream->stream_file.trace_chunk);
-               lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
-               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
-               assert(acquired_reference);
-               vstream->stream_file.trace_chunk =
-                       conn->viewer_session->current_trace_chunk;
-               viewer_stream_sync_tracefile_array_tail(vstream);
-               viewer_stream_close_files(vstream);
+               viewer_stream_rotate_to_trace_chunk(vstream,
+                               conn->viewer_session->current_trace_chunk);
+               vstream->last_seen_rotation_count =
+                               rstream->completed_rotation_count;
        }
 
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
@@ -1814,6 +1877,8 @@ int viewer_get_packet(struct relay_connection *conn)
        goto send_reply;
 
 error:
+       /* No payload to send on error. */
+       reply_size = sizeof(reply_header);
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
@@ -1926,10 +1991,45 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto send_reply;
        }
 
+       if (vstream->stream->trace_chunk &&
+                       !lttng_trace_chunk_ids_equal(
+                               conn->viewer_session->current_trace_chunk,
+                               vstream->stream->trace_chunk)) {
+               /* A rotation has occurred on the relay stream. */
+               DBG("Metadata relay stream and viewer chunk ids differ");
+
+               ret = viewer_session_set_trace_chunk_copy(
+                               conn->viewer_session,
+                               vstream->stream->trace_chunk);
+               if (ret) {
+                       reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
+                       goto send_reply;
+               }
+       }
+
+       if (conn->viewer_session->current_trace_chunk !=
+                       vstream->stream_file.trace_chunk) {
+               bool acquired_reference;
+
+               DBG("Viewer session and viewer stream chunk differ: "
+                               "vsession chunk %p vstream chunk %p",
+                               conn->viewer_session->current_trace_chunk,
+                               vstream->stream_file.trace_chunk);
+               lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+               assert(acquired_reference);
+               vstream->stream_file.trace_chunk =
+                       conn->viewer_session->current_trace_chunk;
+               viewer_stream_close_files(vstream);
+       }
+
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
-       /* first time, we open the metadata file */
-       if (!vstream->stream_file.handle) {
+       /*
+        * Either this is the first time the metadata file is read, or a
+        * rotation of the corresponding relay stream has occured.
+        */
+       if (!vstream->stream_file.handle && len > 0) {
                struct fs_handle *fs_handle;
                char file_path[LTTNG_PATH_MAX];
                enum lttng_trace_chunk_status status;
@@ -1964,6 +2064,33 @@ int viewer_get_metadata(struct relay_connection *conn)
                        goto error;
                }
                vstream->stream_file.handle = fs_handle;
+
+               if (vstream->metadata_sent != 0) {
+                       /*
+                        * The client does not expect to receive any metadata
+                        * it has received and metadata files in successive
+                        * chunks must be a strict superset of one another.
+                        *
+                        * Skip the first `metadata_sent` bytes to ensure
+                        * they are not sent a second time to the client.
+                        *
+                        * Baring a block layer error or an internal error,
+                        * this seek should not fail as
+                        * `vstream->stream->metadata_received` is reset when
+                        * a relay stream is rotated. If this is reached, it is
+                        * safe to assume that
+                        * `metadata_received` > `metadata_sent`.
+                        */
+                       const off_t seek_ret = fs_handle_seek(fs_handle,
+                                       vstream->metadata_sent, SEEK_SET);
+
+                       if (seek_ret < 0) {
+                               PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
+                                               vstream->metadata_sent);
+                               reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
+                               goto send_reply;
+                       }
+               }
        }
 
        reply.len = htobe64(len);
@@ -1982,8 +2109,34 @@ int viewer_get_metadata(struct relay_connection *conn)
        fs_handle_put_fd(vstream->stream_file.handle);
        fd = -1;
        if (read_len < len) {
-               PERROR("Relay reading metadata file");
-               goto error;
+               if (read_len < 0) {
+                       PERROR("Failed to read metadata file");
+                       goto error;
+               } else {
+                       /*
+                        * A clear has been performed which prevents the relay
+                        * from sending `len` bytes of metadata.
+                        *
+                        * It is important not to send any metadata if we
+                        * couldn't read all the available metadata in one shot:
+                        * sending partial metadata can cause the client to
+                        * attempt to parse an incomplete (incoherent) metadata
+                        * stream, which would result in an error.
+                        */
+                       const off_t seek_ret = fs_handle_seek(
+                                       vstream->stream_file.handle, -read_len,
+                                       SEEK_CUR);
+
+                       DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd",
+                                       len, read_len);
+                       read_len = 0;
+                       len = 0;
+                       if (seek_ret < 0) {
+                               PERROR("Failed to restore metadata file position after partial read");
+                               ret = -1;
+                               goto error;
+                       }
+               }
        }
        vstream->metadata_sent += read_len;
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
This page took 0.031731 seconds and 4 git commands to generate.