X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=2ce9bf048bdd17e0c0402a7ef9ac467683885392;hb=f5ac4bd72b128dc3e6e5d41d3f76755e6215a02d;hp=7b385b49f5264e2ace8823ffbc0f8fe4747630a5;hpb=a44ca2ca85e4b64729f7b88b1919fd6737dfff8a;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 7b385b49f..2ce9bf048 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1266,9 +1266,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1287,7 +1302,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1388,7 +1402,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1425,9 +1439,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1454,9 +1468,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1468,7 +1482,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -2149,7 +2162,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; - bool new_stream = false; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2268,7 +2281,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + if (new_stream) { pthread_mutex_lock(&session->lock); uatomic_set(&session->new_streams, 1);