X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=37bdfc2e5a2c5326ae0104523d6c4d3c2994a98a;hb=004e1d13c2df939c3f7cd51fbbc89f422864bf4c;hp=5877467c0d5598f0cbdc7481f08c144a0b7580b1;hpb=80516611b6f19201b1e173fb448935aca7a9e668;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 5877467c0..37bdfc2e5 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -374,7 +374,7 @@ static int make_viewer_streams(struct relay_session *relay_session, * chunk can be used safely. */ if ((relay_stream->ongoing_rotation.is_set || - relay_session->ongoing_rotation) && + session_has_ongoing_rotation(relay_session)) && relay_stream->trace_chunk) { viewer_stream_trace_chunk = lttng_trace_chunk_copy( relay_stream->trace_chunk); @@ -384,8 +384,6 @@ static int make_viewer_streams(struct relay_session *relay_session, goto error_unlock; } } else { - bool reference_acquired; - /* * Transition the viewer session into the newest trace chunk available. */ @@ -402,11 +400,26 @@ static int make_viewer_streams(struct relay_session *relay_session, } } - reference_acquired = lttng_trace_chunk_get( - viewer_session->current_trace_chunk); - assert(reference_acquired); - viewer_stream_trace_chunk = - viewer_session->current_trace_chunk; + if (relay_stream->trace_chunk) { + /* + * If the corresponding relay + * stream's trace chunk is set, + * the viewer stream will be + * created under it. + * + * Note that a relay stream can + * have a NULL output trace + * chunk (for instance, after a + * clear against a stopped + * session). + */ + const bool reference_acquired = lttng_trace_chunk_get( + viewer_session->current_trace_chunk); + + assert(reference_acquired); + viewer_stream_trace_chunk = + viewer_session->current_trace_chunk; + } } viewer_stream = viewer_stream_create( @@ -1142,13 +1155,37 @@ int viewer_get_new_streams(struct relay_connection *conn) goto send_reply; } + /* + * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since + * that at this point the client is already attached to the session.Aany + * initial stream will have been created with the seek type at attach + * time (for now most readers use the LTTNG_VIEWER_SEEK_LAST on attach). + * Otherwise any event happening in a new stream between the attach and + * a call to viewer_get_new_streams will be "lost" (never received) from + * the viewer's point of view. + */ pthread_mutex_lock(&session->lock); + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session_has_ongoing_rotation(session)) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); + goto send_reply_unlock; + } ret = make_viewer_streams(session, conn->viewer_session, - LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, + LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { - goto error_unlock_session; + /* + * This is caused by an internal error; propagate the negative + * 'ret' to close the connection. + */ + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); + goto send_reply_unlock; } send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1203,10 +1240,6 @@ end_put_session: } error: return ret; -error_unlock_session: - pthread_mutex_unlock(&session->lock); - session_put(session); - return ret; } /* @@ -1283,6 +1316,17 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session_has_ongoing_rotation(session)) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + send_streams = 0; + goto send_reply; + } + ret = make_viewer_streams(session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); @@ -1587,6 +1631,13 @@ int viewer_get_next_index(struct relay_connection *conn) metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); + /* + * Hold the session lock to protect against concurrent changes + * to the chunk files (e.g. rename done by clear), which are + * protected by the session ongoing rotation state. Those are + * synchronized with the session lock. + */ + pthread_mutex_lock(&rstream->trace->session->lock); pthread_mutex_lock(&rstream->lock); /* @@ -1603,7 +1654,7 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - if (rstream->trace->session->ongoing_rotation) { + if (session_has_ongoing_rotation(rstream->trace->session)) { /* Rotation is ongoing, try again later. */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); goto send_reply; @@ -1637,9 +1688,9 @@ int viewer_get_next_index(struct relay_connection *conn) * This allows clients to consume all the packets of a trace chunk * after a session's destruction. */ - if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk && + if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) && !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { - DBG("Viewer session and viewer stream chunk differ: " + DBG("Viewer session and viewer stream chunk IDs differ: " "vsession chunk %p vstream chunk %p", conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); @@ -1753,6 +1804,7 @@ int viewer_get_next_index(struct relay_connection *conn) send_reply: if (rstream) { pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); } if (metadata_viewer_stream) { @@ -1794,6 +1846,7 @@ end: error_put: pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); if (metadata_viewer_stream) { viewer_stream_put(metadata_viewer_stream); } @@ -1982,11 +2035,11 @@ int viewer_get_metadata(struct relay_connection *conn) * an error. */ if (vstream->metadata_sent > 0) { - vstream->stream->no_new_metadata_notified = true; - if (vstream->stream->closed) { + if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { /* Release ownership for the viewer metadata stream. */ viewer_stream_put(vstream); } + vstream->stream->no_new_metadata_notified = true; } goto send_reply; } @@ -2007,8 +2060,9 @@ int viewer_get_metadata(struct relay_connection *conn) } } - if (conn->viewer_session->current_trace_chunk != - vstream->stream_file.trace_chunk) { + if (conn->viewer_session->current_trace_chunk && + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk)) { bool acquired_reference; DBG("Viewer session and viewer stream chunk differ: " @@ -2025,11 +2079,16 @@ int viewer_get_metadata(struct relay_connection *conn) len = vstream->stream->metadata_received - vstream->metadata_sent; - /* - * Either this is the first time the metadata file is read, or a - * rotation of the corresponding relay stream has occured. - */ - if (!vstream->stream_file.handle && len > 0) { + if (!vstream->stream_file.trace_chunk) { + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); + len = 0; + goto send_reply; + } else if (vstream->stream_file.trace_chunk && + !vstream->stream_file.handle && len > 0) { + /* + * Either this is the first time the metadata file is read, or a + * rotation of the corresponding relay stream has occurred. + */ struct fs_handle *fs_handle; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status;