From bda7c7b9b4c633de16f3d8bf109f9d21fdd9a5fb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 4 Sep 2015 22:04:12 -0400 Subject: [PATCH] Fix: Wait for in-flight data before closing a stream MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit A stream's closing conditions are evaluated in three places: 1) When a close command is received 2) When the control connection owning it is closed 3) The stream has received all of its data following a close request. These checks are performed in try_stream_close(). A known downside of this approach is that a stream will never be closed if it has not received all of its data. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/ctf-trace.c | 5 +++-- src/bin/lttng-relayd/main.c | 18 ++++++++++++--- src/bin/lttng-relayd/session.c | 3 ++- src/bin/lttng-relayd/stream.c | 38 ++++++++++++++++++++++++++++++-- src/bin/lttng-relayd/stream.h | 5 +++-- 5 files changed, 59 insertions(+), 10 deletions(-) diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c index d965cec8c..7c882620e 100644 --- a/src/bin/lttng-relayd/ctf-trace.c +++ b/src/bin/lttng-relayd/ctf-trace.c @@ -183,9 +183,10 @@ int ctf_trace_close(struct ctf_trace *trace) cds_list_for_each_entry_rcu(stream, &trace->stream_list, stream_node) { /* - * Close the stream. + * Close stream since the connection owning the trace is being + * torn down. 
*/ - stream_close(stream); + try_stream_close(stream); } rcu_read_unlock(); /* diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index fe52702c7..e769daf88 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1275,8 +1275,15 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); pthread_mutex_unlock(&stream->lock); - stream_close(stream); - + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -2156,7 +2163,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; - bool new_stream = false; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2275,7 +2282,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + if (new_stream) { pthread_mutex_lock(&session->lock); uatomic_set(&session->new_streams, 1); diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c index 33d7d43c0..14ae874ea 100644 --- a/src/bin/lttng-relayd/session.c +++ b/src/bin/lttng-relayd/session.c @@ -212,7 +212,8 @@ unlock: } cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node) { - stream_close(stream); + /* Close streams which have not been published yet. 
*/ + try_stream_close(stream); } rcu_unlock: rcu_read_unlock(); diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 1d1975902..2a59d1ed3 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->stream_handle = stream_handle; stream->prev_seq = -1ULL; + stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; @@ -338,14 +339,47 @@ void stream_put(struct relay_stream *stream) rcu_read_unlock(); } -void stream_close(struct relay_stream *stream) +void try_stream_close(struct relay_stream *stream) { - DBG("closing stream %" PRIu64, stream->stream_handle); + DBG("Trying to close stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + /* + * Can be called concurrently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } + + stream->close_requested = true; + /* + * We shortcut the data pending check if no bound is known for this + * stream. This prevents us from never closing the stream in the case + * where a connection would be closed before a "close" command has + * been received. + * + * TODO + * This still leaves open the question of handling missing data after + * a bound has been set by a stream close command. Since we have no + * way of pairing data and control connection, and that a data + * connection has no ownership of a stream, it is likely that a + * timeout approach would be appropriate to handle dangling streams. + */ + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + /* Don't close since we still have data pending. 
*/ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } stream_unpublish(stream); stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. */ relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock); + DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); stream_put(stream); } diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index 542e05c0f..5030e5d4c 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -85,7 +85,8 @@ struct relay_stream { */ struct tracefile_array *tfa; - bool closed; /* Stream is closed. */ + bool closed; /* Stream is closed. */ + bool close_requested; /* Close command has been received. */ /* * Counts number of indexes in indexes_ht. Redundant info. @@ -144,7 +145,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, struct relay_stream *stream_get_by_id(uint64_t stream_id); bool stream_get(struct relay_stream *stream); void stream_put(struct relay_stream *stream); -void stream_close(struct relay_stream *stream); +void try_stream_close(struct relay_stream *stream); void stream_publish(struct relay_stream *stream); void print_relay_streams(void); -- 2.34.1