X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=2ce9bf048bdd17e0c0402a7ef9ac467683885392;hb=32d1569c14b4e1efce9099a1e04c338a9c42f1f7;hp=adb044f1d3ebd4b6b6d58a035c6e4d9eece706b9;hpb=c0bae11d346fa301d993430a2cf33b3c426e3140;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index adb044f1d..2ce9bf048 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -71,6 +71,7 @@ #include "session.h" #include "stream.h" #include "connection.h" +#include "tracefile-array.h" /* command line options */ char *opt_output_path; @@ -1265,9 +1266,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1286,7 +1302,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1387,7 +1402,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1424,9 +1439,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1453,9 +1468,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1467,7 +1482,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -1890,7 +1904,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * Only flag a stream inactive when it has already * received data and no indexes are in flight. */ - if (stream->total_index_received > 0 + if (stream->index_received_seqcount > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); @@ -1918,7 +1932,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2091,7 +2106,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, fd = index_create_file(stream->path_name, stream->channel_name, -1, -1, stream->tracefile_size, - stream->current_tracefile_id); + tracefile_array_get_file_index_head(stream->tfa)); if (fd < 0) { ret = -1; /* Put self-ref for this index due to error. */ @@ -2120,7 +2135,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* No flush. */ ret = 0; @@ -2146,7 +2162,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; - bool new_stream = false; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2204,35 +2220,23 @@ static int relay_process_data(struct relay_connection *conn) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - uint64_t new_id; + uint64_t old_id, new_id; + + old_id = tracefile_array_get_file_index_head(stream->tfa); + tracefile_array_file_rotate(stream->tfa); + + /* new_id is updated by utils_rotate_stream_file. */ + new_id = old_id; - new_id = (stream->current_tracefile_id + 1) % - stream->tracefile_count; - /* - * Move viewer oldest available data position forward if - * we are overwriting a tracefile. - */ - if (new_id == stream->oldest_tracefile_id) { - stream->oldest_tracefile_id = - (stream->oldest_tracefile_id + 1) % - stream->tracefile_count; - } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, -1, -1, stream->stream_fd->fd, - &stream->current_tracefile_id, - &stream->stream_fd->fd); + &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Rotating stream output file"); goto end_stream_unlock; } - stream->current_tracefile_seq++; - if (stream->current_tracefile_seq - - stream->oldest_tracefile_seq >= - stream->tracefile_count) { - stream->oldest_tracefile_seq++; - } /* * Reset current size because we just performed a stream * rotation. @@ -2277,7 +2281,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + if (new_stream) { pthread_mutex_lock(&session->lock); uatomic_set(&session->new_streams, 1);