X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=5877467c0d5598f0cbdc7481f08c144a0b7580b1;hp=f28b9c66fa90fb89301287b8af9910d0aac7f5f0;hb=80516611b6f19201b1e173fb448935aca7a9e668;hpb=8fba2b8dfd9d2e9070741e082b9c1f84cb33f799 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index f28b9c66f..5877467c0 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -192,7 +192,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag) { ssize_t ret; - struct lttng_viewer_stream send_stream; struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; @@ -201,6 +200,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { struct ctf_trace *ctf_trace; + struct lttng_viewer_stream send_stream = {}; health_code_update(); @@ -268,8 +268,8 @@ end_unlock: * * Return 0 on success or else a negative value. */ -static int make_viewer_streams(struct relay_session *session, - struct lttng_trace_chunk *viewer_trace_chunk, +static int make_viewer_streams(struct relay_session *relay_session, + struct relay_viewer_session *viewer_session, enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent, @@ -279,18 +279,12 @@ static int make_viewer_streams(struct relay_session *session, int ret; struct lttng_ht_iter iter; struct ctf_trace *ctf_trace; + struct relay_stream *relay_stream = NULL; - assert(session); - ASSERT_LOCKED(session->lock); + assert(relay_session); + ASSERT_LOCKED(relay_session->lock); - if (!viewer_trace_chunk) { - ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk", - session->session_name); - ret = -1; - goto error; - } - - if (session->connection_closed) { + if (relay_session->connection_closed) { *closed = true; } @@ -299,10 +293,9 @@ static int make_viewer_streams(struct relay_session *session, * used for a the given session id only. */ rcu_read_lock(); - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, - node.node) { + cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter, + ctf_trace, node.node) { bool trace_has_metadata_stream = false; - struct relay_stream *stream; health_code_update(); @@ -314,15 +307,23 @@ static int make_viewer_streams(struct relay_session *session, * Iterate over all the streams of the trace to see if we have a * metadata stream. */ - cds_list_for_each_entry_rcu( - stream, &ctf_trace->stream_list, stream_node) + cds_list_for_each_entry_rcu(relay_stream, + &ctf_trace->stream_list, stream_node) { - if (stream->is_metadata) { + bool is_metadata_stream; + + pthread_mutex_lock(&relay_stream->lock); + is_metadata_stream = relay_stream->is_metadata; + pthread_mutex_unlock(&relay_stream->lock); + + if (is_metadata_stream) { trace_has_metadata_stream = true; break; } } + relay_stream = NULL; + /* * If there is no metadata stream in this trace at the moment * and we never sent one to the viewer, skip the trace. We @@ -334,35 +335,89 @@ static int make_viewer_streams(struct relay_session *session, continue; } - cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) { - struct relay_viewer_stream *vstream; + cds_list_for_each_entry_rcu(relay_stream, + &ctf_trace->stream_list, stream_node) + { + struct relay_viewer_stream *viewer_stream; - if (!stream_get(stream)) { + if (!stream_get(relay_stream)) { continue; } + + pthread_mutex_lock(&relay_stream->lock); /* * stream published is protected by the session lock. */ - if (!stream->published) { + if (!relay_stream->published) { goto next; } - vstream = viewer_stream_get_by_id(stream->stream_handle); - if (!vstream) { + viewer_stream = viewer_stream_get_by_id( + relay_stream->stream_handle); + if (!viewer_stream) { + struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL; + /* * Save that we sent the metadata stream to the * viewer. So that we know what trace the viewer * is aware of. */ - if (stream->is_metadata) { - ctf_trace->metadata_stream_sent_to_viewer = - true; + if (relay_stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = true; + } + + /* + * If a rotation is ongoing, use a copy of the + * relay stream's chunk to ensure the stream + * files exist. + * + * Otherwise, the viewer session's current trace + * chunk can be used safely. + */ + if ((relay_stream->ongoing_rotation.is_set || + relay_session->ongoing_rotation) && + relay_stream->trace_chunk) { + viewer_stream_trace_chunk = lttng_trace_chunk_copy( + relay_stream->trace_chunk); + if (!viewer_stream_trace_chunk) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } else { + bool reference_acquired; + + /* + * Transition the viewer session into the newest trace chunk available. + */ + if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk, + relay_stream->trace_chunk)) { + + ret = viewer_session_set_trace_chunk_copy( + viewer_session, + relay_stream->trace_chunk); + if (ret) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } + + reference_acquired = lttng_trace_chunk_get( + viewer_session->current_trace_chunk); + assert(reference_acquired); + viewer_stream_trace_chunk = + viewer_session->current_trace_chunk; } - vstream = viewer_stream_create(stream, - viewer_trace_chunk, seek_t); - if (!vstream) { + + viewer_stream = viewer_stream_create( + relay_stream, + viewer_stream_trace_chunk, + seek_t); + lttng_trace_chunk_put(viewer_stream_trace_chunk); + viewer_stream_trace_chunk = NULL; + if (!viewer_stream) { ret = -1; ctf_trace_put(ctf_trace); - stream_put(stream); goto error_unlock; } @@ -374,36 +429,40 @@ static int make_viewer_streams(struct relay_session *session, * Ensure a self-reference is preserved even * after we have put our local reference. */ - if (!viewer_stream_get(vstream)) { + if (!viewer_stream_get(viewer_stream)) { ERR("Unable to get self-reference on viewer stream, logic error."); abort(); } } else { - if (!vstream->sent_flag && nb_unsent) { + if (!viewer_stream->sent_flag && nb_unsent) { /* Update number of unsent stream counter. */ (*nb_unsent)++; } } /* Update number of total stream counter. */ if (nb_total) { - if (stream->is_metadata) { - if (!stream->closed || - stream->metadata_received > vstream->metadata_sent) { + if (relay_stream->is_metadata) { + if (!relay_stream->closed || + relay_stream->metadata_received > + viewer_stream->metadata_sent) { (*nb_total)++; } } else { - if (!stream->closed || - !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) { - + if (!relay_stream->closed || + !(((int64_t)(relay_stream->prev_data_seq - + relay_stream->last_net_seq_num)) >= + 0)) { (*nb_total)++; } } } /* Put local reference. */ - viewer_stream_put(vstream); + viewer_stream_put(viewer_stream); next: - stream_put(stream); + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); } + relay_stream = NULL; ctf_trace_put(ctf_trace); } @@ -411,7 +470,12 @@ static int make_viewer_streams(struct relay_session *session, error_unlock: rcu_read_unlock(); -error: + + if (relay_stream) { + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); + } + return ret; } @@ -439,6 +503,10 @@ int create_named_thread_poll_set(struct lttng_poll_event *events, ret = fd_tracker_util_poll_create(the_fd_tracker, name, events, 1, LTTNG_CLOEXEC); + if (ret) { + PERROR("Failed to create \"%s\" poll file descriptor", name); + goto error; + } /* Add quit pipe */ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); @@ -561,7 +629,11 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, (const char **) (formated_name ? &formated_name : NULL), 1, create_sock, sock); - free(formated_name); + if (ret) { + PERROR("Failed to create \"%s\" socket", + formated_name ?: "Unknown"); + goto error; + } DBG("Listening on %s socket %d", name, sock->fd); ret = sock->ops->bind(sock); @@ -576,12 +648,14 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) } + free(formated_name); return sock; error: if (sock) { lttcomm_destroy_sock(sock); } + free(formated_name); return NULL; } @@ -954,14 +1028,6 @@ int viewer_list_sessions(struct relay_connection *conn) /* Skip closed session */ goto next_session; } - if (!session->current_trace_chunk) { - /* - * Skip un-attachable session. It is either - * being destroyed or has not had a trace - * chunk created against it yet. - */ - goto next_session; - } if (count >= buf_count) { struct lttng_viewer_session *newbuf; @@ -1078,7 +1144,7 @@ int viewer_get_new_streams(struct relay_connection *conn) pthread_mutex_lock(&session->lock); ret = make_viewer_streams(session, - conn->viewer_session->current_trace_chunk, + conn->viewer_session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { @@ -1190,15 +1256,6 @@ int viewer_attach_session(struct relay_connection *conn) DBG("Attach session ID %" PRIu64 " received", session_id); pthread_mutex_lock(&session->lock); - if (!session->current_trace_chunk) { - /* - * Session is either being destroyed or it never had a trace - * chunk created against it. - */ - DBG("Session requested by live client has no current trace chunk, returning unknown session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); - goto send_reply; - } if (session->live_timer == 0) { DBG("Not live session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); @@ -1227,7 +1284,7 @@ int viewer_attach_session(struct relay_connection *conn) } ret = make_viewer_streams(session, - conn->viewer_session->current_trace_chunk, seek_type, + conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); if (ret < 0) { goto end_put_session; @@ -1307,10 +1364,12 @@ static int try_open_index(struct relay_viewer_stream *vstream, /* * First time, we open the index file and at least one index is ready. */ - if (rstream->index_received_seqcount == 0) { + if (rstream->index_received_seqcount == 0 || + !vstream->stream_file.trace_chunk) { ret = -ENOENT; goto end; } + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( vstream->stream_file.trace_chunk, rstream->path_name, rstream->channel_name, rstream->tracefile_size, @@ -1464,6 +1523,24 @@ index_ready: return 1; } +static +void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, + struct lttng_trace_chunk *new_trace_chunk) +{ + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + + if (new_trace_chunk) { + const bool acquired_reference = lttng_trace_chunk_get( + new_trace_chunk); + + assert(acquired_reference); + } + + vstream->stream_file.trace_chunk = new_trace_chunk; + viewer_stream_sync_tracefile_array_tail(vstream); + viewer_stream_close_files(vstream); +} + /* * Send the next index for a stream. * @@ -1532,58 +1609,44 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - if (rstream->trace_chunk) { - uint64_t rchunk_id, vchunk_id; + /* + * Transition the viewer session into the newest trace chunk available. + */ + if (!lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + rstream->trace_chunk)) { + DBG("Relay stream and viewer chunk ids differ"); - /* - * If the relay stream is not yet closed, ensure the viewer - * chunk matches the relay chunk after clear. - */ - if (lttng_trace_chunk_get_id(rstream->trace_chunk, - &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; - } - if (lttng_trace_chunk_get_id( - conn->viewer_session->current_trace_chunk, - &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + rstream->trace_chunk); + if (ret) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } - - if (rchunk_id != vchunk_id) { - DBG("Relay and viewer chunk ids differ: " - "rchunk_id %" PRIu64 " vchunk_id %" PRIu64, - rchunk_id, vchunk_id); - - lttng_trace_chunk_put( - conn->viewer_session->current_trace_chunk); - conn->viewer_session->current_trace_chunk = NULL; - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - rstream->trace_chunk); - if (ret) { - viewer_index.status = - htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; - } - } } - if (conn->viewer_session->current_trace_chunk != - vstream->stream_file.trace_chunk) { - bool acquired_reference; + /* + * Transition the viewer stream into the latest trace chunk available. + * + * Note that the stream must _not_ rotate in one precise condition: + * the relay stream has rotated to a NULL trace chunk and the viewer + * stream is consuming the trace chunk that was active just before + * that rotation to NULL. + * + * This allows clients to consume all the packets of a trace chunk + * after a session's destruction. + */ + if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk && + !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { DBG("Viewer session and viewer stream chunk differ: " "vsession chunk %p vstream chunk %p", conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); - lttng_trace_chunk_put(vstream->stream_file.trace_chunk); - acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); - assert(acquired_reference); - vstream->stream_file.trace_chunk = - conn->viewer_session->current_trace_chunk; - viewer_stream_sync_tracefile_array_tail(vstream); - viewer_stream_close_files(vstream); + viewer_stream_rotate_to_trace_chunk(vstream, + conn->viewer_session->current_trace_chunk); + vstream->last_seen_rotation_count = + rstream->completed_rotation_count; } ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); @@ -1814,6 +1877,8 @@ int viewer_get_packet(struct relay_connection *conn) goto send_reply; error: + /* No payload to send on error. */ + reply_size = sizeof(reply_header); reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: @@ -1926,10 +1991,45 @@ int viewer_get_metadata(struct relay_connection *conn) goto send_reply; } + if (vstream->stream->trace_chunk && + !lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream->trace_chunk)) { + /* A rotation has occurred on the relay stream. */ + DBG("Metadata relay stream and viewer chunk ids differ"); + + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + vstream->stream->trace_chunk); + if (ret) { + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; + } + } + + if (conn->viewer_session->current_trace_chunk != + vstream->stream_file.trace_chunk) { + bool acquired_reference; + + DBG("Viewer session and viewer stream chunk differ: " + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + assert(acquired_reference); + vstream->stream_file.trace_chunk = + conn->viewer_session->current_trace_chunk; + viewer_stream_close_files(vstream); + } + len = vstream->stream->metadata_received - vstream->metadata_sent; - /* first time, we open the metadata file */ - if (!vstream->stream_file.handle) { + /* + * Either this is the first time the metadata file is read, or a + * rotation of the corresponding relay stream has occured. + */ + if (!vstream->stream_file.handle && len > 0) { struct fs_handle *fs_handle; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; @@ -1964,6 +2064,33 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } vstream->stream_file.handle = fs_handle; + + if (vstream->metadata_sent != 0) { + /* + * The client does not expect to receive any metadata + * it has received and metadata files in successive + * chunks must be a strict superset of one another. + * + * Skip the first `metadata_sent` bytes to ensure + * they are not sent a second time to the client. + * + * Baring a block layer error or an internal error, + * this seek should not fail as + * `vstream->stream->metadata_received` is reset when + * a relay stream is rotated. If this is reached, it is + * safe to assume that + * `metadata_received` > `metadata_sent`. + */ + const off_t seek_ret = fs_handle_seek(fs_handle, + vstream->metadata_sent, SEEK_SET); + + if (seek_ret < 0) { + PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64, + vstream->metadata_sent); + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; + } + } } reply.len = htobe64(len); @@ -1982,8 +2109,34 @@ int viewer_get_metadata(struct relay_connection *conn) fs_handle_put_fd(vstream->stream_file.handle); fd = -1; if (read_len < len) { - PERROR("Relay reading metadata file"); - goto error; + if (read_len < 0) { + PERROR("Failed to read metadata file"); + goto error; + } else { + /* + * A clear has been performed which prevents the relay + * from sending `len` bytes of metadata. + * + * It is important not to send any metadata if we + * couldn't read all the available metadata in one shot: + * sending partial metadata can cause the client to + * attempt to parse an incomplete (incoherent) metadata + * stream, which would result in an error. + */ + const off_t seek_ret = fs_handle_seek( + vstream->stream_file.handle, -read_len, + SEEK_CUR); + + DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd", + len, read_len); + read_len = 0; + len = 0; + if (seek_ret < 0) { + PERROR("Failed to restore metadata file position after partial read"); + ret = -1; + goto error; + } + } } vstream->metadata_sent += read_len; reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);