X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=5aa899b282605a0c058f7f497f9b500ad7e0226b;hb=0283afa961253376a5e18de140d29fffd9e89ae3;hp=bfef8b9f948d31bb1845672e54a473bfbbbbee9b;hpb=d8f644d930cbd10bccec3c522bc8de95099827f3;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index bfef8b9f9..5aa899b28 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,95 @@ static uint64_t last_relay_viewer_session_id; static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER; +static +const char *lttng_viewer_command_str(lttng_viewer_command cmd) +{ + switch (cmd) { + case LTTNG_VIEWER_CONNECT: + return "CONNECT"; + case LTTNG_VIEWER_LIST_SESSIONS: + return "LIST_SESSIONS"; + case LTTNG_VIEWER_ATTACH_SESSION: + return "ATTACH_SESSION"; + case LTTNG_VIEWER_GET_NEXT_INDEX: + return "GET_NEXT_INDEX"; + case LTTNG_VIEWER_GET_PACKET: + return "GET_PACKET"; + case LTTNG_VIEWER_GET_METADATA: + return "GET_METADATA"; + case LTTNG_VIEWER_GET_NEW_STREAMS: + return "GET_NEW_STREAMS"; + case LTTNG_VIEWER_CREATE_SESSION: + return "CREATE_SESSION"; + case LTTNG_VIEWER_DETACH_SESSION: + return "DETACH_SESSION"; + default: + abort(); + } +} + +static +const char *lttng_viewer_next_index_return_code_str( + enum lttng_viewer_next_index_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_INDEX_OK: + return "INDEX_OK"; + case LTTNG_VIEWER_INDEX_RETRY: + return "INDEX_RETRY"; + case LTTNG_VIEWER_INDEX_HUP: + return "INDEX_HUP"; + case LTTNG_VIEWER_INDEX_ERR: + return "INDEX_ERR"; + case LTTNG_VIEWER_INDEX_INACTIVE: + return "INDEX_INACTIVE"; + case LTTNG_VIEWER_INDEX_EOF: + return "INDEX_EOF"; + default: + abort(); + } +} + +static +const char *lttng_viewer_attach_return_code_str( + enum lttng_viewer_attach_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_ATTACH_OK: + return "ATTACH_OK"; + case LTTNG_VIEWER_ATTACH_ALREADY: + return "ATTACH_ALREADY"; + case LTTNG_VIEWER_ATTACH_UNK: + return "ATTACH_UNK"; + case LTTNG_VIEWER_ATTACH_NOT_LIVE: + return "ATTACH_NOT_LIVE"; + case LTTNG_VIEWER_ATTACH_SEEK_ERR: + return "ATTACH_SEEK_ERR"; + case LTTNG_VIEWER_ATTACH_NO_SESSION: + return "ATTACH_NO_SESSION"; + default: + abort(); + } +}; + +static +const char *lttng_viewer_get_packet_return_code_str( + enum lttng_viewer_get_packet_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_GET_PACKET_OK: + return "GET_PACKET_OK"; + case LTTNG_VIEWER_GET_PACKET_RETRY: + return "GET_PACKET_RETRY"; + case LTTNG_VIEWER_GET_PACKET_ERR: + return "GET_PACKET_ERR"; + case LTTNG_VIEWER_GET_PACKET_EOF: + return "GET_PACKET_EOF"; + default: + abort(); + } +}; + /* * Cleanup the daemon */ @@ -374,7 +464,7 @@ static int make_viewer_streams(struct relay_session *relay_session, * chunk can be used safely. */ if ((relay_stream->ongoing_rotation.is_set || - relay_session->ongoing_rotation) && + session_has_ongoing_rotation(relay_session)) && relay_stream->trace_chunk) { viewer_stream_trace_chunk = lttng_trace_chunk_copy( relay_stream->trace_chunk); @@ -565,7 +655,7 @@ end: } static -int close_sock(void *data, int *in_fd) +int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -676,7 +766,7 @@ error: * This thread manages the listening for new connections on the network */ static -void *thread_listener(void *data) +void *thread_listener(void *data __attribute__((unused))) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; @@ -841,7 +931,7 @@ error_sock_control: * This thread manages the dispatching of the requests to worker threads */ static -void *thread_dispatcher(void *data) +void *thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; @@ -936,8 +1026,6 @@ int viewer_connect(struct relay_connection *conn) health_code_update(); - DBG("Viewer is establishing a connection to the relayd."); - ret = recv_request(conn->sock, &msg, sizeof(msg)); if (ret < 0) { goto end; @@ -1024,8 +1112,6 @@ int viewer_list_sessions(struct relay_connection *conn) uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT; uint32_t count = 0; - DBG("List sessions received"); - send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf)); if (!send_session_buf) { return -1; @@ -1130,8 +1216,6 @@ int viewer_get_new_streams(struct relay_connection *conn) LTTNG_ASSERT(conn); - DBG("Get new streams received"); - health_code_update(); /* Receive the request from the connected client. */ @@ -1167,12 +1251,27 @@ int viewer_get_new_streams(struct relay_connection *conn) * the viewer's point of view. */ pthread_mutex_lock(&session->lock); + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session_has_ongoing_rotation(session)) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); + goto send_reply_unlock; + } ret = make_viewer_streams(session, conn->viewer_session, LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { - goto error_unlock_session; + /* + * This is caused by an internal error; propagate the negative + * 'ret' to close the connection. + */ + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); + goto send_reply_unlock; } send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1227,10 +1326,6 @@ end_put_session: } error: return ret; -error_unlock_session: - pthread_mutex_unlock(&session->lock); - session_put(session); - return ret; } /* @@ -1261,28 +1356,34 @@ int viewer_attach_session(struct relay_connection *conn) } session_id = be64toh(request.session_id); + health_code_update(); memset(&response, 0, sizeof(response)); if (!conn->viewer_session) { - DBG("Client trying to attach before creating a live viewer session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION); + viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION; + DBG("Client trying to attach before creating a live viewer session, returning status=%s", + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } session = session_get_by_id(session_id); if (!session) { - DBG("Relay session %" PRIu64 " not found", session_id); - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; + DBG("Relay session %" PRIu64 " not found, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } - DBG("Attach session ID %" PRIu64 " received", session_id); + DBG("Attach relay session ID %" PRIu64 " received", session_id); pthread_mutex_lock(&session->lock); if (session->live_timer == 0) { - DBG("Not live session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); + viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE; + DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1290,19 +1391,34 @@ int viewer_attach_session(struct relay_connection *conn) viewer_attach_status = viewer_session_attach(conn->viewer_session, session); if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { - response.status = htobe32(viewer_attach_status); + DBG("Error attaching to relay session %" PRIu64 ", returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } switch (be32toh(request.seek)) { case LTTNG_VIEWER_SEEK_BEGINNING: case LTTNG_VIEWER_SEEK_LAST: - response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_OK; seek_type = (lttng_viewer_seek) be32toh(request.seek); break; default: - ERR("Wrong seek parameter"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR); + ERR("Wrong seek parameter for relay session %" PRIu64 + ", returning status=%s", session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); + viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR; + send_streams = 0; + goto send_reply; + } + + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session_has_ongoing_rotation(session)) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); send_streams = 0; goto send_reply; } @@ -1327,12 +1443,18 @@ int viewer_attach_session(struct relay_connection *conn) if (closed) { send_streams = 0; response.streams_count = 0; - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; + ERR("Session %" PRIu64 " is closed, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } send_reply: health_code_update(); + + response.status = htobe32((uint32_t) viewer_attach_status); + ret = send_response(conn->sock, &response, sizeof(response)); if (ret < 0) { goto end_put_session; @@ -1443,7 +1565,13 @@ static int check_index_status(struct relay_viewer_stream *vstream, * Last index sent and session connection or relay * stream are closed. */ - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + index->status = LTTNG_VIEWER_INDEX_HUP; + DBG("Check index status: Connection or stream are closed, stream %" PRIu64 + ",connection-closed=%d, relay-stream-closed=%d, returning status=%s", + vstream->stream->stream_handle, + trace->session->connection_closed, rstream->closed, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } else if (rstream->beacon_ts_end != -1ULL && (rstream->index_received_seqcount == 0 || @@ -1465,11 +1593,14 @@ static int check_index_status(struct relay_viewer_stream *vstream, * viewer_stream_sync_tracefile_array_tail) and skip over * packet sequence numbers. */ - index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); + index->status = LTTNG_VIEWER_INDEX_INACTIVE; index->timestamp_end = htobe64(rstream->beacon_ts_end); index->stream_id = htobe64(rstream->ctf_stream_id); - DBG("Check index status: inactive with beacon, for stream %" PRIu64, - vstream->stream->stream_handle); + DBG("Check index status: inactive with beacon, for stream %" PRIu64 + ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (rstream->index_received_seqcount == 0 || (vstream->index_sent_seqcount != 0 && @@ -1486,9 +1617,13 @@ static int check_index_status(struct relay_viewer_stream *vstream, * viewer_stream_sync_tracefile_array_tail) and skip over * packet sequence numbers. */ - index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - DBG("Check index status: retry for stream %" PRIu64, - vstream->stream->stream_handle); + index->status = LTTNG_VIEWER_INDEX_RETRY; + DBG("Check index status:" + "did not received beacon for stream %" PRIu64 + ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (!tracefile_array_seq_in_file(rstream->tfa, vstream->current_tracefile_id, @@ -1503,7 +1638,13 @@ static int check_index_status(struct relay_viewer_stream *vstream, ret = viewer_stream_rotate(vstream); if (ret == 1) { /* EOF across entire stream. */ - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + index->status = LTTNG_VIEWER_INDEX_HUP; + DBG("Check index status:" + "reached end of file for stream %" PRIu64 + ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } /* @@ -1525,12 +1666,15 @@ static int check_index_status(struct relay_viewer_stream *vstream, rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount)) { - index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - DBG("Check index status: retry: " + index->status = LTTNG_VIEWER_INDEX_RETRY; + DBG("Check index status:" "tracefile array sequence number %" PRIu64 - " not in file for stream %" PRIu64, + " not in file for stream %" PRIu64 + ", returning status=%s", vstream->index_sent_seqcount, - vstream->stream->stream_handle); + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa, @@ -1581,11 +1725,12 @@ int viewer_get_next_index(struct relay_connection *conn) struct relay_stream *rstream = NULL; struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; + bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; + uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; + enum lttng_trace_chunk_status status; LTTNG_ASSERT(conn); - DBG("Viewer get next index"); - memset(&viewer_index, 0, sizeof(viewer_index)); health_code_update(); @@ -1597,9 +1742,11 @@ int viewer_get_next_index(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { - DBG("Client requested index of unknown stream id %" PRIu64, - (uint64_t) be64toh(request_index.stream_id)); - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1611,25 +1758,44 @@ int viewer_get_next_index(struct relay_connection *conn) metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); + /* + * Hold the session lock to protect against concurrent changes + * to the chunk files (e.g. rename done by clear), which are + * protected by the session ongoing rotation state. Those are + * synchronized with the session lock. + */ + pthread_mutex_lock(&rstream->trace->session->lock); pthread_mutex_lock(&rstream->lock); /* * The viewer should not ask for index on metadata stream. */ if (rstream->is_metadata) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + viewer_index.status = LTTNG_VIEWER_INDEX_HUP; + DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; + DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } - if (rstream->trace->session->ongoing_rotation) { + if (session_has_ongoing_rotation(rstream->trace->session)) { /* Rotation is ongoing, try again later. */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; + DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1645,7 +1811,12 @@ int viewer_get_next_index(struct relay_connection *conn) conn->viewer_session, rstream->trace_chunk); if (ret) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error copying trace chunk for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } } @@ -1661,12 +1832,50 @@ int viewer_get_next_index(struct relay_connection *conn) * This allows clients to consume all the packets of a trace chunk * after a session's destruction. */ - if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) && - !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { - DBG("Viewer session and viewer stream chunk IDs differ: " - "vsession chunk %p vstream chunk %p", + if (vstream->stream_file.trace_chunk) { + status = lttng_trace_chunk_get_id( + vstream->stream_file.trace_chunk, + &stream_file_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + if (conn->viewer_session->current_trace_chunk) { + status = lttng_trace_chunk_get_id( conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + &viewer_session_chunk_id); + LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); + } + + viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + viewer_stream_one_rotation_behind = rstream->completed_rotation_count == + vstream->last_seen_rotation_count + 1; + + if (viewer_stream_and_session_in_same_chunk) { + DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) { + DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + } else { + DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream", + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); + viewer_stream_rotate_to_trace_chunk(vstream, conn->viewer_session->current_trace_chunk); vstream->last_seen_rotation_count = @@ -1690,15 +1899,30 @@ int viewer_get_next_index(struct relay_connection *conn) ret = try_open_index(vstream, rstream); if (ret == -ENOENT) { if (rstream->closed) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + viewer_index.status = LTTNG_VIEWER_INDEX_HUP; + DBG("Cannot open index for stream id %" PRIu64 + "stream is closed, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; + DBG("Cannot open index for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } } if (ret < 0) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error opening index for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1711,7 +1935,6 @@ int viewer_get_next_index(struct relay_connection *conn) */ if (!vstream->stream_file.handle) { char file_path[LTTNG_PATH_MAX]; - enum lttng_trace_chunk_status status; struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, @@ -1733,7 +1956,12 @@ int viewer_get_next_index(struct relay_connection *conn) if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && rstream->closed) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + viewer_index.status = LTTNG_VIEWER_INDEX_HUP; + DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } PERROR("Failed to open trace file for viewer stream"); @@ -1744,7 +1972,12 @@ int viewer_get_next_index(struct relay_connection *conn) ret = check_new_streams(conn); if (ret < 0) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error checking for new streams before sending new index to stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else if (ret == 1) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; @@ -1752,11 +1985,20 @@ int viewer_get_next_index(struct relay_connection *conn) ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { - ERR("Relay error reading index file"); - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Relay error reading index file for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK); + viewer_index.status = LTTNG_VIEWER_INDEX_OK; + DBG("Read index file for stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); vstream->index_sent_seqcount++; } @@ -1777,6 +2019,7 @@ int viewer_get_next_index(struct relay_connection *conn) send_reply: if (rstream) { pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); } if (metadata_viewer_stream) { @@ -1794,6 +2037,7 @@ send_reply: } viewer_index.flags = htobe32(viewer_index.flags); + viewer_index.status = htobe32(viewer_index.status); health_code_update(); ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index)); @@ -1818,6 +2062,7 @@ end: error_put: pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); if (metadata_viewer_stream) { viewer_stream_put(metadata_viewer_stream); } @@ -1843,8 +2088,7 @@ int viewer_get_packet(struct relay_connection *conn) uint32_t packet_data_len = 0; ssize_t read_len; uint64_t stream_id; - - DBG2("Relay get data packet"); + enum lttng_viewer_get_packet_return_code get_packet_status; health_code_update(); @@ -1861,9 +2105,10 @@ int viewer_get_packet(struct relay_connection *conn) vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { - DBG("Client requested packet of unknown stream id %" PRIu64, - stream_id); - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; + DBG("Client requested packet of unknown stream id %" PRIu64 + ", returning status=%s", stream_id, + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto send_reply_nolock; } else { packet_data_len = be32toh(get_packet_info.len); @@ -1872,8 +2117,9 @@ int viewer_get_packet(struct relay_connection *conn) reply = (char *) zmalloc(reply_size); if (!reply) { - PERROR("packet reply zmalloc"); - reply_size = sizeof(reply_header); + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; + PERROR("Falled to allocate reply, returning status=%s", + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } @@ -1881,29 +2127,31 @@ int viewer_get_packet(struct relay_connection *conn) lseek_ret = fs_handle_seek(vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to seek file system handle of viewer stream %" PRIu64 - " to offset %" PRIu64, - stream_id, - (uint64_t) be64toh(get_packet_info.offset)); + " to offset %" PRIu64", returning status=%s", stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } read_len = fs_handle_read(vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 - ", offset: %" PRIu64, - stream_id, - (uint64_t) be64toh(get_packet_info.offset)); + ", offset: %" PRIu64 ", returning status=%s", stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); + + get_packet_status = LTTNG_VIEWER_GET_PACKET_OK; reply_header.len = htobe32(packet_data_len); goto send_reply; error: /* No payload to send on error. */ reply_size = sizeof(reply_header); - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: if (vstream) { @@ -1913,6 +2161,7 @@ send_reply_nolock: health_code_update(); + reply_header.status = htobe32(get_packet_status); if (reply) { memcpy(reply, &reply_header, sizeof(reply_header)); ret = send_response(conn->sock, reply, reply_size); @@ -1958,8 +2207,6 @@ int viewer_get_metadata(struct relay_connection *conn) LTTNG_ASSERT(conn); - DBG("Relay get metadata"); - health_code_update(); ret = recv_request(conn->sock, &request, sizeof(request)); @@ -2006,11 +2253,18 @@ int viewer_get_metadata(struct relay_connection *conn) * an error. */ if (vstream->metadata_sent > 0) { - vstream->stream->no_new_metadata_notified = true; - if (vstream->stream->closed) { - /* Release ownership for the viewer metadata stream. */ + if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { + /* + * Release ownership for the viewer metadata + * stream. Note that this reference is the + * viewer's reference. The vstream still exists + * until the end of the function as + * viewer_stream_get_by_id() took a reference. + */ viewer_stream_put(vstream); } + + vstream->stream->no_new_metadata_notified = true; } goto send_reply; } @@ -2219,8 +2473,6 @@ int viewer_create_session(struct relay_connection *conn) int ret; struct lttng_viewer_create_session_response resp; - DBG("Viewer create session received"); - memset(&resp, 0, sizeof(resp)); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); conn->viewer_session = viewer_session_create(); @@ -2257,8 +2509,6 @@ int viewer_detach_session(struct relay_connection *conn) struct relay_session *session = NULL; uint64_t viewer_session_to_close; - DBG("Viewer detach session received"); - LTTNG_ASSERT(conn); health_code_update(); @@ -2337,21 +2587,24 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn) { int ret = 0; - uint32_t msg_value; - - msg_value = be32toh(recv_hdr->cmd); + lttng_viewer_command cmd = + (lttng_viewer_command) be32toh(recv_hdr->cmd); /* - * Make sure we've done the version check before any command other then a - * new client connection. + * Make sure we've done the version check before any command other then + * a new client connection. */ - if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { - ERR("Viewer conn value %" PRIu32 " before version check", msg_value); + if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { + ERR("Viewer on connection %d requested %s command before version check", + conn->sock->fd, lttng_viewer_command_str(cmd)); ret = -1; goto end; } - switch (msg_value) { + DBG("Processing %s viewer command from connection %d", + lttng_viewer_command_str(cmd), conn->sock->fd); + + switch (cmd) { case LTTNG_VIEWER_CONNECT: ret = viewer_connect(conn); break; @@ -2409,7 +2662,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) * This thread does the actual work */ static -void *thread_worker(void *data) +void *thread_worker(void *data __attribute__((unused))) { int ret, err = -1; uint32_t nb_fd;