X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=96ec11efcde387e67e8884b63593f705933effb4;hb=bb1dcf019b7b1c2057cc34afde9172d12fc3981e;hp=bfef8b9f948d31bb1845672e54a473bfbbbbee9b;hpb=d8f644d930cbd10bccec3c522bc8de95099827f3;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index bfef8b9f9..96ec11efc 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -1167,6 +1168,16 @@ int viewer_get_new_streams(struct relay_connection *conn) * the viewer's point of view. */ pthread_mutex_lock(&session->lock); + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); + goto send_reply_unlock; + } ret = make_viewer_streams(session, conn->viewer_session, LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, @@ -1307,6 +1318,17 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + send_streams = 0; + goto send_reply; + } + ret = make_viewer_streams(session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); @@ -1581,6 +1603,9 @@ int viewer_get_next_index(struct relay_connection *conn) struct relay_stream *rstream = NULL; struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; + bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; + uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; + enum lttng_trace_chunk_status status; LTTNG_ASSERT(conn); @@ -1661,12 +1686,50 @@ int viewer_get_next_index(struct relay_connection *conn) * This allows clients to consume all the packets of a trace chunk * after a session's destruction. */ - if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) && - !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { - DBG("Viewer session and viewer stream chunk IDs differ: " - "vsession chunk %p vstream chunk %p", + if (vstream->stream_file.trace_chunk) { + status = lttng_trace_chunk_get_id( + vstream->stream_file.trace_chunk, + &stream_file_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + if (conn->viewer_session->current_trace_chunk) { + status = lttng_trace_chunk_get_id( conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + &viewer_session_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + + viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + viewer_stream_one_rotation_behind = rstream->completed_rotation_count == + vstream->last_seen_rotation_count + 1; + + if (viewer_stream_and_session_in_same_chunk) { + DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) { + DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else { + DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + viewer_stream_rotate_to_trace_chunk(vstream, conn->viewer_session->current_trace_chunk); vstream->last_seen_rotation_count = @@ -1711,7 +1774,6 @@ int viewer_get_next_index(struct relay_connection *conn) */ if (!vstream->stream_file.handle) { char file_path[LTTNG_PATH_MAX]; - enum lttng_trace_chunk_status status; struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, @@ -2006,11 +2068,11 @@ int viewer_get_metadata(struct relay_connection *conn) * an error. */ if (vstream->metadata_sent > 0) { - vstream->stream->no_new_metadata_notified = true; - if (vstream->stream->closed) { + if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { /* Release ownership for the viewer metadata stream. */ viewer_stream_put(vstream); } + vstream->stream->no_new_metadata_notified = true; } goto send_reply; }