X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=5aa899b282605a0c058f7f497f9b500ad7e0226b;hb=0283afa961253376a5e18de140d29fffd9e89ae3;hp=b84a12f35b8d791a5ea204c36a3d2ff92ffa97ae;hpb=7e4fb89bb6323adc132f018184bb159fd8dd7727;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index b84a12f35..5aa899b28 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -138,6 +138,46 @@ const char *lttng_viewer_next_index_return_code_str( } } +static +const char *lttng_viewer_attach_return_code_str( + enum lttng_viewer_attach_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_ATTACH_OK: + return "ATTACH_OK"; + case LTTNG_VIEWER_ATTACH_ALREADY: + return "ATTACH_ALREADY"; + case LTTNG_VIEWER_ATTACH_UNK: + return "ATTACH_UNK"; + case LTTNG_VIEWER_ATTACH_NOT_LIVE: + return "ATTACH_NOT_LIVE"; + case LTTNG_VIEWER_ATTACH_SEEK_ERR: + return "ATTACH_SEEK_ERR"; + case LTTNG_VIEWER_ATTACH_NO_SESSION: + return "ATTACH_NO_SESSION"; + default: + abort(); + } +}; + +static +const char *lttng_viewer_get_packet_return_code_str( + enum lttng_viewer_get_packet_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_GET_PACKET_OK: + return "GET_PACKET_OK"; + case LTTNG_VIEWER_GET_PACKET_RETRY: + return "GET_PACKET_RETRY"; + case LTTNG_VIEWER_GET_PACKET_ERR: + return "GET_PACKET_ERR"; + case LTTNG_VIEWER_GET_PACKET_EOF: + return "GET_PACKET_EOF"; + default: + abort(); + } +}; + /* * Cleanup the daemon */ @@ -424,7 +464,7 @@ static int make_viewer_streams(struct relay_session *relay_session, * chunk can be used safely. */ if ((relay_stream->ongoing_rotation.is_set || - relay_session->ongoing_rotation) && + session_has_ongoing_rotation(relay_session)) && relay_stream->trace_chunk) { viewer_stream_trace_chunk = lttng_trace_chunk_copy( relay_stream->trace_chunk); @@ -615,7 +655,7 @@ end: } static -int close_sock(void *data, int *in_fd) +int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -726,7 +766,7 @@ error: * This thread manages the listening for new connections on the network */ static -void *thread_listener(void *data) +void *thread_listener(void *data __attribute__((unused))) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; @@ -891,7 +931,7 @@ error_sock_control: * This thread manages the dispatching of the requests to worker threads */ static -void *thread_dispatcher(void *data) +void *thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; @@ -1216,7 +1256,7 @@ int viewer_get_new_streams(struct relay_connection *conn) * stream, because the chunk can be in an intermediate state * due to directory renaming. */ - if (session->ongoing_rotation) { + if (session_has_ongoing_rotation(session)) { DBG("Relay session %" PRIu64 " rotation ongoing", session_id); response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); goto send_reply_unlock; @@ -1226,7 +1266,12 @@ int viewer_get_new_streams(struct relay_connection *conn) LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { - goto error_unlock_session; + /* + * This is caused by an internal error; propagate the negative + * 'ret' to close the connection. + */ + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); + goto send_reply_unlock; } send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1281,10 +1326,6 @@ end_put_session: } error: return ret; -error_unlock_session: - pthread_mutex_unlock(&session->lock); - session_put(session); - return ret; } /* @@ -1315,28 +1356,34 @@ int viewer_attach_session(struct relay_connection *conn) } session_id = be64toh(request.session_id); + health_code_update(); memset(&response, 0, sizeof(response)); if (!conn->viewer_session) { - DBG("Client trying to attach before creating a live viewer session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION); + viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION; + DBG("Client trying to attach before creating a live viewer session, returning status=%s", + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } session = session_get_by_id(session_id); if (!session) { - DBG("Relay session %" PRIu64 " not found", session_id); - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; + DBG("Relay session %" PRIu64 " not found, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } - DBG("Attach session ID %" PRIu64 " received", session_id); + DBG("Attach relay session ID %" PRIu64 " received", session_id); pthread_mutex_lock(&session->lock); if (session->live_timer == 0) { - DBG("Not live session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); + viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE; + DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1344,19 +1391,23 @@ int viewer_attach_session(struct relay_connection *conn) viewer_attach_status = viewer_session_attach(conn->viewer_session, session); if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { - response.status = htobe32(viewer_attach_status); + DBG("Error attaching to relay session %" PRIu64 ", returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } switch (be32toh(request.seek)) { case LTTNG_VIEWER_SEEK_BEGINNING: case LTTNG_VIEWER_SEEK_LAST: - response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_OK; seek_type = (lttng_viewer_seek) be32toh(request.seek); break; default: - ERR("Wrong seek parameter"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR); + ERR("Wrong seek parameter for relay session %" PRIu64 + ", returning status=%s", session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); + viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR; send_streams = 0; goto send_reply; } @@ -1366,7 +1417,7 @@ int viewer_attach_session(struct relay_connection *conn) * stream, because the chunk can be in an intermediate state * due to directory renaming. */ - if (session->ongoing_rotation) { + if (session_has_ongoing_rotation(session)) { DBG("Relay session %" PRIu64 " rotation ongoing", session_id); send_streams = 0; goto send_reply; @@ -1392,12 +1443,18 @@ int viewer_attach_session(struct relay_connection *conn) if (closed) { send_streams = 0; response.streams_count = 0; - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); + viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; + ERR("Session %" PRIu64 " is closed, returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } send_reply: health_code_update(); + + response.status = htobe32((uint32_t) viewer_attach_status); + ret = send_response(conn->sock, &response, sizeof(response)); if (ret < 0) { goto end_put_session; @@ -1732,7 +1789,7 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - if (rstream->trace->session->ongoing_rotation) { + if (session_has_ongoing_rotation(rstream->trace->session)) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s", @@ -2031,6 +2088,7 @@ int viewer_get_packet(struct relay_connection *conn) uint32_t packet_data_len = 0; ssize_t read_len; uint64_t stream_id; + enum lttng_viewer_get_packet_return_code get_packet_status; health_code_update(); @@ -2047,9 +2105,10 @@ int viewer_get_packet(struct relay_connection *conn) vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { - DBG("Client requested packet of unknown stream id %" PRIu64, - stream_id); - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; + DBG("Client requested packet of unknown stream id %" PRIu64 + ", returning status=%s", stream_id, + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto send_reply_nolock; } else { packet_data_len = be32toh(get_packet_info.len); @@ -2058,8 +2117,9 @@ int viewer_get_packet(struct relay_connection *conn) reply = (char *) zmalloc(reply_size); if (!reply) { - PERROR("packet reply zmalloc"); - reply_size = sizeof(reply_header); + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; + PERROR("Falled to allocate reply, returning status=%s", + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } @@ -2067,29 +2127,31 @@ int viewer_get_packet(struct relay_connection *conn) lseek_ret = fs_handle_seek(vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to seek file system handle of viewer stream %" PRIu64 - " to offset %" PRIu64, - stream_id, - (uint64_t) be64toh(get_packet_info.offset)); + " to offset %" PRIu64", returning status=%s", stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } read_len = fs_handle_read(vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { + get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 - ", offset: %" PRIu64, - stream_id, - (uint64_t) be64toh(get_packet_info.offset)); + ", offset: %" PRIu64 ", returning status=%s", stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); + + get_packet_status = LTTNG_VIEWER_GET_PACKET_OK; reply_header.len = htobe32(packet_data_len); goto send_reply; error: /* No payload to send on error. */ reply_size = sizeof(reply_header); - reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: if (vstream) { @@ -2099,6 +2161,7 @@ send_reply_nolock: health_code_update(); + reply_header.status = htobe32(get_packet_status); if (reply) { memcpy(reply, &reply_header, sizeof(reply_header)); ret = send_response(conn->sock, reply, reply_size); @@ -2599,7 +2662,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) * This thread does the actual work */ static -void *thread_worker(void *data) +void *thread_worker(void *data __attribute__((unused))) { int ret, err = -1; uint32_t nb_fd;