X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=1e2e9050fef2d6d77ca78e9767a2fea759ba99be;hp=569a3a4707862c8fa1cc51c2c4ee575615c6ad8f;hb=c6db3843828a8fbf08444a2bc4191291a4807936;hpb=71efa8ef5fdf66ce41199a4735cf88eff0b34174 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 569a3a470..1e2e9050f 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1083,6 +1083,11 @@ static int set_index_control_data(struct relay_index *index, return relay_index_set_data(index, &index_data); } +static bool session_streams_have_index(const struct relay_session *session) +{ + return session->minor >= 4 && !session->snapshot; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -1106,16 +1111,19 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); - switch (conn->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: + if (conn->minor < 4) { + /* From 2.1 to 2.3 */ + ret = 0; + } else if (conn->minor >= 4 && conn->minor < 11) { + /* From 2.4 to 2.10 */ ret = cmd_create_session_2_4(payload, session_name, hostname, &live_timer, &snapshot); + } else { + /* From 2.11 to ... */ + ret = cmd_create_session_2_11(payload, session_name, + hostname, &live_timer, &snapshot); } + if (ret < 0) { goto send_reply; } @@ -1195,6 +1203,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + struct relay_stream_chunk_id stream_chunk_id = { 0 }; if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); @@ -1202,17 +1211,22 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - switch (session->minor) { - case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */ + if (session->minor == 1) { + /* For 2.1 */ ret = cmd_recv_stream_2_1(payload, &path_name, &channel_name); - break; - case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */ - default: + } else if (session->minor > 1 && session->minor < 11) { + /* From 2.2 to 2.10 */ ret = cmd_recv_stream_2_2(payload, &path_name, &channel_name, &tracefile_size, &tracefile_count); - break; + } else { + /* From 2.11 to ... */ + ret = cmd_recv_stream_2_11(payload, &path_name, + &channel_name, &tracefile_size, &tracefile_count, + &stream_chunk_id.value); + stream_chunk_id.is_set = true; } + if (ret < 0) { goto send_reply; } @@ -1229,7 +1243,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count, + &stream_chunk_id); path_name = NULL; channel_name = NULL; @@ -1524,7 +1539,8 @@ end: * Return 0 on success, -1 on error. */ static -int create_rotate_index_file(struct relay_stream *stream) +int create_rotate_index_file(struct relay_stream *stream, + const char *stream_path) { int ret; uint32_t major, minor; @@ -1536,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream) } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, + stream->index_file = lttng_index_file_create(stream_path, stream->channel_name, -1, -1, stream->tracefile_size, tracefile_array_get_file_index_head(stream->tfa), @@ -1554,10 +1570,12 @@ end: } static -int do_rotate_stream(struct relay_stream *stream) +int do_rotate_stream_data(struct relay_stream *stream) { int ret; + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); /* Perform the stream rotation. */ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, @@ -1569,19 +1587,17 @@ int do_rotate_stream(struct relay_stream *stream) goto end; } stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - } - - stream->rotate_at_seq_num = -1ULL; stream->pos_after_last_complete_data_index = 0; + stream->data_rotated = true; + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } end: return ret; } @@ -1593,9 +1609,7 @@ end: * connections are separate, the indexes as well as the commands arrive from * the control connection and we have no control over the order so we could be * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. We don't need - * to update the index because its order is guaranteed with the rotation - * command message. + * before the rotation command on the control connection arrives. */ static int rotate_truncate_stream(struct relay_stream *stream) @@ -1694,12 +1708,6 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream); - if (ret < 0) { - ERR("Rotate stream index file"); - goto end; - } - /* * Update the offset and FD of all the eventual indexes created by the * data connection before the rotation command arrived. @@ -1722,27 +1730,100 @@ end: } /* - * Check if a stream should perform a rotation (for session rotation). + * Check if a stream's index file should be rotated (for session rotation). * Must be called with the stream lock held. * * Return 0 on success, a negative value on error. */ static -int try_rotate_stream(struct relay_stream *stream) +int try_rotate_stream_index(struct relay_stream *stream) { int ret = 0; - /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ + goto end; + } + + if (stream->index_rotated) { + /* Rotation of the index has already occurred. */ goto end; } - if (stream->prev_seq < stream->rotate_at_seq_num || - stream->prev_seq == -1ULL) { - DBG("Stream %" PRIu64 " no yet ready for rotation", + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_index_seq); + goto end; + } else if (stream->prev_index_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq, + stream->prev_index_seq); + ret = -1; + goto end; + } else { + DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); + ret = create_rotate_index_file(stream, stream->path_name); + stream->index_rotated = true; + + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } + } + +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ + goto end; + } + + if (stream->data_rotated) { + /* Rotation of the data file has already occurred. */ + goto end; + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); goto end; - } else if (stream->prev_seq > stream->rotate_at_seq_num) { + } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { + /* + * prev_data_seq is checked here since indexes and rotation + * commands are serialized with respect to each other. + */ DBG("Rotation after too much data has been written in tracefile " "for stream %" PRIu64 ", need to truncate before " "rotating", stream->stream_handle); @@ -1751,11 +1832,20 @@ int try_rotate_stream(struct relay_stream *stream) ERR("Failed to truncate stream"); goto end; } + } else if (stream->prev_data_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); + ret = -1; + goto end; } else { - /* stream->prev_seq == stream->rotate_at_seq_num */ - DBG("Stream %" PRIu64 " ready for rotation", - stream->stream_handle); - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } end: @@ -1826,7 +1916,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); - ret = try_rotate_stream(metadata_stream); + ret = try_rotate_stream_data(metadata_stream); if (ret < 0) { goto end_put; } @@ -1921,6 +2011,7 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_stream *stream; ssize_t send_ret; int ret; + uint64_t stream_seq; DBG("Data pending command received"); @@ -1948,12 +2039,23 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&stream->lock); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 - " and last_seq %" PRIu64, msg.stream_id, - stream->prev_seq, msg.last_net_seq_num); + if (session_streams_have_index(session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64 + ", prev_index_seq %" PRIu64 + ", and last_seq %" PRIu64, msg.stream_id, + stream->prev_data_seq, stream->prev_index_seq, + msg.last_net_seq_num); /* Avoid wrapping issue */ - if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) { + if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) { /* Data has in fact been written and is NOT pending */ ret = 0; } else { @@ -2173,7 +2275,18 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { - if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, stream->stream_handle); @@ -2304,12 +2417,23 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -2460,7 +2584,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr * Update the trace path (just the folder, the stream name does not * change). */ - free(stream->path_name); + free(stream->prev_path_name); + stream->prev_path_name = stream->path_name; stream->path_name = create_output_path(new_path_view.data); if (!stream->path_name) { ERR("Failed to create a new output path"); @@ -2475,21 +2600,32 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr goto end_stream_unlock; } - stream->chunk_id = stream_info.new_chunk_id; + assert(stream->current_chunk_id.is_set); + stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { + /* + * Metadata streams have no index; consider its rotation + * complete. + */ + stream->index_rotated = true; /* * The metadata stream is sent only over the control connection * so we know we have all the data to perform the stream * rotation. */ - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } else { stream->rotate_at_seq_num = stream_info.rotate_at_seq_num; - ret = try_rotate_stream(stream); - } - if (ret < 0) { - goto end_stream_unlock; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end_stream_unlock; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } end_stream_unlock: @@ -2583,6 +2719,7 @@ static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } + DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path); ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); if (ret < 0) { ERR("relay creating output directory"); @@ -2706,6 +2843,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } + DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"", + old_path_view.data, new_path_view.data); complete_old_path = create_output_path(old_path_view.data); if (!complete_old_path) { ERR("Failed to build old output path in rotate_rename command"); @@ -2719,6 +2858,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"", + complete_old_path, complete_new_path); ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, -1, -1); @@ -2809,7 +2950,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, chunk_id = be64toh(msg.chunk_id); - DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64, + session->session_name, chunk_id); /* * Iterate over all the streams in the session and check if they are @@ -2831,7 +2973,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { + } else if (stream->current_chunk_id.value < chunk_id) { /* * Stream closed on the consumer but still active on the * relay. @@ -3177,7 +3319,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } if (rotate_index || !stream->index_file) { - ret = create_rotate_index_file(stream); + const char *stream_path; + + /* + * The data connection creates the stream's first index file. + * + * This can happen _after_ a ROTATE_STREAM command. In + * other words, the data of the first packet of this stream + * can be received after a ROTATE_STREAM command. + * + * The ROTATE_STREAM command changes the stream's path_name + * to point to the "next" chunk. If a rotation is pending for + * this stream, as indicated by "rotate_at_seq_num != -1ULL", + * it means that we are still receiving data that belongs in the + * stream's former path. + * + * In this very specific case, we must ensure that the index + * file is created in the streams's former path, + * "prev_path_name". + * + * All other rotations beyond the first one are not affected + * by this problem since the actual rotation operation creates + * the new chunk's index file. + */ + stream_path = stream->rotate_at_seq_num == -1ULL ? + stream->path_name: + stream->prev_path_name; + + ret = create_rotate_index_file(stream, stream_path); if (ret < 0) { ERR("Failed to rotate index"); /* Put self-ref for this index due to error. */ @@ -3205,9 +3374,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: @@ -3344,10 +3517,14 @@ static enum relay_connection_status relay_process_data_receive_payload( uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->header.net_seq_num, + state->received, left_to_receive); + stream = stream_get_by_id(state->header.stream_id); if (!stream) { /* Protocol error. */ - DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, + ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64, state->header.stream_id); status = RELAY_CONNECTION_STATUS_ERROR; goto end; @@ -3355,10 +3532,13 @@ static enum relay_connection_status relay_process_data_receive_payload( pthread_mutex_lock(&stream->lock); session = stream->trace->session; - - DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", - state->header.stream_id, state->header.net_seq_num, - state->received, left_to_receive); + if (!conn->session) { + ret = connection_set_session(conn, session); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + } /* * The size of the "chunk" received on any iteration is bounded by: @@ -3433,7 +3613,7 @@ static enum relay_connection_status relay_process_data_receive_payload( } - if (session->minor >= 4 && !session->snapshot) { + if (session_streams_have_index(session)) { ret = handle_index_data(stream, state->header.net_seq_num, state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size); if (ret < 0) { @@ -3448,15 +3628,20 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->tracefile_size_current += state->header.data_size + state->header.padding_size; - if (stream->prev_seq == -1ULL) { + if (stream->prev_data_seq == -1ULL) { new_stream = true; } if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } - stream->prev_seq = state->header.net_seq_num; + stream->prev_data_seq = state->header.net_seq_num; /* * Resetting the protocol state (to RECEIVE_HEADER) will trash the @@ -3466,7 +3651,7 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; - ret = try_rotate_stream(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; @@ -3686,6 +3871,26 @@ restart: status = relay_process_control(ctrl_conn); if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the control connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(ctrl_conn->session); + } + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, @@ -3765,6 +3970,25 @@ restart: status = relay_process_data(data_conn); /* Connection closed or error. */ if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the data connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(data_conn->session); + } relay_thread_close_connection(&events, pollfd, data_conn); /* @@ -3803,9 +4027,7 @@ error: sock_n.node) { health_code_update(); - if (session_abort(destroy_conn->session)) { - assert(0); - } + session_abort(destroy_conn->session); /* * No need to grab another ref, because we own