X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=8825d094ce57b389662667bfd5d567ca0ed77cfd;hp=1d19759023264cbd6d4817cf989a669cef3ac217;hb=49e614cb2878f0664c9f44f9f24cb1d81116de21;hpb=77f7bd852edcc4f7227792553229c59fd590a447 diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 1d1975902..8825d094c 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->stream_handle = stream_handle; stream->prev_seq = -1ULL; + stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; @@ -252,6 +253,11 @@ static void stream_unpublish(struct relay_stream *stream) static void stream_destroy(struct relay_stream *stream) { if (stream->indexes_ht) { + /* + * Calling lttng_ht_destroy in call_rcu worker thread so + * we don't hold the RCU read-side lock while calling + * it. + */ lttng_ht_destroy(stream->indexes_ht); } if (stream->tfa) { @@ -338,17 +344,104 @@ void stream_put(struct relay_stream *stream) rcu_read_unlock(); } -void stream_close(struct relay_stream *stream) +void try_stream_close(struct relay_stream *stream) { - DBG("closing stream %" PRIu64, stream->stream_handle); + DBG("Trying to close stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + /* + * Can be called concurently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } + + stream->close_requested = true; + + if (stream->last_net_seq_num == -1ULL) { + /* + * Handle connection close without explicit stream close + * command. + * + * We can be clever about indexes partially received in + * cases where we received the data socket part, but not + * the control socket part: since we're currently closing + * the stream on behalf of the control socket, we *know* + * there won't be any more control information for this + * socket. Therefore, we can destroy all indexes for + * which we have received only the file descriptor (from + * data socket). This takes care of consumerd crashes + * between sending the data and control information for + * a packet. Since those are sent in that order, we take + * care of consumerd crashes. + */ + relay_index_close_partial_fd(stream); + /* + * Use the highest net_seq_num we currently have pending + * As end of stream indicator. Leave last_net_seq_num + * at -1ULL if we cannot find any index. + */ + stream->last_net_seq_num = relay_index_find_last(stream); + /* Fall-through into the next check. */ + } + + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + /* + * Don't close since we still have data pending. This + * handles cases where an explicit close command has + * been received for this stream, and cases where the + * connection has been closed, and we are awaiting for + * index information from the data socket. It is + * therefore expected that all the index fd information + * we need has already been received on the control + * socket. Matching index information from data socket + * should be Expected Soon(TM). + * + * TODO: We should implement a timer to garbage collect + * streams after a timeout to be resilient against a + * consumerd implementation that would not match this + * expected behavior. + */ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } + /* + * We received all the indexes we can expect. + */ stream_unpublish(stream); stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. */ relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock); + DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); stream_put(stream); } +static void print_stream_indexes(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index, + index_n.node) { + DBG("index %p net_seq_num %" PRIu64 " refcount %ld" + " stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + index, + index->index_n.key, + stream->ref.refcount, + index->stream->stream_handle, + index->stream->trace->id, + index->stream->trace->session->id); + } + rcu_read_unlock(); +} + void print_relay_streams(void) { struct lttng_ht_iter iter; @@ -367,6 +460,7 @@ void print_relay_streams(void) stream->stream_handle, stream->trace->id, stream->trace->session->id); + print_stream_indexes(stream); stream_put(stream); } rcu_read_unlock();