X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=1d19759023264cbd6d4817cf989a669cef3ac217;hp=a314eb9f984155561bf64b91ffd920c902fa0379;hb=77f7bd852edcc4f7227792553229c59fd590a447;hpb=8726a0458b9798953279448c13d8bd60d9907be3 diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index a314eb9f9..1d1975902 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -168,6 +168,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, * side of the relayd does not have the concept of session. */ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + stream->in_stream_ht = true; DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, stream->stream_handle); @@ -225,20 +226,27 @@ unlock: } /* - * Only called from destroy. No stream lock needed, since there is a - * single user at this point. This is ensured by having the refcount - * reaching 0. + * Stream must be protected by holding the stream lock or by virtue of being + * called from stream_destroy, in which case it is guaranteed to be accessed + * from a single thread by the reflock. */ static void stream_unpublish(struct relay_stream *stream) { - if (!stream->published) { - return; + if (stream->in_stream_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(relay_streams_ht, &iter); + assert(!ret); + stream->in_stream_ht = false; + } + if (stream->published) { + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_del_rcu(&stream->stream_node); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + stream->published = false; } - pthread_mutex_lock(&stream->trace->stream_list_lock); - cds_list_del_rcu(&stream->stream_node); - pthread_mutex_unlock(&stream->trace->stream_list_lock); - - stream->published = false; } static void stream_destroy(struct relay_stream *stream) @@ -274,8 +282,6 @@ static void stream_release(struct urcu_ref *ref) struct relay_stream *stream = caa_container_of(ref, struct relay_stream, ref); struct relay_session *session; - int ret; - struct lttng_ht_iter iter; session = stream->trace->session; @@ -289,10 +295,6 @@ static void stream_release(struct urcu_ref *ref) } pthread_mutex_unlock(&session->recv_list_lock); - iter.iter.node = &stream->node.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - stream_unpublish(stream); if (stream->stream_fd) { @@ -340,6 +342,7 @@ void stream_close(struct relay_stream *stream) { DBG("closing stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + stream_unpublish(stream); stream->closed = true; relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock);