X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=e81bdeffc82b97790ddf99bf6bb473636b0f4064;hp=cf6e01ee0ffd1409269ffc9cfc0f2dc662261d04;hb=4c6885d2d540ce66b9050e9f90c49575fb76dde0;hpb=cb523e0290a439cf57fa7823ffa78803500ba4c3 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index cf6e01ee0..e81bdeffc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -85,7 +85,7 @@ NULL enum relay_connection_status { RELAY_CONNECTION_STATUS_OK, - /* An error occured while processing an event on the connection. */ + /* An error occurred while processing an event on the connection. */ RELAY_CONNECTION_STATUS_ERROR, /* Connection closed/shutdown cleanly. */ RELAY_CONNECTION_STATUS_CLOSED, @@ -1099,7 +1099,7 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, { int ret = 0; ssize_t send_ret; - struct relay_session *session; + struct relay_session *session = NULL; struct lttcomm_relayd_status_session reply; char session_name[LTTNG_NAME_MAX]; char hostname[LTTNG_HOST_NAME_MAX]; @@ -1153,7 +1153,9 @@ send_reply: send_ret); ret = -1; } - + if (ret < 0 && session) { + session_put(session); + } return ret; } @@ -1570,10 +1572,12 @@ end: } static -int do_rotate_stream(struct relay_stream *stream) +int do_rotate_stream_data(struct relay_stream *stream) { int ret; + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); /* Perform the stream rotation. */ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, @@ -1585,19 +1589,17 @@ int do_rotate_stream(struct relay_stream *stream) goto end; } stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = create_rotate_index_file(stream, stream->path_name); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - } - - stream->rotate_at_seq_num = -1ULL; stream->pos_after_last_complete_data_index = 0; + stream->data_rotated = true; + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } end: return ret; } @@ -1609,9 +1611,7 @@ end: * connections are separate, the indexes as well as the commands arrive from * the control connection and we have no control over the order so we could be * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. We don't need - * to update the index because its order is guaranteed with the rotation - * command message. + * before the rotation command on the control connection arrives. */ static int rotate_truncate_stream(struct relay_stream *stream) @@ -1642,7 +1642,7 @@ int rotate_truncate_stream(struct relay_stream *stream) /* * Rewind the current tracefile to the position at which the rotation - * should have occured. + * should have occurred. */ lseek_ret = lseek(stream->stream_fd->fd, stream->pos_after_last_complete_data_index, SEEK_SET); @@ -1710,12 +1710,6 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream, stream->path_name); - if (ret < 0) { - ERR("Rotate stream index file"); - goto end; - } - /* * Update the offset and FD of all the eventual indexes created by the * data connection before the rotation command arrived. @@ -1738,30 +1732,94 @@ end: } /* - * Check if a stream should perform a rotation (for session rotation). + * Check if a stream's index file should be rotated (for session rotation). * Must be called with the stream lock held. * * Return 0 on success, a negative value on error. */ static -int try_rotate_stream(struct relay_stream *stream) +int try_rotate_stream_index(struct relay_stream *stream) { int ret = 0; - uint64_t trace_seq; - /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ goto end; } - trace_seq = min(stream->prev_data_seq, stream->prev_index_seq); - if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL || - trace_seq < stream->rotate_at_seq_num) { - DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + if (stream->index_rotated) { + /* Rotation of the index has already occurred. */ + goto end; + } + + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_index_seq); + goto end; + } else if (stream->prev_index_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", stream->stream_handle, stream->rotate_at_seq_num, stream->prev_data_seq, stream->prev_index_seq); + ret = -1; + goto end; + } else { + DBG("Rotating stream %" PRIu64 " index file", + stream->stream_handle); + ret = create_rotate_index_file(stream, stream->path_name); + stream->index_rotated = true; + + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } + } + +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ + goto end; + } + + if (stream->data_rotated) { + /* Rotation of the data file has already occurred. */ + goto end; + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); goto end; } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { /* @@ -1776,24 +1834,20 @@ int try_rotate_stream(struct relay_stream *stream) ERR("Failed to truncate stream"); goto end; } - } else { - if (trace_seq != stream->rotate_at_seq_num) { - /* - * Unexpected, protocol error/bug. - * It could mean that we received a rotation position - * that is in the past. - */ - ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + } else if (stream->prev_data_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", stream->stream_handle, stream->rotate_at_seq_num, - stream->prev_data_seq, - stream->prev_index_seq); - ret = -1; - goto end; - } - DBG("Stream %" PRIu64 " ready for rotation", - stream->stream_handle); - ret = do_rotate_stream(stream); + stream->prev_data_seq); + ret = -1; + goto end; + } else { + ret = do_rotate_stream_data(stream); } end: @@ -1864,7 +1918,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); - ret = try_rotate_stream(metadata_stream); + ret = try_rotate_stream_data(metadata_stream); if (ret < 0) { goto end_put; } @@ -2366,6 +2420,11 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; stream->prev_index_seq = index_info.net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2547,18 +2606,28 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { + /* + * Metadata streams have no index; consider its rotation + * complete. + */ + stream->index_rotated = true; /* * The metadata stream is sent only over the control connection * so we know we have all the data to perform the stream * rotation. */ - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } else { stream->rotate_at_seq_num = stream_info.rotate_at_seq_num; - ret = try_rotate_stream(stream); - } - if (ret < 0) { - goto end_stream_unlock; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end_stream_unlock; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } end_stream_unlock: @@ -2906,7 +2975,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->current_chunk_id.value < chunk_id) { + } else if (stream->current_chunk_id.value <= chunk_id) { /* * Stream closed on the consumer but still active on the * relay. @@ -3568,6 +3637,10 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->pos_after_last_complete_data_index = stream->tracefile_size_current; stream->prev_index_seq = state->header.net_seq_num; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } stream->prev_data_seq = state->header.net_seq_num; @@ -3580,7 +3653,7 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; - ret = try_rotate_stream(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock;