From 5569b118e687de5c6179b3a432187f06c1277608 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 27 Apr 2018 16:47:04 -0400 Subject: [PATCH] Propagate whether a connection was closed cleanly or after an error MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This allows a follow-up fix that requires this distinction to decide whether a session must be closed or aborted. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/main.c | 124 ++++++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 40 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index a31287380..09a73e392 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -83,6 +83,14 @@ NULL #endif ; +enum relay_connection_status { + RELAY_CONNECTION_STATUS_OK, + /* An error occured while processing an event on the connection. */ + RELAY_CONNECTION_STATUS_ERROR, + /* Connection closed/shutdown cleanly. */ + RELAY_CONNECTION_STATUS_CLOSED, +}; + /* command line options */ char *opt_output_path; static int opt_daemon, opt_background; @@ -2949,9 +2957,11 @@ end: return ret; } -static int relay_process_control_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_payload( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; struct ctrl_connection_state_receive_payload *state = @@ -2967,11 +2977,15 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive command payload on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive command payload on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -2989,7 +3003,6 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3008,17 +3021,23 @@ reception_complete: ret = relay_process_control_command(conn, &state->header, &payload_view); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } ret = connection_reset_protocol_state(conn); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + } end: - return ret; + return status; } -static int relay_process_control_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_header( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttcomm_relayd_hdr header; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; @@ -3031,11 +3050,15 @@ static int relay_process_control_receive_header(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive control command header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive control command header on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3053,7 +3076,6 @@ static int relay_process_control_receive_header(struct relay_connection *conn) DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3075,7 +3097,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn) if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", header.data_size); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3085,6 +3107,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn) ret = lttng_dynamic_buffer_set_size(reception_buffer, header.data_size); if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3093,32 +3116,33 @@ static int relay_process_control_receive_header(struct relay_connection *conn) * Manually invoke the next state as the poll loop * will not wake-up to allow us to proceed further. */ - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); } end: - return ret; + return status; } /* * Process the commands received on the control socket */ -static int relay_process_control(struct relay_connection *conn) +static enum relay_connection_status relay_process_control( + struct relay_connection *conn) { - int ret = 0; + enum relay_connection_status status; switch (conn->protocol.ctrl.state_id) { case CTRL_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_control_receive_header(conn); + status = relay_process_control_receive_header(conn); break; case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); break; default: ERR("Unknown control connection protocol state encountered."); abort(); } - return ret; + return status; } /* @@ -3190,9 +3214,11 @@ end: return ret; } -static int relay_process_data_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_header( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct data_connection_state_receive_header *state = &conn->protocol.data.state.receive_header; struct lttcomm_relayd_data_hdr header; @@ -3204,12 +3230,15 @@ static int relay_process_data_receive_header(struct relay_connection *conn) state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive data header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive data header on sock %d", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3256,7 +3285,8 @@ static int relay_process_data_receive_header(struct relay_connection *conn) if (!stream) { DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, header.stream_id); - ret = 0; + /* Protocol error. */ + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3281,6 +3311,7 @@ static int relay_process_data_receive_header(struct relay_connection *conn) &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Failed to rotate stream output file"); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3297,12 +3328,14 @@ end_stream_unlock: pthread_mutex_unlock(&stream->lock); stream_put(stream); end: - return ret; + return status; } -static int relay_process_data_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_payload( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct relay_stream *stream; struct data_connection_state_receive_payload *state = &conn->protocol.data.state.receive_payload; @@ -3315,9 +3348,10 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) stream = stream_get_by_id(state->header.stream_id); if (!stream) { + /* Protocol error. */ DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, state->header.stream_id); - ret = 0; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3341,13 +3375,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { - ERR("Socket %d error %d", conn->sock->fd, ret); - ret = -1; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Socket %d error", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end_stream_unlock; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, state->header.stream_id); + status = RELAY_CONNECTION_STATUS_CLOSED; break; } else if (ret < (int) recv_size) { /* @@ -3364,7 +3401,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3384,7 +3421,6 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - ret = 0; goto end_stream_unlock; } @@ -3394,6 +3430,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3405,6 +3442,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } } @@ -3432,6 +3470,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ret = try_rotate_stream(stream); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3450,29 +3489,30 @@ end_stream_unlock: stream_put(stream); end: - return ret; + return status; } /* * relay_process_data: Process the data received on the data socket */ -static int relay_process_data(struct relay_connection *conn) +static enum relay_connection_status relay_process_data( + struct relay_connection *conn) { - int ret; + enum relay_connection_status status; switch (conn->protocol.data.state_id) { case DATA_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_data_receive_header(conn); + status = relay_process_data_receive_header(conn); break; case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_data_receive_payload(conn); + status = relay_process_data_receive_payload(conn); break; default: ERR("Unexpected data connection communication state."); abort(); } - return ret; + return status; } static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) @@ -3644,9 +3684,11 @@ restart: assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { - ret = relay_process_control(ctrl_conn); - if (ret < 0) { - /* Clear the connection on error. */ + enum relay_connection_status status; + + status = relay_process_control(ctrl_conn); + if (status != RELAY_CONNECTION_STATUS_OK) { + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, ctrl_conn); @@ -3720,9 +3762,11 @@ restart: assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - ret = relay_process_data(data_conn); - /* Connection closed */ - if (ret < 0) { + enum relay_connection_status status; + + status = relay_process_data(data_conn); + /* Connection closed or error. */ + if (status != RELAY_CONNECTION_STATUS_OK) { relay_thread_close_connection(&events, pollfd, data_conn); /* -- 2.34.1