X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=1683320188ef15bada61e84b4e2c7495acfc140e;hp=cf6d8ba2397178be15ac64da636bd7c9caec6b0c;hb=46f4eaa5c2a4e5f106ca105e2a93018e8fd4e477;hpb=715e6fb12b68e1749ee78607ad032acea6bbc2a5 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index cf6d8ba23..168332018 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -83,6 +83,14 @@ NULL #endif ; +enum relay_connection_status { + RELAY_CONNECTION_STATUS_OK, + /* An error occured while processing an event on the connection. */ + RELAY_CONNECTION_STATUS_ERROR, + /* Connection closed/shutdown cleanly. */ + RELAY_CONNECTION_STATUS_CLOSED, +}; + /* command line options */ char *opt_output_path; static int opt_daemon, opt_background; @@ -2242,8 +2250,12 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.timestamp_end = be64toh(index_info.timestamp_end); index_info.events_discarded = be64toh(index_info.events_discarded); index_info.stream_id = be64toh(index_info.stream_id); - index_info.stream_instance_id = be64toh(index_info.stream_instance_id); - index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + + if (conn->minor >= 8) { + index_info.stream_instance_id = + be64toh(index_info.stream_instance_id); + index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } stream = stream_get_by_id(index_info.relay_stream_id); if (!stream) { @@ -2945,9 +2957,11 @@ end: return ret; } -static int relay_process_control_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_payload( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; struct ctrl_connection_state_receive_payload *state = @@ -2963,11 +2977,15 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive command payload on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive command payload on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -2985,7 +3003,6 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3004,17 +3021,23 @@ reception_complete: ret = relay_process_control_command(conn, &state->header, &payload_view); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } ret = connection_reset_protocol_state(conn); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + } end: - return ret; + return status; } -static int relay_process_control_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_header( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttcomm_relayd_hdr header; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; @@ -3027,11 +3050,15 @@ static int relay_process_control_receive_header(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive control command header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive control command header on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3049,7 +3076,6 @@ static int relay_process_control_receive_header(struct relay_connection *conn) DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3071,7 +3097,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn) if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", header.data_size); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3081,6 +3107,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn) ret = lttng_dynamic_buffer_set_size(reception_buffer, header.data_size); if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3089,32 +3116,33 @@ static int relay_process_control_receive_header(struct relay_connection *conn) * Manually invoke the next state as the poll loop * will not wake-up to allow us to proceed further. */ - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); } end: - return ret; + return status; } /* * Process the commands received on the control socket */ -static int relay_process_control(struct relay_connection *conn) +static enum relay_connection_status relay_process_control( + struct relay_connection *conn) { - int ret = 0; + enum relay_connection_status status; switch (conn->protocol.ctrl.state_id) { case CTRL_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_control_receive_header(conn); + status = relay_process_control_receive_header(conn); break; case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); break; default: ERR("Unknown control connection protocol state encountered."); abort(); } - return ret; + return status; } /* @@ -3186,9 +3214,11 @@ end: return ret; } -static int relay_process_data_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_header( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct data_connection_state_receive_header *state = &conn->protocol.data.state.receive_header; struct lttcomm_relayd_data_hdr header; @@ -3200,12 +3230,15 @@ static int relay_process_data_receive_header(struct relay_connection *conn) state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive data header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive data header on sock %d", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3252,7 +3285,8 @@ static int relay_process_data_receive_header(struct relay_connection *conn) if (!stream) { DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, header.stream_id); - ret = 0; + /* Protocol error. */ + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3277,6 +3311,7 @@ static int relay_process_data_receive_header(struct relay_connection *conn) &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Failed to rotate stream output file"); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3293,12 +3328,14 @@ end_stream_unlock: pthread_mutex_unlock(&stream->lock); stream_put(stream); end: - return ret; + return status; } -static int relay_process_data_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_payload( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct relay_stream *stream; struct data_connection_state_receive_payload *state = &conn->protocol.data.state.receive_payload; @@ -3311,9 +3348,10 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) stream = stream_get_by_id(state->header.stream_id); if (!stream) { + /* Protocol error. */ DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, state->header.stream_id); - ret = 0; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3337,13 +3375,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { - ERR("Socket %d error %d", conn->sock->fd, ret); - ret = -1; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Socket %d error", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end_stream_unlock; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, state->header.stream_id); + status = RELAY_CONNECTION_STATUS_CLOSED; break; } else if (ret < (int) recv_size) { /* @@ -3360,7 +3401,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3380,16 +3421,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - ret = 0; goto end_stream_unlock; } ret = write_padding_to_file(stream->stream_fd->fd, state->header.padding_size); - if (ret < 0) { + if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3401,6 +3442,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } } @@ -3428,6 +3470,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ret = try_rotate_stream(stream); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3446,29 +3489,30 @@ end_stream_unlock: stream_put(stream); end: - return ret; + return status; } /* * relay_process_data: Process the data received on the data socket */ -static int relay_process_data(struct relay_connection *conn) +static enum relay_connection_status relay_process_data( + struct relay_connection *conn) { - int ret; + enum relay_connection_status status; switch (conn->protocol.data.state_id) { case DATA_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_data_receive_header(conn); + status = relay_process_data_receive_header(conn); break; case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_data_receive_payload(conn); + status = relay_process_data_receive_payload(conn); break; default: ERR("Unexpected data connection communication state."); abort(); } - return ret; + return status; } static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) @@ -3640,9 +3684,11 @@ restart: assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { - ret = relay_process_control(ctrl_conn); - if (ret < 0) { - /* Clear the connection on error. */ + enum relay_connection_status status; + + status = relay_process_control(ctrl_conn); + if (status != RELAY_CONNECTION_STATUS_OK) { + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, ctrl_conn); @@ -3716,9 +3762,11 @@ restart: assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - ret = relay_process_data(data_conn); - /* Connection closed */ - if (ret < 0) { + enum relay_connection_status status; + + status = relay_process_data(data_conn); + /* Connection closed or error. */ + if (status != RELAY_CONNECTION_STATUS_OK) { relay_thread_close_connection(&events, pollfd, data_conn); /*