X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=1e2e9050fef2d6d77ca78e9767a2fea759ba99be;hp=a57b1d4b063e2d02f200fcd07f860276b1d6ae64;hb=c6db3843828a8fbf08444a2bc4191291a4807936;hpb=e2acc8d25bc0073554a2f3e742e070b7034ec1d0 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index a57b1d4b0..1e2e9050f 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -56,6 +56,8 @@ #include #include #include +#include +#include #include #include "cmd.h" @@ -81,6 +83,14 @@ NULL #endif ; +enum relay_connection_status { + RELAY_CONNECTION_STATUS_OK, + /* An error occured while processing an event on the connection. */ + RELAY_CONNECTION_STATUS_ERROR, + /* Connection closed/shutdown cleanly. */ + RELAY_CONNECTION_STATUS_CLOSED, +}; + /* command line options */ char *opt_output_path; static int opt_daemon, opt_background; @@ -145,10 +155,6 @@ static uint64_t last_relay_stream_id; */ static struct relay_conn_queue relay_conn_queue; -/* buffer allocated at startup, used to store the trace data */ -static char *data_buffer; -static unsigned int data_buffer_size; - /* Global relay stream hash table. */ struct lttng_ht *relay_streams_ht; @@ -751,6 +757,7 @@ static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri) ret = sock->ops->bind(sock); if (ret < 0) { + PERROR("Failed to bind socket"); goto error; } @@ -1059,35 +1066,39 @@ static int set_index_control_data(struct relay_index *index, struct ctf_packet_index index_data; /* - * The index on disk is encoded in big endian, so we don't need - * to convert the data received on the network. The data_offset - * value is NEVER modified here and is updated by the data - * thread. + * The index on disk is encoded in big endian. */ - index_data.packet_size = data->packet_size; - index_data.content_size = data->content_size; - index_data.timestamp_begin = data->timestamp_begin; - index_data.timestamp_end = data->timestamp_end; - index_data.events_discarded = data->events_discarded; - index_data.stream_id = data->stream_id; + index_data.packet_size = htobe64(data->packet_size); + index_data.content_size = htobe64(data->content_size); + index_data.timestamp_begin = htobe64(data->timestamp_begin); + index_data.timestamp_end = htobe64(data->timestamp_end); + index_data.events_discarded = htobe64(data->events_discarded); + index_data.stream_id = htobe64(data->stream_id); if (conn->minor >= 8) { - index->index_data.stream_instance_id = data->stream_instance_id; - index->index_data.packet_seq_num = data->packet_seq_num; + index->index_data.stream_instance_id = htobe64(data->stream_instance_id); + index->index_data.packet_seq_num = htobe64(data->packet_seq_num); } return relay_index_set_data(index, &index_data); } +static bool session_streams_have_index(const struct relay_session *session) +{ + return session->minor >= 4 && !session->snapshot; +} + /* * Handle the RELAYD_CREATE_SESSION command. * * On success, send back the session id or else return a negative value. */ -static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret = 0, send_ret; + int ret = 0; + ssize_t send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; char session_name[LTTNG_NAME_MAX]; @@ -1100,16 +1111,19 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); - switch (conn->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: - ret = cmd_create_session_2_4(conn, session_name, + if (conn->minor < 4) { + /* From 2.1 to 2.3 */ + ret = 0; + } else if (conn->minor >= 4 && conn->minor < 11) { + /* From 2.4 to 2.10 */ + ret = cmd_create_session_2_4(payload, session_name, + hostname, &live_timer, &snapshot); + } else { + /* From 2.11 to ... */ + ret = cmd_create_session_2_11(payload, session_name, hostname, &live_timer, &snapshot); } + if (ret < 0) { goto send_reply; } @@ -1134,9 +1148,10 @@ send_reply: } send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (send_ret < 0) { - ERR("Relayd sending session id"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create session\" command reply (ret = %zd)", + send_ret); + ret = -1; } return ret; @@ -1175,8 +1190,9 @@ static void publish_connection_local_streams(struct relay_connection *conn) /* * relay_add_stream: allocate a new stream for a session */ -static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; ssize_t send_ret; @@ -1187,24 +1203,30 @@ static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + struct relay_stream_chunk_id stream_chunk_id = { 0 }; - if (!session || conn->version_check_done == 0) { + if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); ret = -1; goto end_no_session; } - switch (session->minor) { - case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */ - ret = cmd_recv_stream_2_1(conn, &path_name, + if (session->minor == 1) { + /* For 2.1 */ + ret = cmd_recv_stream_2_1(payload, &path_name, &channel_name); - break; - case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */ - default: - ret = cmd_recv_stream_2_2(conn, &path_name, + } else if (session->minor > 1 && session->minor < 11) { + /* From 2.2 to 2.10 */ + ret = cmd_recv_stream_2_2(payload, &path_name, &channel_name, &tracefile_size, &tracefile_count); - break; + } else { + /* From 2.11 to ... */ + ret = cmd_recv_stream_2_11(payload, &path_name, + &channel_name, &tracefile_size, &tracefile_count, + &stream_chunk_id.value); + stream_chunk_id.is_set = true; } + if (ret < 0) { goto send_reply; } @@ -1221,7 +1243,8 @@ static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count, + &stream_chunk_id); path_name = NULL; channel_name = NULL; @@ -1242,9 +1265,10 @@ send_reply: send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(struct lttcomm_relayd_status_stream), 0); - if (send_ret < 0) { - ERR("Relay sending stream id"); - ret = (int) send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"add stream\" command reply (ret = %zd)", + send_ret); + ret = -1; } end_no_session: @@ -1256,10 +1280,12 @@ end_no_session: /* * relay_close_stream: close a specific stream */ -static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_close_stream stream_info; struct lttcomm_relayd_generic_reply reply; @@ -1267,26 +1293,23 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, DBG("Close stream received"); - if (!session || conn->version_check_done == 0) { + if (!session || !conn->version_check_done) { ERR("Trying to close a stream before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, - sizeof(struct lttcomm_relayd_close_stream), 0); - if (ret < sizeof(struct lttcomm_relayd_close_stream)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid add_stream struct size : %d", ret); - } + if (payload->size < sizeof(stream_info)) { + ERR("Unexpected payload size in \"relay_close_stream\": expected >= %zu bytes, got %zu bytes", + sizeof(stream_info), payload->size); ret = -1; goto end_no_session; } + memcpy(&stream_info, payload->data, sizeof(stream_info)); + stream_info.stream_id = be64toh(stream_info.stream_id); + stream_info.last_net_seq_num = be64toh(stream_info.last_net_seq_num); - stream = stream_get_by_id(be64toh(stream_info.stream_id)); + stream = stream_get_by_id(stream_info.stream_id); if (!stream) { ret = -1; goto end; @@ -1297,7 +1320,7 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, * pending check. */ pthread_mutex_lock(&stream->lock); - stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + stream->last_net_seq_num = stream_info.last_net_seq_num; pthread_mutex_unlock(&stream->lock); /* @@ -1328,6 +1351,7 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } } stream_put(stream); + ret = 0; end: memset(&reply, 0, sizeof(reply)); @@ -1338,9 +1362,10 @@ end: } send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(struct lttcomm_relayd_generic_reply), 0); - if (send_ret < 0) { - ERR("Relay sending stream id"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"close stream\" command reply (ret = %zd)", + send_ret); + ret = -1; } end_no_session: @@ -1351,10 +1376,12 @@ end_no_session: * relay_reset_metadata: reset a metadata stream */ static -int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_reset_metadata stream_info; struct lttcomm_relayd_generic_reply reply; @@ -1362,26 +1389,23 @@ int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, DBG("Reset metadata received"); - if (!session || conn->version_check_done == 0) { + if (!session || !conn->version_check_done) { ERR("Trying to reset a metadata stream before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, - sizeof(struct lttcomm_relayd_reset_metadata), 0); - if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid reset_metadata struct " - "size : %d", ret); - } + if (payload->size < sizeof(stream_info)) { + ERR("Unexpected payload size in \"relay_reset_metadata\": expected >= %zu bytes, got %zu bytes", + sizeof(stream_info), payload->size); ret = -1; goto end_no_session; } - DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version)); + memcpy(&stream_info, payload->data, sizeof(stream_info)); + stream_info.stream_id = be64toh(stream_info.stream_id); + stream_info.version = be64toh(stream_info.version); + + DBG("Update metadata to version %" PRIu64, stream_info.version); /* Unsupported for live sessions for now. */ if (session->live_timer != 0) { @@ -1389,7 +1413,7 @@ int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - stream = stream_get_by_id(be64toh(stream_info.stream_id)); + stream = stream_get_by_id(stream_info.stream_id); if (!stream) { ret = -1; goto end; @@ -1422,9 +1446,10 @@ end: } send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(struct lttcomm_relayd_generic_reply), 0); - if (send_ret < 0) { - ERR("Relay sending reset metadata reply"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"reset metadata\" command reply (ret = %zd)", + send_ret); + ret = -1; } end_no_session: @@ -1437,14 +1462,13 @@ end_no_session: static void relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; - int ret; + ssize_t send_ret; memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_ERR_UNK); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(struct lttcomm_relayd_generic_reply), 0); - if (ret < 0) { - ERR("Relay sending unknown command"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < sizeof(reply)) { + ERR("Failed to send \"unknown command\" command reply (ret = %zd)", send_ret); } } @@ -1452,10 +1476,12 @@ static void relay_unknown_command(struct relay_connection *conn) * relay_start: send an acknowledgment to the client to tell if we are * ready to receive data. We are ready if a session is established. */ -static int relay_start(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_start(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret = htobe32(LTTNG_OK); + int ret = 0; + ssize_t send_ret; struct lttcomm_relayd_generic_reply reply; struct relay_session *session = conn->session; @@ -1465,11 +1491,13 @@ static int relay_start(struct lttcomm_relayd_hdr *recv_hdr, } memset(&reply, 0, sizeof(reply)); - reply.ret_code = ret; - ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(struct lttcomm_relayd_generic_reply), 0); - if (ret < 0) { - ERR("Relay sending start ack"); + reply.ret_code = htobe32(LTTNG_OK); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"relay_start\" command reply (ret = %zd)", + send_ret); + ret = -1; } return ret; @@ -1511,7 +1539,8 @@ end: * Return 0 on success, -1 on error. */ static -int create_rotate_index_file(struct relay_stream *stream) +int create_rotate_index_file(struct relay_stream *stream, + const char *stream_path) { int ret; uint32_t major, minor; @@ -1523,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream) } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, + stream->index_file = lttng_index_file_create(stream_path, stream->channel_name, -1, -1, stream->tracefile_size, tracefile_array_get_file_index_head(stream->tfa), @@ -1541,10 +1570,12 @@ end: } static -int do_rotate_stream(struct relay_stream *stream) +int do_rotate_stream_data(struct relay_stream *stream) { int ret; + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); /* Perform the stream rotation. */ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, @@ -1556,19 +1587,17 @@ int do_rotate_stream(struct relay_stream *stream) goto end; } stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - } - - stream->rotate_at_seq_num = -1ULL; stream->pos_after_last_complete_data_index = 0; + stream->data_rotated = true; + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } end: return ret; } @@ -1580,14 +1609,13 @@ end: * connections are separate, the indexes as well as the commands arrive from * the control connection and we have no control over the order so we could be * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. We don't need - * to update the index because its order is guaranteed with the rotation - * command message. + * before the rotation command on the control connection arrives. */ static int rotate_truncate_stream(struct relay_stream *stream) { int ret, new_fd; + off_t lseek_ret; uint64_t diff, pos = 0; char buf[FILE_COPY_BUFFER_SIZE]; @@ -1614,10 +1642,11 @@ int rotate_truncate_stream(struct relay_stream *stream) * Rewind the current tracefile to the position at which the rotation * should have occured. */ - ret = lseek(stream->stream_fd->fd, + lseek_ret = lseek(stream->stream_fd->fd, stream->pos_after_last_complete_data_index, SEEK_SET); - if (ret < 0) { + if (lseek_ret < 0) { PERROR("seek truncate stream"); + ret = -1; goto end; } @@ -1679,12 +1708,6 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Rotate stream index file"); - goto end; - } - /* * Update the offset and FD of all the eventual indexes created by the * data connection before the rotation command arrived. @@ -1707,26 +1730,100 @@ end: } /* - * Check if a stream should perform a rotation (for session rotation). + * Check if a stream's index file should be rotated (for session rotation). * Must be called with the stream lock held. * * Return 0 on success, a negative value on error. */ static -int try_rotate_stream(struct relay_stream *stream) +int try_rotate_stream_index(struct relay_stream *stream) { int ret = 0; - /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ goto end; } - if (stream->prev_seq < stream->rotate_at_seq_num) { - DBG("Stream %" PRIu64 " no yet ready for rotation", + if (stream->index_rotated) { + /* Rotation of the index has already occurred. */ + goto end; + } + + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_index_seq); + goto end; + } else if (stream->prev_index_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq, + stream->prev_index_seq); + ret = -1; + goto end; + } else { + DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); + ret = create_rotate_index_file(stream, stream->path_name); + stream->index_rotated = true; + + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } + } + +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ + goto end; + } + + if (stream->data_rotated) { + /* Rotation of the data file has already occurred. */ goto end; - } else if (stream->prev_seq > stream->rotate_at_seq_num) { + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); + goto end; + } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { + /* + * prev_data_seq is checked here since indexes and rotation + * commands are serialized with respect to each other. + */ DBG("Rotation after too much data has been written in tracefile " "for stream %" PRIu64 ", need to truncate before " "rotating", stream->stream_handle); @@ -1735,11 +1832,20 @@ int try_rotate_stream(struct relay_stream *stream) ERR("Failed to truncate stream"); goto end; } + } else if (stream->prev_data_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); + ret = -1; + goto end; } else { - /* stream->prev_seq == stream->rotate_at_seq_num */ - DBG("Stream %" PRIu64 " ready for rotation", - stream->stream_handle); - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } end: @@ -1749,15 +1855,16 @@ end: /* * relay_recv_metadata: receive the metadata for the session. */ -static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; - struct lttcomm_relayd_metadata_payload *metadata_struct; + struct lttcomm_relayd_metadata_payload metadata_payload_header; struct relay_stream *metadata_stream; - uint64_t data_size, payload_size; + uint64_t metadata_payload_size; if (!session) { ERR("Metadata sent before version check"); @@ -1765,44 +1872,22 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - data_size = payload_size = be64toh(recv_hdr->data_size); - if (data_size < sizeof(struct lttcomm_relayd_metadata_payload)) { + if (recv_hdr->data_size < sizeof(struct lttcomm_relayd_metadata_payload)) { ERR("Incorrect data size"); ret = -1; goto end; } - payload_size -= sizeof(struct lttcomm_relayd_metadata_payload); + metadata_payload_size = recv_hdr->data_size - + sizeof(struct lttcomm_relayd_metadata_payload); - if (data_buffer_size < data_size) { - /* In case the realloc fails, we can free the memory */ - char *tmp_data_ptr; + memcpy(&metadata_payload_header, payload->data, + sizeof(metadata_payload_header)); + metadata_payload_header.stream_id = be64toh( + metadata_payload_header.stream_id); + metadata_payload_header.padding_size = be32toh( + metadata_payload_header.padding_size); - tmp_data_ptr = realloc(data_buffer, data_size); - if (!tmp_data_ptr) { - ERR("Allocating data buffer"); - free(data_buffer); - ret = -1; - goto end; - } - data_buffer = tmp_data_ptr; - data_buffer_size = data_size; - } - memset(data_buffer, 0, data_size); - DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (size_ret < 0 || size_ret != data_size) { - if (size_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive the whole metadata"); - } - ret = -1; - goto end; - } - metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; - - metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id)); + metadata_stream = stream_get_by_id(metadata_payload_header.stream_id); if (!metadata_stream) { ret = -1; goto end; @@ -1810,26 +1895,28 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&metadata_stream->lock); - size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload, - payload_size); - if (size_ret < payload_size) { + size_ret = lttng_write(metadata_stream->stream_fd->fd, + payload->data + sizeof(metadata_payload_header), + metadata_payload_size); + if (size_ret < metadata_payload_size) { ERR("Relay error writing metadata on file"); ret = -1; goto end_put; } size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, - be32toh(metadata_struct->padding_size)); - if (size_ret < 0) { + metadata_payload_header.padding_size); + if (size_ret < (int64_t) metadata_payload_header.padding_size) { + ret = -1; goto end_put; } metadata_stream->metadata_received += - payload_size + be32toh(metadata_struct->padding_size); + metadata_payload_size + metadata_payload_header.padding_size; DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); - ret = try_rotate_stream(metadata_stream); + ret = try_rotate_stream_data(metadata_stream); if (ret < 0) { goto end_put; } @@ -1844,53 +1931,59 @@ end: /* * relay_send_version: send relayd version number */ -static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_send_version(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; + ssize_t send_ret; struct lttcomm_relayd_version reply, msg; bool compatible = true; - conn->version_check_done = 1; + conn->version_check_done = true; /* Get version from the other side. */ - ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (ret < 0 || ret != sizeof(msg)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay failed to receive the version values."); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_send_version\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end; } + memcpy(&msg, payload->data, sizeof(msg)); + msg.major = be32toh(msg.major); + msg.minor = be32toh(msg.minor); + memset(&reply, 0, sizeof(reply)); reply.major = RELAYD_VERSION_COMM_MAJOR; reply.minor = RELAYD_VERSION_COMM_MINOR; /* Major versions must be the same */ - if (reply.major != be32toh(msg.major)) { + if (reply.major != msg.major) { DBG("Incompatible major versions (%u vs %u), deleting session", - reply.major, be32toh(msg.major)); + reply.major, msg.major); compatible = false; } conn->major = reply.major; /* We adapt to the lowest compatible version */ - if (reply.minor <= be32toh(msg.minor)) { + if (reply.minor <= msg.minor) { conn->minor = reply.minor; } else { - conn->minor = be32toh(msg.minor); + conn->minor = msg.minor; } reply.major = htobe32(reply.major); reply.minor = htobe32(reply.minor); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(struct lttcomm_relayd_version), 0); - if (ret < 0) { - ERR("Relay sending version"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"send version\" command reply (ret = %zd)", + send_ret); + ret = -1; + goto end; + } else { + ret = 0; } if (!compatible) { @@ -1908,41 +2001,37 @@ end: /* * Check for data pending for a given stream id from the session daemon. */ -static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { struct relay_session *session = conn->session; struct lttcomm_relayd_data_pending msg; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; + ssize_t send_ret; int ret; - uint64_t last_net_seq_num, stream_id; + uint64_t stream_seq; DBG("Data pending command received"); - if (!session || conn->version_check_done == 0) { + if (!session || !conn->version_check_done) { ERR("Trying to check for data before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (ret < sizeof(msg)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid data_pending struct size : %d", - ret); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_data_pending\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end_no_session; } + memcpy(&msg, payload->data, sizeof(msg)); + msg.stream_id = be64toh(msg.stream_id); + msg.last_net_seq_num = be64toh(msg.last_net_seq_num); - stream_id = be64toh(msg.stream_id); - last_net_seq_num = be64toh(msg.last_net_seq_num); - - stream = stream_get_by_id(stream_id); + stream = stream_get_by_id(msg.stream_id); if (stream == NULL) { ret = -1; goto end; @@ -1950,12 +2039,23 @@ static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&stream->lock); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 - " and last_seq %" PRIu64, stream_id, stream->prev_seq, - last_net_seq_num); + if (session_streams_have_index(session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64 + ", prev_index_seq %" PRIu64 + ", and last_seq %" PRIu64, msg.stream_id, + stream->prev_data_seq, stream->prev_index_seq, + msg.last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) { + if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -1971,9 +2071,11 @@ end: memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(ret); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (ret < 0) { - ERR("Relay data pending ret code failed"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"data pending\" command reply (ret = %zd)", + send_ret); + ret = -1; } end_no_session: @@ -1988,52 +2090,53 @@ end_no_session: * the control socket has been handled. So, this is why we simply return * OK here. */ -static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_quiescent_control(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; - uint64_t stream_id; + ssize_t send_ret; struct relay_stream *stream; struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; DBG("Checking quiescent state on control socket"); - if (!conn->session || conn->version_check_done == 0) { + if (!conn->session || !conn->version_check_done) { ERR("Trying to check for data before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (ret < sizeof(msg)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid begin data_pending struct size: %d", - ret); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_quiescent_control\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end_no_session; } + memcpy(&msg, payload->data, sizeof(msg)); + msg.stream_id = be64toh(msg.stream_id); - stream_id = be64toh(msg.stream_id); - stream = stream_get_by_id(stream_id); + stream = stream_get_by_id(msg.stream_id); if (!stream) { goto reply; } pthread_mutex_lock(&stream->lock); stream->data_pending_check_done = true; pthread_mutex_unlock(&stream->lock); - DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id); + + DBG("Relay quiescent control pending flag set to %" PRIu64, msg.stream_id); stream_put(stream); reply: memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_OK); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (ret < 0) { - ERR("Relay data quiescent control ret code failed"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"quiescent control\" command reply (ret = %zd)", + send_ret); + ret = -1; + } else { + ret = 0; } end_no_session: @@ -2047,41 +2150,36 @@ end_no_session: * * This command returns to the client a LTTNG_OK code. */ -static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; + ssize_t send_ret; struct lttng_ht_iter iter; struct lttcomm_relayd_begin_data_pending msg; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - uint64_t session_id; assert(recv_hdr); assert(conn); DBG("Init streams for data pending"); - if (!conn->session || conn->version_check_done == 0) { + if (!conn->session || !conn->version_check_done) { ERR("Trying to check for data before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (ret < sizeof(msg)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid begin data_pending struct size: %d", - ret); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_begin_data_pending\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end_no_session; } - - session_id = be64toh(msg.session_id); + memcpy(&msg, payload->data, sizeof(msg)); + msg.session_id = be64toh(msg.session_id); /* * Iterate over all streams to set the begin data pending flag. @@ -2095,7 +2193,7 @@ static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, if (!stream_get(stream)) { continue; } - if (stream->trace->session->id == session_id) { + if (stream->trace->session->id == msg.session_id) { pthread_mutex_lock(&stream->lock); stream->data_pending_check_done = false; pthread_mutex_unlock(&stream->lock); @@ -2110,9 +2208,13 @@ static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, /* All good, send back reply. */ reply.ret_code = htobe32(LTTNG_OK); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (ret < 0) { - ERR("Relay begin data pending send reply failed"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"begin data pending\" command reply (ret = %zd)", + send_ret); + ret = -1; + } else { + ret = 0; } end_no_session: @@ -2128,39 +2230,34 @@ end_no_session: * * Return to the client if there is data in flight or not with a ret_code. */ -static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; + ssize_t send_ret; struct lttng_ht_iter iter; struct lttcomm_relayd_end_data_pending msg; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - uint64_t session_id; uint32_t is_data_inflight = 0; DBG("End data pending command"); - if (!conn->session || conn->version_check_done == 0) { + if (!conn->session || !conn->version_check_done) { ERR("Trying to check for data before version check"); ret = -1; goto end_no_session; } - ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (ret < sizeof(msg)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid end data_pending struct size: %d", - ret); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_end_data_pending\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end_no_session; } - - session_id = be64toh(msg.session_id); + memcpy(&msg, payload->data, sizeof(msg)); + msg.session_id = be64toh(msg.session_id); /* * Iterate over all streams to see if the begin data pending @@ -2172,13 +2269,24 @@ static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, if (!stream_get(stream)) { continue; } - if (stream->trace->session->id != session_id) { + if (stream->trace->session->id != msg.session_id) { stream_put(stream); continue; } pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { - if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, stream->stream_handle); @@ -2196,9 +2304,13 @@ static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, /* All good, send back reply. */ reply.ret_code = htobe32(is_data_inflight); - ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (ret < 0) { - ERR("Relay end data pending send reply failed"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"end data pending\" command reply (ret = %zd)", + send_ret); + ret = -1; + } else { + ret = 0; } end_no_session: @@ -2210,23 +2322,24 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_index index_info; struct relay_index *index; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - uint64_t net_seq_num; size_t msg_len; assert(conn); DBG("Relay receiving index"); - if (!session || conn->version_check_done == 0) { + if (!session || !conn->version_check_done) { ERR("Trying to close a stream before version check"); ret = -1; goto end_no_session; @@ -2235,22 +2348,29 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, msg_len = lttcomm_relayd_index_len( lttng_to_index_major(conn->major, conn->minor), lttng_to_index_minor(conn->major, conn->minor)); - ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - msg_len, 0); - if (ret < msg_len) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid index struct size : %d", ret); - } + if (payload->size < msg_len) { + ERR("Unexpected payload size in \"relay_recv_index\": expected >= %zu bytes, got %zu bytes", + msg_len, payload->size); ret = -1; goto end_no_session; } + memcpy(&index_info, payload->data, msg_len); + index_info.relay_stream_id = be64toh(index_info.relay_stream_id); + index_info.net_seq_num = be64toh(index_info.net_seq_num); + index_info.packet_size = be64toh(index_info.packet_size); + index_info.content_size = be64toh(index_info.content_size); + index_info.timestamp_begin = be64toh(index_info.timestamp_begin); + index_info.timestamp_end = be64toh(index_info.timestamp_end); + index_info.events_discarded = be64toh(index_info.events_discarded); + index_info.stream_id = be64toh(index_info.stream_id); - net_seq_num = be64toh(index_info.net_seq_num); + if (conn->minor >= 8) { + index_info.stream_instance_id = + be64toh(index_info.stream_instance_id); + index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } - stream = stream_get_by_id(be64toh(index_info.relay_stream_id)); + stream = stream_get_by_id(index_info.relay_stream_id); if (!stream) { ERR("stream_get_by_id not found"); ret = -1; @@ -2269,8 +2389,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, */ if (stream->index_received_seqcount > 0 && stream->indexes_in_flight == 0) { - stream->beacon_ts_end = - be64toh(index_info.timestamp_end); + stream->beacon_ts_end = index_info.timestamp_end; } ret = 0; goto end_stream_put; @@ -2279,9 +2398,9 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } if (stream->ctf_stream_id == -1ULL) { - stream->ctf_stream_id = be64toh(index_info.stream_id); + stream->ctf_stream_id = index_info.stream_id; } - index = relay_index_get_by_id_or_create(stream, net_seq_num); + index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num); if (!index) { ret = -1; ERR("relay_index_get_by_id_or_create index NULL"); @@ -2298,12 +2417,23 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -2320,9 +2450,9 @@ end: reply.ret_code = htobe32(LTTNG_OK); } send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (send_ret < 0) { - ERR("Relay sending close index id reply"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret); + ret = -1; } end_no_session: @@ -2334,17 +2464,19 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_streams_sent(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct lttcomm_relayd_generic_reply reply; assert(conn); DBG("Relay receiving streams_sent"); - if (!conn->session || conn->version_check_done == 0) { + if (!conn->session || !conn->version_check_done) { ERR("Trying to close a stream before version check"); ret = -1; goto end_no_session; @@ -2359,9 +2491,10 @@ static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_OK); send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (send_ret < 0) { - ERR("Relay sending sent_stream reply"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"streams sent\" command reply (ret = %zd)", + send_ret); + ret = -1; } else { /* Success. */ ret = 0; @@ -2372,19 +2505,22 @@ end_no_session: } /* - * relay_rotate_stream: rotate a stream to a new tracefile for the session + * relay_rotate_session_stream: rotate a stream to a new tracefile for the session * rotation feature (not the tracefile rotation feature). */ -static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_rotate_stream stream_info; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - size_t len; - char *new_pathname = NULL; + size_t header_len; + size_t path_len; + struct lttng_buffer_view new_path_view; DBG("Rotate stream received"); @@ -2400,57 +2536,46 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_reply; } - memset(&stream_info, 0, sizeof(struct lttcomm_relayd_rotate_stream)); + header_len = sizeof(struct lttcomm_relayd_rotate_stream); - /* - * Receive the struct up to the new_pathname member since we don't know - * its size yet. - */ - ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, - sizeof(struct lttcomm_relayd_rotate_stream), 0); - if (ret < sizeof(struct lttcomm_relayd_rotate_stream)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid rotate_stream struct size : %d", ret); - } + if (payload->size < header_len) { + ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes", + header_len, payload->size); ret = -1; goto end_no_reply; } - stream = stream_get_by_id(be64toh(stream_info.stream_id)); - if (!stream) { - ret = -1; - goto end; - } + memcpy(&stream_info, payload->data, header_len); - len = be32toh(stream_info.pathname_length); + /* Convert to host */ + stream_info.pathname_length = be32toh(stream_info.pathname_length); + stream_info.stream_id = be64toh(stream_info.stream_id); + stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id); + stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num); + + path_len = stream_info.pathname_length; + if (payload->size < header_len + path_len) { + ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes", + header_len + path_len, payload->size); + ret = -1; + goto end_no_reply; + } + /* Ensure it fits in local filename length. */ - if (len >= LTTNG_PATH_MAX) { + if (path_len >= LTTNG_PATH_MAX) { ret = -ENAMETOOLONG; ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes", - len, LTTNG_PATH_MAX); + path_len, LTTNG_PATH_MAX); goto end; } - new_pathname = zmalloc(len); - if (!new_pathname) { - PERROR("Failed to allocation new path name of relay_rotate_session_stream command"); - ret = -1; - goto end; - } + new_path_view = lttng_buffer_view_from_view(payload, header_len, + stream_info.pathname_length); - ret = conn->sock->ops->recvmsg(conn->sock, new_pathname, len, 0); - if (ret < len) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid rotate_stream struct size : %d", ret); - } + stream = stream_get_by_id(stream_info.stream_id); + if (!stream) { ret = -1; - goto end_no_reply; + goto end; } pthread_mutex_lock(&stream->lock); @@ -2459,33 +2584,48 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr, * Update the trace path (just the folder, the stream name does not * change). */ - free(stream->path_name); - stream->path_name = create_output_path(new_pathname); + free(stream->prev_path_name); + stream->prev_path_name = stream->path_name; + stream->path_name = create_output_path(new_path_view.data); if (!stream->path_name) { ERR("Failed to create a new output path"); + ret = -1; goto end_stream_unlock; } ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, -1, -1); if (ret < 0) { ERR("relay creating output directory"); + ret = -1; goto end_stream_unlock; } - stream->chunk_id = be64toh(stream_info.new_chunk_id); + + assert(stream->current_chunk_id.is_set); + stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { + /* + * Metadata streams have no index; consider its rotation + * complete. + */ + stream->index_rotated = true; /* * The metadata stream is sent only over the control connection * so we know we have all the data to perform the stream * rotation. */ - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } else { - stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num); - ret = try_rotate_stream(stream); - } - if (ret < 0) { - goto end_stream_unlock; + stream->rotate_at_seq_num = stream_info.rotate_at_seq_num; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end_stream_unlock; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } end_stream_unlock: @@ -2500,29 +2640,31 @@ end: } send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(struct lttcomm_relayd_generic_reply), 0); - if (send_ret < 0) { - ERR("Failed to send reply of rotate session stream command"); - ret = send_ret; + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"rotate session stream\" command reply (ret = %zd)", + send_ret); + ret = -1; } end_no_reply: - free(new_pathname); return ret; } /* * relay_mkdir: Create a folder on the disk. */ -static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; - ssize_t network_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_mkdir path_info_header; - struct lttcomm_relayd_mkdir *path_info = NULL; struct lttcomm_relayd_generic_reply reply; char *path = NULL; + size_t header_len; + ssize_t send_ret; + struct lttng_buffer_view path_view; if (!session || !conn->version_check_done) { ERR("Trying to create a directory before version check"); @@ -2540,22 +2682,25 @@ static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - network_ret = conn->sock->ops->recvmsg(conn->sock, &path_info_header, - sizeof(path_info_header), 0); - if (network_ret < (ssize_t) sizeof(path_info_header)) { - if (network_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Reception of mkdir command argument length failed with ret = %zi, expected %zu", - network_ret, sizeof(path_info_header)); - } + header_len = sizeof(path_info_header); + if (payload->size < header_len) { + ERR("Unexpected payload size in \"relay_mkdir\": expected >= %zu bytes, got %zu bytes", + header_len, payload->size); ret = -1; goto end_no_session; } + memcpy(&path_info_header, payload->data, header_len); + path_info_header.length = be32toh(path_info_header.length); + if (payload->size < header_len + path_info_header.length) { + ERR("Unexpected payload size in \"relay_mkdir\" including path: expected >= %zu bytes, got %zu bytes", + header_len + path_info_header.length, payload->size); + ret = -1; + goto end_no_session; + } + /* Ensure that it fits in local path length. */ if (path_info_header.length >= LTTNG_PATH_MAX) { ret = -ENAMETOOLONG; @@ -2564,34 +2709,17 @@ static int relay_mkdir(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - path_info = zmalloc(sizeof(path_info_header) + path_info_header.length); - if (!path_info) { - PERROR("zmalloc of mkdir command path"); - ret = -1; - goto end; - } - - network_ret = conn->sock->ops->recvmsg(conn->sock, path_info->path, - path_info_header.length, 0); - if (network_ret < (ssize_t) path_info_header.length) { - if (network_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Reception of mkdir path argument failed with ret = %zi, expected %" PRIu32, - network_ret, path_info_header.length); - } - ret = -1; - goto end_no_session; - } + path_view = lttng_buffer_view_from_view(payload, header_len, + path_info_header.length); - path = create_output_path(path_info->path); + path = create_output_path(path_view.data); if (!path) { ERR("Failed to create output path"); ret = -1; goto end; } + DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path); ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); if (ret < 0) { ERR("relay creating output directory"); @@ -2607,18 +2735,14 @@ end: } else { reply.ret_code = htobe32(LTTNG_OK); } - network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(struct lttcomm_relayd_generic_reply), 0); - if (network_ret < (ssize_t) sizeof(struct lttcomm_relayd_generic_reply)) { - ERR("Failed to send mkdir command status code with ret = %zi, expected %zu", - network_ret, - sizeof(struct lttcomm_relayd_generic_reply)); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"mkdir\" command reply (ret = %zd)", send_ret); ret = -1; } end_no_session: free(path); - free(path_info); return ret; } @@ -2643,18 +2767,20 @@ static int validate_rotate_rename_path_length(const char *path_type, * completed. We are not closing any fd here, just moving the folder, so it * works even if data is still in-flight. */ -static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { int ret; - ssize_t network_ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_generic_reply reply; struct lttcomm_relayd_rotate_rename header; - char *received_paths = NULL; + size_t header_len; size_t received_paths_size; - const char *received_old_path, *received_new_path; char *complete_old_path = NULL, *complete_new_path = NULL; + struct lttng_buffer_view old_path_view; + struct lttng_buffer_view new_path_view; if (!session || !conn->version_check_done) { ERR("Trying to rename a trace folder before version check"); @@ -2668,25 +2794,27 @@ static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_reply; } - network_ret = conn->sock->ops->recvmsg(conn->sock, &header, - sizeof(header), 0); - if (network_ret < (ssize_t) sizeof(header)) { - if (network_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", - conn->sock->fd); - } else { - ERR("Relay didn't receive a valid rotate_rename command header: expected %zu bytes, recvmsg() returned %zi", - sizeof(header), network_ret); - } + header_len = sizeof(header); + if (payload->size < header_len) { + ERR("Unexpected payload size in \"relay_rotate_rename\": expected >= %zu bytes, got %zu bytes", + header_len, payload->size); ret = -1; goto end_no_reply; } + memcpy(&header, payload->data, header_len); + header.old_path_length = be32toh(header.old_path_length); header.new_path_length = be32toh(header.new_path_length); received_paths_size = header.old_path_length + header.new_path_length; + if (payload->size < header_len + received_paths_size) { + ERR("Unexpected payload size in \"relay_rotate_rename\" including paths: expected >= %zu bytes, got %zu bytes", + header_len, payload->size); + ret = -1; + goto end_no_reply; + } + /* Ensure the paths don't exceed their allowed size. */ ret = validate_rotate_rename_path_length("old", header.old_path_length); if (ret) { @@ -2697,56 +2825,41 @@ static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - received_paths = zmalloc(received_paths_size); - if (!received_paths) { - PERROR("Could not allocate rotate commands paths reception buffer"); - ret = -1; - goto end; - } - - network_ret = conn->sock->ops->recvmsg(conn->sock, received_paths, - received_paths_size, 0); - if (network_ret < (ssize_t) received_paths_size) { - if (network_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", - conn->sock->fd); - } else { - ERR("Relay failed to received rename command paths (%zu bytes): recvmsg() returned %zi", - received_paths_size, network_ret); - } - ret = -1; - goto end_no_reply; - } + old_path_view = lttng_buffer_view_from_view(payload, header_len, + header.old_path_length); + new_path_view = lttng_buffer_view_from_view(payload, + header_len + header.old_path_length, + header.new_path_length); /* Validate that both paths received are NULL terminated. */ - if (received_paths[header.old_path_length - 1] != '\0') { + if (old_path_view.data[old_path_view.size - 1] != '\0') { ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)"); ret = -1; goto end; } - if (received_paths[received_paths_size - 1] != '\0') { + if (new_path_view.data[new_path_view.size - 1] != '\0') { ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)"); ret = -1; goto end; } - received_old_path = received_paths; - received_new_path = received_paths + header.old_path_length; - - complete_old_path = create_output_path(received_old_path); + DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"", + old_path_view.data, new_path_view.data); + complete_old_path = create_output_path(old_path_view.data); if (!complete_old_path) { ERR("Failed to build old output path in rotate_rename command"); ret = -1; goto end; } - complete_new_path = create_output_path(received_new_path); + complete_new_path = create_output_path(new_path_view.data); if (!complete_new_path) { ERR("Failed to build new output path in rotate_rename command"); ret = -1; goto end; } + DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"", + complete_old_path, complete_new_path); ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, -1, -1); @@ -2775,15 +2888,15 @@ end: } else { reply.ret_code = htobe32(LTTNG_OK); } - network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(struct lttcomm_relayd_generic_reply), 0); - if (network_ret < sizeof(struct lttcomm_relayd_generic_reply)) { - ERR("Relay sending stream id"); + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(reply), 0); + if (send_ret < sizeof(reply)) { + ERR("Failed to send \"rotate rename\" command reply (ret = %zd)", + send_ret); ret = -1; } end_no_reply: - free(received_paths); free(complete_old_path); free(complete_new_path); return ret; @@ -2798,16 +2911,17 @@ end_no_reply: * new chunk folder). */ static -int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) { struct relay_session *session = conn->session; struct lttcomm_relayd_rotate_pending msg; - struct lttcomm_relayd_generic_reply reply; + struct lttcomm_relayd_rotate_pending_reply reply; struct lttng_ht_iter iter; struct relay_stream *stream; - int ret; - ssize_t network_ret; + int ret = 0; + ssize_t send_ret; uint64_t chunk_id; bool rotate_pending = false; @@ -2825,21 +2939,19 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_reply; } - network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); - if (network_ret < (ssize_t) sizeof(msg)) { - if (network_ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Relay didn't receive valid rotate_pending struct size : %zi", - network_ret); - } + if (payload->size < sizeof(msg)) { + ERR("Unexpected payload size in \"relay_rotate_pending\": expected >= %zu bytes, got %zu bytes", + sizeof(msg), payload->size); ret = -1; goto end_no_reply; } + memcpy(&msg, payload->data, sizeof(msg)); + chunk_id = be64toh(msg.chunk_id); - DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + + DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64, + session->session_name, chunk_id); /* * Iterate over all the streams in the session and check if they are @@ -2861,7 +2973,7 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { + } else if (stream->current_chunk_id.value < chunk_id) { /* * Stream closed on the consumer but still active on the * relay. @@ -2883,11 +2995,13 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, send_reply: rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); - reply.ret_code = htobe32(rotate_pending ? 1 : 0); - network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK); + reply.is_pending = (uint8_t) !!rotate_pending; + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (network_ret < (ssize_t) sizeof(reply)) { - ERR("Relay rotate pending ret code failed"); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"rotate pending\" command reply (ret = %zd)", + send_ret); ret = -1; } @@ -2895,69 +3009,87 @@ end_no_reply: return ret; } -/* - * Process the commands received on the control socket - */ -static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn) +#define DBG_CMD(cmd_name, conn) \ + DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); + +static int relay_process_control_command(struct relay_connection *conn, + const struct lttcomm_relayd_hdr *header, + const struct lttng_buffer_view *payload) { int ret = 0; - switch (be32toh(recv_hdr->cmd)) { + switch (header->cmd) { case RELAYD_CREATE_SESSION: - ret = relay_create_session(recv_hdr, conn); + DBG_CMD("RELAYD_CREATE_SESSION", conn); + ret = relay_create_session(header, conn, payload); break; case RELAYD_ADD_STREAM: - ret = relay_add_stream(recv_hdr, conn); + DBG_CMD("RELAYD_ADD_STREAM", conn); + ret = relay_add_stream(header, conn, payload); break; case RELAYD_START_DATA: - ret = relay_start(recv_hdr, conn); + DBG_CMD("RELAYD_START_DATA", conn); + ret = relay_start(header, conn, payload); break; case RELAYD_SEND_METADATA: - ret = relay_recv_metadata(recv_hdr, conn); + DBG_CMD("RELAYD_SEND_METADATA", conn); + ret = relay_recv_metadata(header, conn, payload); break; case RELAYD_VERSION: - ret = relay_send_version(recv_hdr, conn); + DBG_CMD("RELAYD_VERSION", conn); + ret = relay_send_version(header, conn, payload); break; case RELAYD_CLOSE_STREAM: - ret = relay_close_stream(recv_hdr, conn); + DBG_CMD("RELAYD_CLOSE_STREAM", conn); + ret = relay_close_stream(header, conn, payload); break; case RELAYD_DATA_PENDING: - ret = relay_data_pending(recv_hdr, conn); + DBG_CMD("RELAYD_DATA_PENDING", conn); + ret = relay_data_pending(header, conn, payload); break; case RELAYD_QUIESCENT_CONTROL: - ret = relay_quiescent_control(recv_hdr, conn); + DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn); + ret = relay_quiescent_control(header, conn, payload); break; case RELAYD_BEGIN_DATA_PENDING: - ret = relay_begin_data_pending(recv_hdr, conn); + DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn); + ret = relay_begin_data_pending(header, conn, payload); break; case RELAYD_END_DATA_PENDING: - ret = relay_end_data_pending(recv_hdr, conn); + DBG_CMD("RELAYD_END_DATA_PENDING", conn); + ret = relay_end_data_pending(header, conn, payload); break; case RELAYD_SEND_INDEX: - ret = relay_recv_index(recv_hdr, conn); + DBG_CMD("RELAYD_SEND_INDEX", conn); + ret = relay_recv_index(header, conn, payload); break; case RELAYD_STREAMS_SENT: - ret = relay_streams_sent(recv_hdr, conn); + DBG_CMD("RELAYD_STREAMS_SENT", conn); + ret = relay_streams_sent(header, conn, payload); break; case RELAYD_RESET_METADATA: - ret = relay_reset_metadata(recv_hdr, conn); + DBG_CMD("RELAYD_RESET_METADATA", conn); + ret = relay_reset_metadata(header, conn, payload); break; case RELAYD_ROTATE_STREAM: - ret = relay_rotate_session_stream(recv_hdr, conn); + DBG_CMD("RELAYD_ROTATE_STREAM", conn); + ret = relay_rotate_session_stream(header, conn, payload); break; case RELAYD_ROTATE_RENAME: - ret = relay_rotate_rename(recv_hdr, conn); + DBG_CMD("RELAYD_ROTATE_RENAME", conn); + ret = relay_rotate_rename(header, conn, payload); break; case RELAYD_ROTATE_PENDING: - ret = relay_rotate_pending(recv_hdr, conn); + DBG_CMD("RELAYD_ROTATE_PENDING", conn); + ret = relay_rotate_pending(header, conn, payload); break; case RELAYD_MKDIR: - ret = relay_mkdir(recv_hdr, conn); + DBG_CMD("RELAYD_MKDIR", conn); + ret = relay_mkdir(header, conn, payload); break; case RELAYD_UPDATE_SYNC_INFO: default: - ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); + ERR("Received unknown command (%u)", header->cmd); relay_unknown_command(conn); ret = -1; goto end; @@ -2967,6 +3099,194 @@ end: return ret; } +static enum relay_connection_status relay_process_control_receive_payload( + struct relay_connection *conn) +{ + int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; + struct lttng_dynamic_buffer *reception_buffer = + &conn->protocol.ctrl.reception_buffer; + struct ctrl_connection_state_receive_payload *state = + &conn->protocol.ctrl.state.receive_payload; + struct lttng_buffer_view payload_view; + + if (state->left_to_receive == 0) { + /* Short-circuit for payload-less commands. */ + goto reception_complete; + } + + ret = conn->sock->ops->recvmsg(conn->sock, + reception_buffer->data + state->received, + state->left_to_receive, MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive command payload on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } + goto end; + } else if (ret == 0) { + DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_CLOSED; + goto end; + } + + assert(ret > 0); + assert(ret <= state->left_to_receive); + + state->left_to_receive -= ret; + state->received += ret; + + if (state->left_to_receive > 0) { + /* + * Can't transition to the protocol's next state, wait to + * receive the rest of the header. + */ + DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", + state->received, state->left_to_receive, + conn->sock->fd); + goto end; + } + +reception_complete: + DBG("Done receiving control command payload: fd = %i, payload size = %" PRIu64 " bytes", + conn->sock->fd, state->received); + /* + * The payload required to process the command has been received. + * A view to the reception buffer is forwarded to the various + * commands and the state of the control is reset on success. + * + * Commands are responsible for sending their reply to the peer. + */ + payload_view = lttng_buffer_view_from_dynamic_buffer(reception_buffer, + 0, -1); + ret = relay_process_control_command(conn, + &state->header, &payload_view); + if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end; + } + + ret = connection_reset_protocol_state(conn); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + } +end: + return status; +} + +static enum relay_connection_status relay_process_control_receive_header( + struct relay_connection *conn) +{ + int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; + struct lttcomm_relayd_hdr header; + struct lttng_dynamic_buffer *reception_buffer = + &conn->protocol.ctrl.reception_buffer; + struct ctrl_connection_state_receive_header *state = + &conn->protocol.ctrl.state.receive_header; + + assert(state->left_to_receive != 0); + + ret = conn->sock->ops->recvmsg(conn->sock, + reception_buffer->data + state->received, + state->left_to_receive, MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive control command header on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } + goto end; + } else if (ret == 0) { + DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_CLOSED; + goto end; + } + + assert(ret > 0); + assert(ret <= state->left_to_receive); + + state->left_to_receive -= ret; + state->received += ret; + + if (state->left_to_receive > 0) { + /* + * Can't transition to the protocol's next state, wait to + * receive the rest of the header. + */ + DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", + state->received, state->left_to_receive, + conn->sock->fd); + goto end; + } + + /* Transition to next state: receiving the command's payload. */ + conn->protocol.ctrl.state_id = + CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD; + memcpy(&header, reception_buffer->data, sizeof(header)); + header.circuit_id = be64toh(header.circuit_id); + header.data_size = be64toh(header.data_size); + header.cmd = be32toh(header.cmd); + header.cmd_version = be32toh(header.cmd_version); + memcpy(&conn->protocol.ctrl.state.receive_payload.header, + &header, sizeof(header)); + + DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32 ", cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", + conn->sock->fd, header.cmd, header.cmd_version, + header.data_size); + + if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { + ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", + header.data_size); + status = RELAY_CONNECTION_STATUS_ERROR; + goto end; + } + + conn->protocol.ctrl.state.receive_payload.left_to_receive = + header.data_size; + conn->protocol.ctrl.state.receive_payload.received = 0; + ret = lttng_dynamic_buffer_set_size(reception_buffer, + header.data_size); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end; + } + + if (header.data_size == 0) { + /* + * Manually invoke the next state as the poll loop + * will not wake-up to allow us to proceed further. + */ + status = relay_process_control_receive_payload(conn); + } +end: + return status; +} + +/* + * Process the commands received on the control socket + */ +static enum relay_connection_status relay_process_control( + struct relay_connection *conn) +{ + enum relay_connection_status status; + + switch (conn->protocol.ctrl.state_id) { + case CTRL_CONNECTION_STATE_RECEIVE_HEADER: + status = relay_process_control_receive_header(conn); + break; + case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD: + status = relay_process_control_receive_payload(conn); + break; + default: + ERR("Unknown control connection protocol state encountered."); + abort(); + } + + return status; +} + /* * Handle index for a data stream. * @@ -2975,7 +3295,7 @@ end: * Return 0 on success else a negative value. */ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, - int rotate_index, bool *flushed, uint64_t total_size) + bool rotate_index, bool *flushed, uint64_t total_size) { int ret = 0; uint64_t data_offset; @@ -2999,7 +3319,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } if (rotate_index || !stream->index_file) { - ret = create_rotate_index_file(stream); + const char *stream_path; + + /* + * The data connection creates the stream's first index file. + * + * This can happen _after_ a ROTATE_STREAM command. In + * other words, the data of the first packet of this stream + * can be received after a ROTATE_STREAM command. + * + * The ROTATE_STREAM command changes the stream's path_name + * to point to the "next" chunk. If a rotation is pending for + * this stream, as indicated by "rotate_at_seq_num != -1ULL", + * it means that we are still receiving data that belongs in the + * stream's former path. + * + * In this very specific case, we must ensure that the index + * file is created in the streams's former path, + * "prev_path_name". + * + * All other rotations beyond the first one are not affected + * by this problem since the actual rotation operation creates + * the new chunk's index file. + */ + stream_path = stream->rotate_at_seq_num == -1ULL ? + stream->path_name: + stream->prev_path_name; + + ret = create_rotate_index_file(stream, stream_path); if (ret < 0) { ERR("Failed to rotate index"); /* Put self-ref for this index due to error. */ @@ -3027,67 +3374,99 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: return ret; } -/* - * relay_process_data: Process the data received on the data socket - */ -static int relay_process_data(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_header( + struct relay_connection *conn) { - int ret = 0, rotate_index = 0; - ssize_t size_ret; + int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; + struct data_connection_state_receive_header *state = + &conn->protocol.data.state.receive_header; + struct lttcomm_relayd_data_hdr header; struct relay_stream *stream; - struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id; - uint64_t net_seq_num; - uint32_t data_size; - struct relay_session *session; - bool new_stream = false, close_requested = false; - size_t chunk_size = RECV_DATA_BUFFER_SIZE; - size_t recv_off = 0; - char data_buffer[chunk_size]; - bool index_flushed = false; - - ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, - sizeof(struct lttcomm_relayd_data_hdr), 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Unable to receive data header on sock %d", conn->sock->fd); + + assert(state->left_to_receive != 0); + + ret = conn->sock->ops->recvmsg(conn->sock, + state->header_reception_buffer + state->received, + state->left_to_receive, MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive data header on sock %d", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; } - ret = -1; + goto end; + } else if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } - stream_id = be64toh(data_hdr.stream_id); - stream = stream_get_by_id(stream_id); - if (!stream) { - ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); - ret = -1; + assert(ret > 0); + assert(ret <= state->left_to_receive); + + state->left_to_receive -= ret; + state->received += ret; + + if (state->left_to_receive > 0) { + /* + * Can't transition to the protocol's next state, wait to + * receive the rest of the header. + */ + DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", + state->received, state->left_to_receive, + conn->sock->fd); goto end; } - session = stream->trace->session; - data_size = be32toh(data_hdr.data_size); - net_seq_num = be64toh(data_hdr.net_seq_num); + /* Transition to next state: receiving the payload. */ + conn->protocol.data.state_id = DATA_CONNECTION_STATE_RECEIVE_PAYLOAD; + + memcpy(&header, state->header_reception_buffer, sizeof(header)); + header.circuit_id = be64toh(header.circuit_id); + header.stream_id = be64toh(header.stream_id); + header.data_size = be32toh(header.data_size); + header.net_seq_num = be64toh(header.net_seq_num); + header.padding_size = be32toh(header.padding_size); + memcpy(&conn->protocol.data.state.receive_payload.header, &header, sizeof(header)); + + conn->protocol.data.state.receive_payload.left_to_receive = + header.data_size; + conn->protocol.data.state.receive_payload.received = 0; + conn->protocol.data.state.receive_payload.rotate_index = false; - DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, - data_size, stream_id, net_seq_num); + DBG("Received data connection header on fd %i: circuit_id = %" PRIu64 ", stream_id = %" PRIu64 ", data_size = %" PRIu32 ", net_seq_num = %" PRIu64 ", padding_size = %" PRIu32, + conn->sock->fd, header.circuit_id, + header.stream_id, header.data_size, + header.net_seq_num, header.padding_size); + + stream = stream_get_by_id(header.stream_id); + if (!stream) { + DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, + header.stream_id); + /* Protocol error. */ + status = RELAY_CONNECTION_STATUS_ERROR; + goto end; + } pthread_mutex_lock(&stream->lock); /* Check if a rotation is needed. */ if (stream->tracefile_size > 0 && - (stream->tracefile_size_current + data_size) > + (stream->tracefile_size_current + header.data_size) > stream->tracefile_size) { uint64_t old_id, new_id; @@ -3103,88 +3482,185 @@ static int relay_process_data(struct relay_connection *conn) -1, stream->stream_fd->fd, &new_id, &stream->stream_fd->fd); if (ret < 0) { - ERR("Rotating stream output file"); + ERR("Failed to rotate stream output file"); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } + /* * Reset current size because we just performed a stream * rotation. */ stream->tracefile_size_current = 0; - rotate_index = 1; + conn->protocol.data.state.receive_payload.rotate_index = true; } - /* - * Index are handled in protocol version 2.4 and above. Also, - * snapshot and index are NOT supported. - */ - if (session->minor >= 4 && !session->snapshot) { - ret = handle_index_data(stream, net_seq_num, rotate_index, - &index_flushed, - data_size + be32toh(data_hdr.padding_size)); - if (ret < 0) { - ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", - stream->stream_handle, net_seq_num, ret); +end_stream_unlock: + pthread_mutex_unlock(&stream->lock); + stream_put(stream); +end: + return status; +} + +static enum relay_connection_status relay_process_data_receive_payload( + struct relay_connection *conn) +{ + int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; + struct relay_stream *stream; + struct data_connection_state_receive_payload *state = + &conn->protocol.data.state.receive_payload; + const size_t chunk_size = RECV_DATA_BUFFER_SIZE; + char data_buffer[chunk_size]; + bool partial_recv = false; + bool new_stream = false, close_requested = false, index_flushed = false; + uint64_t left_to_receive = state->left_to_receive; + struct relay_session *session; + + DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->header.net_seq_num, + state->received, left_to_receive); + + stream = stream_get_by_id(state->header.stream_id); + if (!stream) { + /* Protocol error. */ + ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64, + state->header.stream_id); + status = RELAY_CONNECTION_STATUS_ERROR; + goto end; + } + + pthread_mutex_lock(&stream->lock); + session = stream->trace->session; + if (!conn->session) { + ret = connection_set_session(conn, session); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } } - for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { - size_t recv_size = min(data_size - recv_off, chunk_size); + /* + * The size of the "chunk" received on any iteration is bounded by: + * - the data left to receive, + * - the data immediately available on the socket, + * - the on-stack data buffer + */ + while (left_to_receive > 0 && !partial_recv) { + ssize_t write_ret; + size_t recv_size = min(left_to_receive, chunk_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Socket %d error %d", conn->sock->fd, ret); + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, + recv_size, MSG_DONTWAIT); + if (ret < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Socket %d error", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; } - ret = -1; goto end_stream_unlock; + } else if (ret == 0) { + /* No more data ready to be consumed on socket. */ + DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, + state->header.stream_id); + status = RELAY_CONNECTION_STATUS_CLOSED; + break; + } else if (ret < (int) recv_size) { + /* + * All the data available on the socket has been + * consumed. + */ + partial_recv = true; } + recv_size = ret; + /* Write data to stream output fd. */ - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, + write_ret = lttng_write(stream->stream_fd->fd, data_buffer, recv_size); - if (size_ret < recv_size) { + if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } + left_to_receive -= recv_size; + state->received += recv_size; + state->left_to_receive = left_to_receive; + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, - size_ret, stream->stream_handle); + write_ret, stream->stream_handle); + } + + if (state->left_to_receive > 0) { + /* + * Did not receive all the data expected, wait for more data to + * become available on the socket. + */ + DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->received, + state->left_to_receive); + goto end_stream_unlock; } ret = write_padding_to_file(stream->stream_fd->fd, - be32toh(data_hdr.padding_size)); - if (ret < 0) { + state->header.padding_size); + if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", - stream->stream_handle, net_seq_num, ret); + stream->stream_handle, + state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } - stream->tracefile_size_current += - data_size + be32toh(data_hdr.padding_size); - if (stream->prev_seq == -1ULL) { + + + if (session_streams_have_index(session)) { + ret = handle_index_data(stream, state->header.net_seq_num, + state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size); + if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, + state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + } + + stream->tracefile_size_current += state->header.data_size + + state->header.padding_size; + + if (stream->prev_data_seq == -1ULL) { new_stream = true; } if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } - stream->prev_seq = net_seq_num; + stream->prev_data_seq = state->header.net_seq_num; + + /* + * Resetting the protocol state (to RECEIVE_HEADER) will trash the + * contents of *state which are aliased (union) to the same location as + * the new state. Don't use it beyond this point. + */ + connection_reset_protocol_state(conn); + state = NULL; - ret = try_rotate_stream(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); - if (close_requested) { + if (close_requested && left_to_receive == 0) { try_stream_close(stream); } @@ -3193,9 +3669,33 @@ end_stream_unlock: uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } + stream_put(stream); end: - return ret; + return status; +} + +/* + * relay_process_data: Process the data received on the data socket + */ +static enum relay_connection_status relay_process_data( + struct relay_connection *conn) +{ + enum relay_connection_status status; + + switch (conn->protocol.data.state_id) { + case DATA_CONNECTION_STATE_RECEIVE_HEADER: + status = relay_process_data_receive_header(conn); + break; + case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD: + status = relay_process_data_receive_payload(conn); + break; + default: + ERR("Unexpected data connection communication state."); + abort(); + } + + return status; } static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) @@ -3246,7 +3746,6 @@ static void *relay_thread_worker(void *data) struct lttng_poll_event events; struct lttng_ht *relay_connections_ht; struct lttng_ht_iter iter; - struct lttcomm_relayd_hdr recv_hdr; struct relay_connection *destroy_conn = NULL; DBG("[thread] Relay worker started"); @@ -3368,21 +3867,36 @@ restart: assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { - ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, - &recv_hdr, sizeof(recv_hdr), 0); - if (ret <= 0) { - /* Connection closed */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } else { - ret = relay_process_control(&recv_hdr, ctrl_conn); - if (ret < 0) { - /* Clear the session on error. */ - relay_thread_close_connection(&events, - pollfd, ctrl_conn); + enum relay_connection_status status; + + status = relay_process_control(ctrl_conn); + if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the control connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(ctrl_conn->session); } - seen_control = 1; + + /* Clear the connection on error or close. */ + relay_thread_close_connection(&events, + pollfd, + ctrl_conn); } + seen_control = 1; } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { relay_thread_close_connection(&events, pollfd, ctrl_conn); @@ -3451,9 +3965,30 @@ restart: assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - ret = relay_process_data(data_conn); - /* Connection closed */ - if (ret < 0) { + enum relay_connection_status status; + + status = relay_process_data(data_conn); + /* Connection closed or error. */ + if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the data connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(data_conn->session); + } relay_thread_close_connection(&events, pollfd, data_conn); /* @@ -3485,16 +4020,14 @@ restart: exit: error: - /* Cleanup reamaining connection object. */ + /* Cleanup remaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); - if (session_abort(destroy_conn->session)) { - assert(0); - } + session_abort(destroy_conn->session); /* * No need to grab another ref, because we own