X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=d963a69b4c1870d901f5fa6fb9b2ba58efa8a811;hb=8f141dbdf7c4628c13d7b099240a7bd4bcacf2cf;hp=c169be918f0b87891b8bc61785ff6610aa952f1a;hpb=e8b269fa71e5bccc0fd89bc7526bbcdf9f83222e;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index c169be918..d963a69b4 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,33 @@ static uint64_t last_relay_viewer_session_id; static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER; +static +const char *lttng_viewer_command_str(lttng_viewer_command cmd) +{ + switch (cmd) { + case LTTNG_VIEWER_CONNECT: + return "CONNECT"; + case LTTNG_VIEWER_LIST_SESSIONS: + return "LIST_SESSIONS"; + case LTTNG_VIEWER_ATTACH_SESSION: + return "ATTACH_SESSION"; + case LTTNG_VIEWER_GET_NEXT_INDEX: + return "GET_NEXT_INDEX"; + case LTTNG_VIEWER_GET_PACKET: + return "GET_PACKET"; + case LTTNG_VIEWER_GET_METADATA: + return "GET_METADATA"; + case LTTNG_VIEWER_GET_NEW_STREAMS: + return "GET_NEW_STREAMS"; + case LTTNG_VIEWER_CREATE_SESSION: + return "CREATE_SESSION"; + case LTTNG_VIEWER_DETACH_SESSION: + return "DETACH_SESSION"; + default: + abort(); + } +} + /* * Cleanup the daemon */ @@ -936,8 +964,6 @@ int viewer_connect(struct relay_connection *conn) health_code_update(); - DBG("Viewer is establishing a connection to the relayd."); - ret = recv_request(conn->sock, &msg, sizeof(msg)); if (ret < 0) { goto end; @@ -1024,8 +1050,6 @@ int viewer_list_sessions(struct relay_connection *conn) uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT; uint32_t count = 0; - DBG("List sessions received"); - send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf)); if (!send_session_buf) { return -1; @@ -1130,8 +1154,6 @@ int viewer_get_new_streams(struct relay_connection *conn) LTTNG_ASSERT(conn); - DBG("Get new streams received"); - health_code_update(); /* Receive the request from the connected client. */ @@ -1167,6 +1189,16 @@ int viewer_get_new_streams(struct relay_connection *conn) * the viewer's point of view. */ pthread_mutex_lock(&session->lock); + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); + goto send_reply_unlock; + } ret = make_viewer_streams(session, conn->viewer_session, LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, @@ -1307,6 +1339,17 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + send_streams = 0; + goto send_reply; + } + ret = make_viewer_streams(session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); @@ -1581,11 +1624,12 @@ int viewer_get_next_index(struct relay_connection *conn) struct relay_stream *rstream = NULL; struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; + bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; + uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; + enum lttng_trace_chunk_status status; LTTNG_ASSERT(conn); - DBG("Viewer get next index"); - memset(&viewer_index, 0, sizeof(viewer_index)); health_code_update(); @@ -1611,6 +1655,13 @@ int viewer_get_next_index(struct relay_connection *conn) metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); + /* + * Hold the session lock to protect against concurrent changes + * to the chunk files (e.g. rename done by clear), which are + * protected by the session ongoing rotation state. Those are + * synchronized with the session lock. + */ + pthread_mutex_lock(&rstream->trace->session->lock); pthread_mutex_lock(&rstream->lock); /* @@ -1661,12 +1712,50 @@ int viewer_get_next_index(struct relay_connection *conn) * This allows clients to consume all the packets of a trace chunk * after a session's destruction. */ - if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) && - !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { - DBG("Viewer session and viewer stream chunk IDs differ: " - "vsession chunk %p vstream chunk %p", + if (vstream->stream_file.trace_chunk) { + status = lttng_trace_chunk_get_id( + vstream->stream_file.trace_chunk, + &stream_file_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + if (conn->viewer_session->current_trace_chunk) { + status = lttng_trace_chunk_get_id( conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + &viewer_session_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + + viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + viewer_stream_one_rotation_behind = rstream->completed_rotation_count == + vstream->last_seen_rotation_count + 1; + + if (viewer_stream_and_session_in_same_chunk) { + DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) { + DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else { + DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + viewer_stream_rotate_to_trace_chunk(vstream, conn->viewer_session->current_trace_chunk); vstream->last_seen_rotation_count = @@ -1711,7 +1800,6 @@ int viewer_get_next_index(struct relay_connection *conn) */ if (!vstream->stream_file.handle) { char file_path[LTTNG_PATH_MAX]; - enum lttng_trace_chunk_status status; struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, @@ -1777,6 +1865,7 @@ int viewer_get_next_index(struct relay_connection *conn) send_reply: if (rstream) { pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); } if (metadata_viewer_stream) { @@ -1818,6 +1907,7 @@ end: error_put: pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); if (metadata_viewer_stream) { viewer_stream_put(metadata_viewer_stream); } @@ -1844,8 +1934,6 @@ int viewer_get_packet(struct relay_connection *conn) ssize_t read_len; uint64_t stream_id; - DBG2("Relay get data packet"); - health_code_update(); ret = recv_request(conn->sock, &get_packet_info, @@ -1958,8 +2046,6 @@ int viewer_get_metadata(struct relay_connection *conn) LTTNG_ASSERT(conn); - DBG("Relay get metadata"); - health_code_update(); ret = recv_request(conn->sock, &request, sizeof(request)); @@ -2007,9 +2093,16 @@ int viewer_get_metadata(struct relay_connection *conn) */ if (vstream->metadata_sent > 0) { if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { - /* Release ownership for the viewer metadata stream. */ + /* + * Release ownership for the viewer metadata + * stream. Note that this reference is the + * viewer's reference. The vstream still exists + * until the end of the function as + * viewer_stream_get_by_id() took a reference. + */ viewer_stream_put(vstream); } + vstream->stream->no_new_metadata_notified = true; } goto send_reply; @@ -2219,8 +2312,6 @@ int viewer_create_session(struct relay_connection *conn) int ret; struct lttng_viewer_create_session_response resp; - DBG("Viewer create session received"); - memset(&resp, 0, sizeof(resp)); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); conn->viewer_session = viewer_session_create(); @@ -2257,8 +2348,6 @@ int viewer_detach_session(struct relay_connection *conn) struct relay_session *session = NULL; uint64_t viewer_session_to_close; - DBG("Viewer detach session received"); - LTTNG_ASSERT(conn); health_code_update(); @@ -2337,21 +2426,24 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn) { int ret = 0; - uint32_t msg_value; - - msg_value = be32toh(recv_hdr->cmd); + lttng_viewer_command cmd = + (lttng_viewer_command) be32toh(recv_hdr->cmd); /* - * Make sure we've done the version check before any command other then a - * new client connection. + * Make sure we've done the version check before any command other then + * a new client connection. */ - if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { - ERR("Viewer conn value %" PRIu32 " before version check", msg_value); + if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { + ERR("Viewer on connection %d requested %s command before version check", + conn->sock->fd, lttng_viewer_command_str(cmd)); ret = -1; goto end; } - switch (msg_value) { + DBG("Processing %s viewer command from connection %d", + lttng_viewer_command_str(cmd), conn->sock->fd); + + switch (cmd) { case LTTNG_VIEWER_CONNECT: ret = viewer_connect(conn); break;