X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=2a59d1ed3e166edd319209ea97065eebf81e3876;hp=1d19759023264cbd6d4817cf989a669cef3ac217;hb=bda7c7b9b4c633de16f3d8bf109f9d21fdd9a5fb;hpb=77f7bd852edcc4f7227792553229c59fd590a447 diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 1d1975902..2a59d1ed3 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->stream_handle = stream_handle; stream->prev_seq = -1ULL; + stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; @@ -338,14 +339,47 @@ void stream_put(struct relay_stream *stream) rcu_read_unlock(); } -void stream_close(struct relay_stream *stream) +void try_stream_close(struct relay_stream *stream) { - DBG("closing stream %" PRIu64, stream->stream_handle); + DBG("Trying to close stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + /* + * Can be called concurently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } + + stream->close_requested = true; + /* + * We shortcut the data pending check if no bound is known for this + * stream. This prevents us from never closing the stream in the case + * where a connection would be closed before a "close" command has + * been received. + * + * TODO + * This still leaves open the question of handling missing data after + * a bound has been set by a stream close command. Since we have no + * way of pairing data and control connection, and that a data + * connection has no ownership of a stream, it is likely that a + * timeout approach would be appropriate to handle dangling streams. + */ + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + /* Don't close since we still have data pending. */ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } stream_unpublish(stream); stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. */ relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock); + DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); stream_put(stream); }