X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=dddc2a2b4f77b25c9ba968826460147302f25e0a;hp=44ea6cdeee8cd06baf8e0127032de7fc0b4aed03;hb=348a81dcf7b6944b10a813d93dcaf86fdb5194f6;hpb=5312a3edd938117cb5670de711f237e7b6f2e68c diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 44ea6cdee..dddc2a2b4 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -74,6 +74,7 @@ #include "connection.h" #include "tracefile-array.h" #include "tcp_keep_alive.h" +#include "sessiond-trace-chunks.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -83,6 +84,14 @@ NULL #endif ; +enum relay_connection_status { + RELAY_CONNECTION_STATUS_OK, + /* An error occurred while processing an event on the connection. */ + RELAY_CONNECTION_STATUS_ERROR, + /* Connection closed/shutdown cleanly. */ + RELAY_CONNECTION_STATUS_CLOSED, +}; + /* command line options */ char *opt_output_path; static int opt_daemon, opt_background; @@ -159,6 +168,8 @@ struct lttng_ht *sessions_ht; /* Relayd health monitoring */ struct health_app *health_relayd; +struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry; + static struct option long_options[] = { { "control-port", 1, 0, 'C', }, { "data-port", 1, 0, 'D', }, @@ -850,14 +861,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* - * No activity for this FD (poll - * implementation). - */ - continue; - } - /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -1075,6 +1078,11 @@ static int set_index_control_data(struct relay_index *index, return relay_index_set_data(index, &index_data); } +static bool session_streams_have_index(const struct relay_session *session) +{ + return session->minor >= 4 && !session->snapshot; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -1086,34 +1094,60 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, { int ret = 0; ssize_t send_ret; - struct relay_session *session; - struct lttcomm_relayd_status_session reply; - char session_name[LTTNG_NAME_MAX]; - char hostname[LTTNG_HOST_NAME_MAX]; + struct relay_session *session = NULL; + struct lttcomm_relayd_status_session reply = {}; + char session_name[LTTNG_NAME_MAX] = {}; + char hostname[LTTNG_HOST_NAME_MAX] = {}; uint32_t live_timer = 0; bool snapshot = false; - - memset(session_name, 0, LTTNG_NAME_MAX); - memset(hostname, 0, LTTNG_HOST_NAME_MAX); - - memset(&reply, 0, sizeof(reply)); - - switch (conn->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: + /* Left nil for peers < 2.11. */ + lttng_uuid sessiond_uuid = {}; + LTTNG_OPTIONAL(uint64_t) id_sessiond = {}; + LTTNG_OPTIONAL(uint64_t) current_chunk_id = {}; + LTTNG_OPTIONAL(time_t) creation_time = {}; + + if (conn->minor < 4) { + /* From 2.1 to 2.3 */ + ret = 0; + } else if (conn->minor >= 4 && conn->minor < 11) { + /* From 2.4 to 2.10 */ ret = cmd_create_session_2_4(payload, session_name, hostname, &live_timer, &snapshot); + } else { + bool has_current_chunk; + uint64_t current_chunk_id_value; + time_t creation_time_value; + uint64_t id_sessiond_value; + + /* From 2.11 to ... */ + ret = cmd_create_session_2_11(payload, session_name, hostname, + &live_timer, &snapshot, &id_sessiond_value, + sessiond_uuid, &has_current_chunk, + ¤t_chunk_id_value, &creation_time_value); + if (lttng_uuid_is_nil(sessiond_uuid)) { + /* The nil UUID is reserved for pre-2.11 clients. */ + ERR("Illegal nil UUID announced by peer in create session command"); + ret = -1; + goto send_reply; + } + LTTNG_OPTIONAL_SET(&id_sessiond, id_sessiond_value); + LTTNG_OPTIONAL_SET(&creation_time, creation_time_value); + if (has_current_chunk) { + LTTNG_OPTIONAL_SET(¤t_chunk_id, + current_chunk_id_value); + } } + if (ret < 0) { goto send_reply; } session = session_create(session_name, hostname, live_timer, - snapshot, conn->major, conn->minor); + snapshot, sessiond_uuid, + id_sessiond.is_set ? &id_sessiond.value : NULL, + current_chunk_id.is_set ? ¤t_chunk_id.value : NULL, + creation_time.is_set ? &creation_time.value : NULL, + conn->major, conn->minor); if (!session) { ret = -1; goto send_reply; @@ -1137,7 +1171,9 @@ send_reply: send_ret); ret = -1; } - + if (ret < 0 && session) { + session_put(session); + } return ret; } @@ -1171,6 +1207,34 @@ static void publish_connection_local_streams(struct relay_connection *conn) pthread_mutex_unlock(&session->lock); } +static int conform_channel_path(char *channel_path) +{ + int ret = 0; + + if (strstr("../", channel_path)) { + ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"", + channel_path); + ret = -1; + goto end; + } + + if (*channel_path == '/') { + const size_t len = strlen(channel_path); + + /* + * Channel paths from peers prior to 2.11 are expressed as an + * absolute path that is, in reality, relative to the relay + * daemon's output directory. Remove the leading slash so it + * is correctly interpreted as a relative path later on. + * + * len (and not len - 1) is used to copy the trailing NULL. + */ + bcopy(channel_path + 1, channel_path, len); + } +end: + return ret; +} + /* * relay_add_stream: allocate a new stream for a session */ @@ -1187,6 +1251,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + LTTNG_OPTIONAL(uint64_t) stream_chunk_id = {}; if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); @@ -1194,21 +1259,30 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - switch (session->minor) { - case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */ + if (session->minor == 1) { + /* For 2.1 */ ret = cmd_recv_stream_2_1(payload, &path_name, &channel_name); - break; - case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */ - default: + } else if (session->minor > 1 && session->minor < 11) { + /* From 2.2 to 2.10 */ ret = cmd_recv_stream_2_2(payload, &path_name, &channel_name, &tracefile_size, &tracefile_count); - break; + } else { + /* From 2.11 to ... */ + ret = cmd_recv_stream_2_11(payload, &path_name, + &channel_name, &tracefile_size, &tracefile_count, + &stream_chunk_id.value); + stream_chunk_id.is_set = true; } + if (ret < 0) { goto send_reply; } + if (conform_channel_path(path_name)) { + goto send_reply; + } + trace = ctf_trace_get_by_path_or_create(session, path_name); if (!trace) { goto send_reply; @@ -1221,7 +1295,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count); path_name = NULL; channel_name = NULL; @@ -1516,11 +1590,14 @@ end: * Return 0 on success, -1 on error. */ static -int create_rotate_index_file(struct relay_stream *stream) +int create_rotate_index_file(struct relay_stream *stream, + const char *channel_path) { int ret; uint32_t major, minor; + ASSERT_LOCKED(stream->lock); + /* Put ref on previous index_file. */ if (stream->index_file) { lttng_index_file_put(stream->index_file); @@ -1528,12 +1605,26 @@ int create_rotate_index_file(struct relay_stream *stream) } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, - stream->channel_name, - -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa), + if (!stream->trace->index_folder_created) { + char *index_subpath = NULL; + + ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR); + if (ret < 0) { + goto end; + } + + ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath); + free(index_subpath); + if (ret) { + goto end; + } + stream->trace->index_folder_created = true; + } + stream->index_file = lttng_index_file_create_from_trace_chunk( + stream->trace_chunk, channel_path, stream->channel_name, + stream->tracefile_size, stream->tracefile_count, lttng_to_index_major(major, minor), - lttng_to_index_minor(major, minor)); + lttng_to_index_minor(major, minor), true); if (!stream->index_file) { ret = -1; goto end; @@ -1546,10 +1637,12 @@ end: } static -int do_rotate_stream(struct relay_stream *stream) +int do_rotate_stream_data(struct relay_stream *stream) { int ret; + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); /* Perform the stream rotation. */ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, @@ -1561,19 +1654,17 @@ int do_rotate_stream(struct relay_stream *stream) goto end; } stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - } - - stream->rotate_at_seq_num = -1ULL; stream->pos_after_last_complete_data_index = 0; + stream->data_rotated = true; + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } end: return ret; } @@ -1585,9 +1676,7 @@ end: * connections are separate, the indexes as well as the commands arrive from * the control connection and we have no control over the order so we could be * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. We don't need - * to update the index because its order is guaranteed with the rotation - * command message. + * before the rotation command on the control connection arrives. */ static int rotate_truncate_stream(struct relay_stream *stream) @@ -1618,7 +1707,7 @@ int rotate_truncate_stream(struct relay_stream *stream) /* * Rewind the current tracefile to the position at which the rotation - * should have occured. + * should have occurred. */ lseek_ret = lseek(stream->stream_fd->fd, stream->pos_after_last_complete_data_index, SEEK_SET); @@ -1686,12 +1775,6 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Rotate stream index file"); - goto end; - } - /* * Update the offset and FD of all the eventual indexes created by the * data connection before the rotation command arrived. @@ -1714,27 +1797,100 @@ end: } /* - * Check if a stream should perform a rotation (for session rotation). + * Check if a stream's index file should be rotated (for session rotation). * Must be called with the stream lock held. * * Return 0 on success, a negative value on error. */ static -int try_rotate_stream(struct relay_stream *stream) +int try_rotate_stream_index(struct relay_stream *stream) { int ret = 0; - /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ goto end; } - if (stream->prev_seq < stream->rotate_at_seq_num || - stream->prev_seq == -1ULL) { - DBG("Stream %" PRIu64 " no yet ready for rotation", + if (stream->index_rotated) { + /* Rotation of the index has already occurred. */ + goto end; + } + + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_index_seq); + goto end; + } else if (stream->prev_index_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq, + stream->prev_index_seq); + ret = -1; + goto end; + } else { + DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); + ret = create_rotate_index_file(stream, stream->path_name); + stream->index_rotated = true; + + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } + } + +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ goto end; - } else if (stream->prev_seq > stream->rotate_at_seq_num) { + } + + if (stream->data_rotated) { + /* Rotation of the data file has already occurred. */ + goto end; + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); + goto end; + } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { + /* + * prev_data_seq is checked here since indexes and rotation + * commands are serialized with respect to each other. + */ DBG("Rotation after too much data has been written in tracefile " "for stream %" PRIu64 ", need to truncate before " "rotating", stream->stream_handle); @@ -1743,11 +1899,20 @@ int try_rotate_stream(struct relay_stream *stream) ERR("Failed to truncate stream"); goto end; } + } else if (stream->prev_data_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); + ret = -1; + goto end; } else { - /* stream->prev_seq == stream->rotate_at_seq_num */ - DBG("Stream %" PRIu64 " ready for rotation", - stream->stream_handle); - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } end: @@ -1808,7 +1973,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, metadata_payload_header.padding_size); - if (size_ret < 0) { + if (size_ret < (int64_t) metadata_payload_header.padding_size) { ret = -1; goto end_put; } @@ -1818,7 +1983,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); - ret = try_rotate_stream(metadata_stream); + ret = try_rotate_stream_data(metadata_stream); if (ret < 0) { goto end_put; } @@ -1913,6 +2078,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_stream *stream; ssize_t send_ret; int ret; + uint64_t stream_seq; DBG("Data pending command received"); @@ -1940,12 +2106,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&stream->lock); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 - " and last_seq %" PRIu64, msg.stream_id, - stream->prev_seq, msg.last_net_seq_num); + if (session_streams_have_index(session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64 + ", prev_index_seq %" PRIu64 + ", and last_seq %" PRIu64, msg.stream_id, + stream->prev_data_seq, stream->prev_index_seq, + msg.last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) { + if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -2165,7 +2342,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { - if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, stream->stream_handle); @@ -2242,8 +2430,12 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.timestamp_end = be64toh(index_info.timestamp_end); index_info.events_discarded = be64toh(index_info.events_discarded); index_info.stream_id = be64toh(index_info.stream_id); - index_info.stream_instance_id = be64toh(index_info.stream_instance_id); - index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + + if (conn->minor >= 8) { + index_info.stream_instance_id = + be64toh(index_info.stream_instance_id); + index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } stream = stream_get_by_id(index_info.relay_stream_id); if (!stream) { @@ -2292,12 +2484,23 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -2386,8 +2589,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr size_t path_len; struct lttng_buffer_view new_path_view; - DBG("Rotate stream received"); - if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); ret = -1; @@ -2448,7 +2649,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr * Update the trace path (just the folder, the stream name does not * change). */ - free(stream->path_name); + free(stream->prev_path_name); + stream->prev_path_name = stream->path_name; stream->path_name = create_output_path(new_path_view.data); if (!stream->path_name) { ERR("Failed to create a new output path"); @@ -2463,21 +2665,29 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr goto end_stream_unlock; } - stream->chunk_id = stream_info.new_chunk_id; - if (stream->is_metadata) { + /* + * Metadata streams have no index; consider its rotation + * complete. + */ + stream->index_rotated = true; /* * The metadata stream is sent only over the control connection * so we know we have all the data to perform the stream * rotation. */ - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } else { stream->rotate_at_seq_num = stream_info.rotate_at_seq_num; - ret = try_rotate_stream(stream); - } - if (ret < 0) { - goto end_stream_unlock; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end_stream_unlock; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } end_stream_unlock: @@ -2502,356 +2712,308 @@ end_no_reply: return ret; } -/* - * relay_mkdir: Create a folder on the disk. - */ -static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr, - struct relay_connection *conn, - const struct lttng_buffer_view *payload) +static int init_session_output_directory_handle(struct relay_session *session, + struct lttng_directory_handle *handle) { int ret; - struct relay_session *session = conn->session; - struct lttcomm_relayd_mkdir path_info_header; - struct lttcomm_relayd_generic_reply reply; - char *path = NULL; - size_t header_len; - ssize_t send_ret; - struct lttng_buffer_view path_view; - - if (!session || !conn->version_check_done) { - ERR("Trying to create a directory before version check"); - ret = -1; - goto end_no_session; - } + /* hostname/session_name */ + char *session_directory = NULL; + /* + * base path + session_directory + * e.g. /home/user/lttng-traces/hostname/session_name + */ + char *full_session_path = NULL; + char creation_time_str[16]; + struct tm *timeinfo; - if (session->major == 2 && session->minor < 11) { - /* - * This client is not supposed to use this command since - * it predates its introduction. - */ - ERR("relay_mkdir command is unsupported before LTTng 2.11"); + assert(session->creation_time.is_set); + timeinfo = localtime(&session->creation_time.value); + if (!timeinfo) { ret = -1; - goto end_no_session; + goto end; } + strftime(creation_time_str, sizeof(creation_time_str), "%Y%m%d-%H%M%S", + timeinfo); - header_len = sizeof(path_info_header); - if (payload->size < header_len) { - ERR("Unexpected payload size in \"relay_mkdir\": expected >= %zu bytes, got %zu bytes", - header_len, payload->size); - ret = -1; - goto end_no_session; + pthread_mutex_lock(&session->lock); + ret = asprintf(&session_directory, "%s/%s-%s", session->hostname, + session->session_name, creation_time_str); + pthread_mutex_unlock(&session->lock); + if (ret < 0) { + PERROR("Failed to format session directory name"); + goto end; } - memcpy(&path_info_header, payload->data, header_len); - - path_info_header.length = be32toh(path_info_header.length); - - if (payload->size < header_len + path_info_header.length) { - ERR("Unexpected payload size in \"relay_mkdir\" including path: expected >= %zu bytes, got %zu bytes", - header_len + path_info_header.length, payload->size); + full_session_path = create_output_path(session_directory); + if (!full_session_path) { ret = -1; - goto end_no_session; - } - - /* Ensure that it fits in local path length. */ - if (path_info_header.length >= LTTNG_PATH_MAX) { - ret = -ENAMETOOLONG; - ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)", - path_info_header.length, LTTNG_PATH_MAX); goto end; } - path_view = lttng_buffer_view_from_view(payload, header_len, - path_info_header.length); - - path = create_output_path(path_view.data); - if (!path) { - ERR("Failed to create output path"); - ret = -1; + ret = utils_mkdir_recursive( + full_session_path, S_IRWXU | S_IRWXG, -1, -1); + if (ret) { + ERR("Failed to create session output path \"%s\"", + full_session_path); goto end; } - ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); - if (ret < 0) { - ERR("relay creating output directory"); + ret = lttng_directory_handle_init(handle, full_session_path); + if (ret) { goto end; } - - ret = 0; - end: - memset(&reply, 0, sizeof(reply)); - if (ret < 0) { - reply.ret_code = htobe32(LTTNG_ERR_UNK); - } else { - reply.ret_code = htobe32(LTTNG_OK); - } - send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); - if (send_ret < (ssize_t) sizeof(reply)) { - ERR("Failed to send \"mkdir\" command reply (ret = %zd)", send_ret); - ret = -1; - } - -end_no_session: - free(path); - return ret; -} - -static int validate_rotate_rename_path_length(const char *path_type, - uint32_t path_length) -{ - int ret = 0; - - if (path_length > LTTNG_PATH_MAX) { - ret = -ENAMETOOLONG; - ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes", - path_type, path_length, LTTNG_PATH_MAX); - } else if (path_length == 0) { - ret = -EINVAL; - ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type); - } + free(session_directory); + free(full_session_path); return ret; } /* - * relay_rotate_rename: rename the trace folder after a rotation is - * completed. We are not closing any fd here, just moving the folder, so it - * works even if data is still in-flight. + * relay_create_trace_chunk: create a new trace chunk */ -static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn, const struct lttng_buffer_view *payload) { - int ret; + int ret = 0; ssize_t send_ret; struct relay_session *session = conn->session; - struct lttcomm_relayd_generic_reply reply; - struct lttcomm_relayd_rotate_rename header; - size_t header_len; - size_t received_paths_size; - char *complete_old_path = NULL, *complete_new_path = NULL; - struct lttng_buffer_view old_path_view; - struct lttng_buffer_view new_path_view; + struct lttcomm_relayd_create_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_buffer_view chunk_name_view; + struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; + struct lttng_directory_handle session_output; if (!session || !conn->version_check_done) { - ERR("Trying to rename a trace folder before version check"); + ERR("Trying to create a trace chunk before version check"); ret = -1; goto end_no_reply; } if (session->major == 2 && session->minor < 11) { - ERR("relay_rotate_rename command is unsupported before LTTng 2.11"); + ERR("Chunk creation command is unsupported before 2.11"); ret = -1; goto end_no_reply; } - header_len = sizeof(header); - if (payload->size < header_len) { - ERR("Unexpected payload size in \"relay_rotate_rename\": expected >= %zu bytes, got %zu bytes", - header_len, payload->size); + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk creation command"); ret = -1; goto end_no_reply; } - memcpy(&header, payload->data, header_len); - - header.old_path_length = be32toh(header.old_path_length); - header.new_path_length = be32toh(header.new_path_length); - received_paths_size = header.old_path_length + header.new_path_length; + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + msg->chunk_id = be64toh(msg->chunk_id); + msg->creation_timestamp = be64toh(msg->creation_timestamp); + msg->override_name_length = be32toh(msg->override_name_length); - if (payload->size < header_len + received_paths_size) { - ERR("Unexpected payload size in \"relay_rotate_rename\" including paths: expected >= %zu bytes, got %zu bytes", - header_len, payload->size); + chunk = lttng_trace_chunk_create( + msg->chunk_id, msg->creation_timestamp); + if (!chunk) { + ERR("Failed to create trace chunk in trace chunk creation command"); ret = -1; - goto end_no_reply; - } - - /* Ensure the paths don't exceed their allowed size. */ - ret = validate_rotate_rename_path_length("old", header.old_path_length); - if (ret) { - goto end; - } - ret = validate_rotate_rename_path_length("new", header.new_path_length); - if (ret) { + reply_code = LTTNG_ERR_NOMEM; goto end; } - old_path_view = lttng_buffer_view_from_view(payload, header_len, - header.old_path_length); - new_path_view = lttng_buffer_view_from_view(payload, - header_len + header.old_path_length, - header.new_path_length); + if (msg->override_name_length) { + const char *name; - /* Validate that both paths received are NULL terminated. */ - if (old_path_view.data[old_path_view.size - 1] != '\0') { - ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)"); - ret = -1; - goto end; + chunk_name_view = lttng_buffer_view_from_view(payload, + sizeof(*msg), + msg->override_name_length); + name = chunk_name_view.data; + if (!name || name[msg->override_name_length - 1]) { + ERR("Failed to receive payload of chunk creation command"); + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; + } + + chunk_status = lttng_trace_chunk_override_name( + chunk, chunk_name_view.data); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + break; + case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)"); + reply_code = LTTNG_ERR_INVALID; + ret = -1; + goto end; + default: + ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)"); + reply_code = LTTNG_ERR_UNK; + ret = -1; + goto end; + } } - if (new_path_view.data[new_path_view.size - 1] != '\0') { - ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)"); - ret = -1; + + ret = init_session_output_directory_handle( + conn->session, &session_output); + if (ret) { + reply_code = LTTNG_ERR_CREATE_DIR_FAIL; goto end; } - complete_old_path = create_output_path(old_path_view.data); - if (!complete_old_path) { - ERR("Failed to build old output path in rotate_rename command"); + chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; ret = -1; goto end; } - complete_new_path = create_output_path(new_path_view.data); - if (!complete_new_path) { - ERR("Failed to build new output path in rotate_rename command"); + chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + reply_code = LTTNG_ERR_UNK; ret = -1; goto end; } - ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, - -1, -1); - if (ret < 0) { - ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"", - complete_new_path); - goto end; - } + published_chunk = sessiond_trace_chunk_registry_publish_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk); + if (!published_chunk) { + char uuid_str[UUID_STR_LEN]; - /* - * If a domain has not yet created its channel, the domain-specific - * folder might not exist, but this is not an error. - */ - ret = rename(complete_old_path, complete_new_path); - if (ret < 0 && errno != ENOENT) { - PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"", - complete_old_path, complete_new_path); + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; goto end; } - ret = 0; + + pthread_mutex_lock(&conn->session->lock); + lttng_trace_chunk_put(conn->session->current_trace_chunk); + conn->session->current_trace_chunk = published_chunk; + pthread_mutex_unlock(&conn->session->lock); + published_chunk = NULL; end: - memset(&reply, 0, sizeof(reply)); - if (ret < 0) { - reply.ret_code = htobe32(LTTNG_ERR_UNK); - } else { - reply.ret_code = htobe32(LTTNG_OK); - } - send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(reply), 0); - if (send_ret < sizeof(reply)) { - ERR("Failed to send \"rotate rename\" command reply (ret = %zd)", + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", send_ret); ret = -1; } - end_no_reply: - free(complete_old_path); - free(complete_new_path); + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_fini(&session_output); return ret; } /* - * Check if all the streams in the session have completed the last rotation. - * The chunk_id value is used to distinguish the cases where a stream was - * closed on the consumerd before the rotation started but it still active on - * the relayd, and the case where a stream appeared on the consumerd/relayd - * after the last rotation started (in that case, it is already writing in the - * new chunk folder). + * relay_close_trace_chunk: close a trace chunk */ -static -int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn, const struct lttng_buffer_view *payload) { - struct relay_session *session = conn->session; - struct lttcomm_relayd_rotate_pending msg; - struct lttcomm_relayd_rotate_pending_reply reply; - struct lttng_ht_iter iter; - struct relay_stream *stream; int ret = 0; ssize_t send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_close_trace_chunk *msg; + struct lttcomm_relayd_generic_reply reply = {}; + struct lttng_buffer_view header_view; + struct lttng_trace_chunk *chunk = NULL; + enum lttng_error_code reply_code = LTTNG_OK; + enum lttng_trace_chunk_status chunk_status; uint64_t chunk_id; - bool rotate_pending = false; - - DBG("Rotate pending command received"); + LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command; + time_t close_timestamp; if (!session || !conn->version_check_done) { - ERR("Trying to check for data before version check"); + ERR("Trying to close a trace chunk before version check"); ret = -1; goto end_no_reply; } if (session->major == 2 && session->minor < 11) { - ERR("Unsupported feature before 2.11"); + ERR("Chunk close command is unsupported before 2.11"); ret = -1; goto end_no_reply; } - if (payload->size < sizeof(msg)) { - ERR("Unexpected payload size in \"relay_rotate_pending\": expected >= %zu bytes, got %zu bytes", - sizeof(msg), payload->size); + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); ret = -1; goto end_no_reply; } - memcpy(&msg, payload->data, sizeof(msg)); - - chunk_id = be64toh(msg.chunk_id); + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + chunk_id = be64toh(msg->chunk_id); + close_timestamp = (time_t) be64toh(msg->close_timestamp); + close_command = (typeof(close_command)){ + .value = be32toh(msg->close_command.value), + .is_set = msg->close_command.is_set, + }; + + chunk = sessiond_trace_chunk_registry_get_chunk( + sessiond_trace_chunk_registry, + conn->session->sessiond_uuid, + conn->session->id, + chunk_id); + if (!chunk) { + char uuid_str[UUID_STR_LEN]; + + lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); + ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, + uuid_str, + conn->session->id, + msg->chunk_id); + ret = -1; + reply_code = LTTNG_ERR_NOMEM; + goto end; + } - DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + chunk_status = lttng_trace_chunk_set_close_timestamp( + chunk, close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk close timestamp"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end; + } - /* - * Iterate over all the streams in the session and check if they are - * still waiting for data to perform their rotation. - */ - rcu_read_lock(); - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - node.node) { - if (!stream_get(stream)) { - continue; - } - if (stream->trace->session != session) { - stream_put(stream); - continue; - } - pthread_mutex_lock(&stream->lock); - if (stream->rotate_at_seq_num != -1ULL) { - /* We have not yet performed the rotation. */ - rotate_pending = true; - DBG("Stream %" PRIu64 " is still rotating", - stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { - /* - * Stream closed on the consumer but still active on the - * relay. - */ - rotate_pending = true; - DBG("Stream %" PRIu64 " did not exist on the consumer " - "when the last rotation started, but is" - "still waiting for data before getting" - "closed", - stream->stream_handle); - } - pthread_mutex_unlock(&stream->lock); - stream_put(stream); - if (rotate_pending) { - goto send_reply; + if (close_command.is_set) { + chunk_status = lttng_trace_chunk_set_close_command( + chunk, close_command.value); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + reply_code = LTTNG_ERR_INVALID; + goto end; } } -send_reply: - rcu_read_unlock(); - memset(&reply, 0, sizeof(reply)); - reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK); - reply.is_pending = (uint8_t) !!rotate_pending; - send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, - sizeof(reply), 0); +end: + reply.ret_code = htobe32((uint32_t) reply_code); + send_ret = conn->sock->ops->sendmsg(conn->sock, + &reply, + sizeof(struct lttcomm_relayd_generic_reply), + 0); if (send_ret < (ssize_t) sizeof(reply)) { - ERR("Failed to send \"rotate pending\" command reply (ret = %zd)", + ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", send_ret); ret = -1; } - end_no_reply: + lttng_trace_chunk_put(chunk); return ret; } @@ -2921,17 +3083,13 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_ROTATE_STREAM", conn); ret = relay_rotate_session_stream(header, conn, payload); break; - case RELAYD_ROTATE_RENAME: - DBG_CMD("RELAYD_ROTATE_RENAME", conn); - ret = relay_rotate_rename(header, conn, payload); - break; - case RELAYD_ROTATE_PENDING: - DBG_CMD("RELAYD_ROTATE_PENDING", conn); - ret = relay_rotate_pending(header, conn, payload); + case RELAYD_CREATE_TRACE_CHUNK: + DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); + ret = relay_create_trace_chunk(header, conn, payload); break; - case RELAYD_MKDIR: - DBG_CMD("RELAYD_MKDIR", conn); - ret = relay_mkdir(header, conn, payload); + case RELAYD_CLOSE_TRACE_CHUNK: + DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); + ret = relay_close_trace_chunk(header, conn, payload); break; case RELAYD_UPDATE_SYNC_INFO: default: @@ -2945,9 +3103,11 @@ end: return ret; } -static int relay_process_control_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_payload( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; struct ctrl_connection_state_receive_payload *state = @@ -2963,11 +3123,15 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive command payload on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive command payload on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -2985,7 +3149,6 @@ static int relay_process_control_receive_payload(struct relay_connection *conn) DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3004,17 +3167,23 @@ reception_complete: ret = relay_process_control_command(conn, &state->header, &payload_view); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } ret = connection_reset_protocol_state(conn); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + } end: - return ret; + return status; } -static int relay_process_control_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_control_receive_header( + struct relay_connection *conn) { int ret = 0; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct lttcomm_relayd_hdr header; struct lttng_dynamic_buffer *reception_buffer = &conn->protocol.ctrl.reception_buffer; @@ -3027,11 +3196,15 @@ static int relay_process_control_receive_header(struct relay_connection *conn) reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive control command header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive control command header on sock %d", + conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3049,7 +3222,6 @@ static int relay_process_control_receive_header(struct relay_connection *conn) DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3068,11 +3240,10 @@ static int relay_process_control_receive_header(struct relay_connection *conn) conn->sock->fd, header.cmd, header.cmd_version, header.data_size); - /* FIXME temporary arbitrary limit on data size. */ - if (header.data_size > (128 * 1024 * 1024)) { + if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", header.data_size); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3082,6 +3253,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn) ret = lttng_dynamic_buffer_set_size(reception_buffer, header.data_size); if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3090,32 +3262,33 @@ static int relay_process_control_receive_header(struct relay_connection *conn) * Manually invoke the next state as the poll loop * will not wake-up to allow us to proceed further. */ - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); } end: - return ret; + return status; } /* * Process the commands received on the control socket */ -static int relay_process_control(struct relay_connection *conn) +static enum relay_connection_status relay_process_control( + struct relay_connection *conn) { - int ret = 0; + enum relay_connection_status status; switch (conn->protocol.ctrl.state_id) { case CTRL_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_control_receive_header(conn); + status = relay_process_control_receive_header(conn); break; case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_control_receive_payload(conn); + status = relay_process_control_receive_payload(conn); break; default: ERR("Unknown control connection protocol state encountered."); abort(); } - return ret; + return status; } /* @@ -3150,7 +3323,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } if (rotate_index || !stream->index_file) { - ret = create_rotate_index_file(stream); + const char *stream_path; + + /* + * The data connection creates the stream's first index file. + * + * This can happen _after_ a ROTATE_STREAM command. In + * other words, the data of the first packet of this stream + * can be received after a ROTATE_STREAM command. + * + * The ROTATE_STREAM command changes the stream's path_name + * to point to the "next" chunk. If a rotation is pending for + * this stream, as indicated by "rotate_at_seq_num != -1ULL", + * it means that we are still receiving data that belongs in the + * stream's former path. + * + * In this very specific case, we must ensure that the index + * file is created in the streams's former path, + * "prev_path_name". + * + * All other rotations beyond the first one are not affected + * by this problem since the actual rotation operation creates + * the new chunk's index file. + */ + stream_path = stream->rotate_at_seq_num == -1ULL ? + stream->path_name: + stream->prev_path_name; + + ret = create_rotate_index_file(stream, stream_path); if (ret < 0) { ERR("Failed to rotate index"); /* Put self-ref for this index due to error. */ @@ -3178,18 +3378,24 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: return ret; } -static int relay_process_data_receive_header(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_header( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct data_connection_state_receive_header *state = &conn->protocol.data.state.receive_header; struct lttcomm_relayd_data_hdr header; @@ -3201,12 +3407,15 @@ static int relay_process_data_receive_header(struct relay_connection *conn) state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { - ERR("Unable to receive data header on sock %d", conn->sock->fd); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Unable to receive data header on sock %d", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end; } else if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd); - ret = -1; + status = RELAY_CONNECTION_STATUS_CLOSED; goto end; } @@ -3224,7 +3433,6 @@ static int relay_process_data_receive_header(struct relay_connection *conn) DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3253,7 +3461,8 @@ static int relay_process_data_receive_header(struct relay_connection *conn) if (!stream) { DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, header.stream_id); - ret = 0; + /* Protocol error. */ + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } @@ -3278,6 +3487,7 @@ static int relay_process_data_receive_header(struct relay_connection *conn) &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Failed to rotate stream output file"); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3289,17 +3499,18 @@ static int relay_process_data_receive_header(struct relay_connection *conn) conn->protocol.data.state.receive_payload.rotate_index = true; } - ret = 0; end_stream_unlock: pthread_mutex_unlock(&stream->lock); stream_put(stream); end: - return ret; + return status; } -static int relay_process_data_receive_payload(struct relay_connection *conn) +static enum relay_connection_status relay_process_data_receive_payload( + struct relay_connection *conn) { int ret; + enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK; struct relay_stream *stream; struct data_connection_state_receive_payload *state = &conn->protocol.data.state.receive_payload; @@ -3310,20 +3521,28 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->header.net_seq_num, + state->received, left_to_receive); + stream = stream_get_by_id(state->header.stream_id); if (!stream) { - DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, + /* Protocol error. */ + ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64, state->header.stream_id); - ret = 0; + status = RELAY_CONNECTION_STATUS_ERROR; goto end; } pthread_mutex_lock(&stream->lock); session = stream->trace->session; - - DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", - state->header.stream_id, state->header.net_seq_num, - state->received, left_to_receive); + if (!conn->session) { + ret = connection_set_session(conn, session); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + } /* * The size of the "chunk" received on any iteration is bounded by: @@ -3338,13 +3557,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { - ERR("Socket %d error %d", conn->sock->fd, ret); - ret = -1; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PERROR("Socket %d error", conn->sock->fd); + status = RELAY_CONNECTION_STATUS_ERROR; + } goto end_stream_unlock; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, state->header.stream_id); + status = RELAY_CONNECTION_STATUS_CLOSED; break; } else if (ret < (int) recv_size) { /* @@ -3361,7 +3583,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); - ret = -1; + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3381,27 +3603,28 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - ret = 0; goto end_stream_unlock; } ret = write_padding_to_file(stream->stream_fd->fd, state->header.padding_size); - if (ret < 0) { + if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } - if (session->minor >= 4 && !session->snapshot) { + if (session_streams_have_index(session)) { ret = handle_index_data(stream, state->header.net_seq_num, state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size); if (ret < 0) { ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } } @@ -3409,15 +3632,20 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) stream->tracefile_size_current += state->header.data_size + state->header.padding_size; - if (stream->prev_seq == -1ULL) { + if (stream->prev_data_seq == -1ULL) { new_stream = true; } if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } - stream->prev_seq = state->header.net_seq_num; + stream->prev_data_seq = state->header.net_seq_num; /* * Resetting the protocol state (to RECEIVE_HEADER) will trash the @@ -3427,8 +3655,9 @@ static int relay_process_data_receive_payload(struct relay_connection *conn) connection_reset_protocol_state(conn); state = NULL; - ret = try_rotate_stream(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; } @@ -3447,29 +3676,30 @@ end_stream_unlock: stream_put(stream); end: - return ret; + return status; } /* * relay_process_data: Process the data received on the data socket */ -static int relay_process_data(struct relay_connection *conn) +static enum relay_connection_status relay_process_data( + struct relay_connection *conn) { - int ret; + enum relay_connection_status status; switch (conn->protocol.data.state_id) { case DATA_CONNECTION_STATE_RECEIVE_HEADER: - ret = relay_process_data_receive_header(conn); + status = relay_process_data_receive_header(conn); break; case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD: - ret = relay_process_data_receive_payload(conn); + status = relay_process_data_receive_payload(conn); break; default: ERR("Unexpected data connection communication state."); abort(); } - return ret; + return status; } static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) @@ -3585,14 +3815,6 @@ restart: health_code_update(); - if (!revents) { - /* - * No activity for this FD (poll - * implementation). - */ - continue; - } - /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -3641,9 +3863,31 @@ restart: assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { - ret = relay_process_control(ctrl_conn); - if (ret < 0) { - /* Clear the connection on error. */ + enum relay_connection_status status; + + status = relay_process_control(ctrl_conn); + if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the control connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(ctrl_conn->session); + } + + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, ctrl_conn); @@ -3717,9 +3961,30 @@ restart: assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - ret = relay_process_data(data_conn); - /* Connection closed */ - if (ret < 0) { + enum relay_connection_status status; + + status = relay_process_data(data_conn); + /* Connection closed or error. */ + if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the data connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(data_conn->session); + } relay_thread_close_connection(&events, pollfd, data_conn); /* @@ -3751,16 +4016,14 @@ restart: exit: error: - /* Cleanup reamaining connection object. */ + /* Cleanup remaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); - if (session_abort(destroy_conn->session)) { - assert(0); - } + session_abort(destroy_conn->session); /* * No need to grab another ref, because we own @@ -3863,6 +4126,13 @@ int main(int argc, char **argv) } } + sessiond_trace_chunk_registry = sessiond_trace_chunk_registry_create(); + if (!sessiond_trace_chunk_registry) { + ERR("Failed to initialize session daemon trace chunk registry"); + retval = -1; + goto exit_sessiond_trace_chunk_registry; + } + /* Initialize thread health monitoring */ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); if (!health_relayd) { @@ -4012,7 +4282,9 @@ exit_health_quit_pipe: exit_init_data: health_app_destroy(health_relayd); + sessiond_trace_chunk_registry_destroy(sessiond_trace_chunk_registry); exit_health_app_create: +exit_sessiond_trace_chunk_registry: exit_options: /* * Wait for all pending call_rcu work to complete before tearing