X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=2a59d1ed3e166edd319209ea97065eebf81e3876;hb=32d1569c14b4e1efce9099a1e04c338a9c42f1f7;hp=af4ef1bbb80d9e17fe331298c4fac558d0795d3c;hpb=7591bab11eceedc6a0d1e02fd6f85592267a63b5;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index af4ef1bbb..2a59d1ed3 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -93,6 +93,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->stream_handle = stream_handle; stream->prev_seq = -1ULL; + stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; @@ -137,6 +138,11 @@ struct relay_stream *stream_create(struct ctf_trace *trace, ret = -1; goto end; } + stream->tfa = tracefile_array_create(stream->tracefile_count); + if (!stream->tfa) { + ret = -1; + goto end; + } if (stream->tracefile_size) { DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); } else { @@ -163,6 +169,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, * side of the relayd does not have the concept of session. */ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + stream->in_stream_ht = true; DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, stream->stream_handle); @@ -220,20 +227,27 @@ unlock: } /* - * Only called from destroy. No stream lock needed, since there is a - * single user at this point. This is ensured by having the refcount - * reaching 0. + * Stream must be protected by holding the stream lock or by virtue of being + * called from stream_destroy, in which case it is guaranteed to be accessed + * from a single thread by the reflock. */ static void stream_unpublish(struct relay_stream *stream) { - if (!stream->published) { - return; + if (stream->in_stream_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(relay_streams_ht, &iter); + assert(!ret); + stream->in_stream_ht = false; + } + if (stream->published) { + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_del_rcu(&stream->stream_node); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + stream->published = false; } - pthread_mutex_lock(&stream->trace->stream_list_lock); - cds_list_del_rcu(&stream->stream_node); - pthread_mutex_unlock(&stream->trace->stream_list_lock); - - stream->published = false; } static void stream_destroy(struct relay_stream *stream) @@ -241,6 +255,9 @@ static void stream_destroy(struct relay_stream *stream) if (stream->indexes_ht) { lttng_ht_destroy(stream->indexes_ht); } + if (stream->tfa) { + tracefile_array_destroy(stream->tfa); + } free(stream->path_name); free(stream->channel_name); free(stream); @@ -266,8 +283,6 @@ static void stream_release(struct urcu_ref *ref) struct relay_stream *stream = caa_container_of(ref, struct relay_stream, ref); struct relay_session *session; - int ret; - struct lttng_ht_iter iter; session = stream->trace->session; @@ -281,10 +296,6 @@ static void stream_release(struct urcu_ref *ref) } pthread_mutex_unlock(&session->recv_list_lock); - iter.iter.node = &stream->node.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - stream_unpublish(stream); if (stream->stream_fd) { @@ -328,12 +339,47 @@ void stream_put(struct relay_stream *stream) rcu_read_unlock(); } -void stream_close(struct relay_stream *stream) +void try_stream_close(struct relay_stream *stream) { - DBG("closing stream %" PRIu64, stream->stream_handle); + DBG("Trying to close stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + /* + * Can be called concurently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } + + stream->close_requested = true; + /* + * We shortcut the data pending check if no bound is known for this + * stream. This prevents us from never closing the stream in the case + * where a connection would be closed before a "close" command has + * been received. + * + * TODO + * This still leaves open the question of handling missing data after + * a bound has been set by a stream close command. Since we have no + * way of pairing data and control connection, and that a data + * connection has no ownership of a stream, it is likely that a + * timeout approach would be appropriate to handle dangling streams. + */ + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + /* Don't close since we still have data pending. */ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } + stream_unpublish(stream); + stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. */ relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock); + DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); stream_put(stream); }