From: Mathieu Desnoyers Date: Fri, 4 Sep 2015 19:44:23 +0000 (-0400) Subject: Fix: unpublish stream on close X-Git-Tag: v2.8.0-rc1~401 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=77f7bd852edcc4f7227792553229c59fd590a447 Fix: unpublish stream on close Fixes a race where data connection can still add indexes after close, preventing graceful teardown of the stream. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 7b385b49f..fe52702c7 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1266,9 +1266,17 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + stream_close(stream); + if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1287,7 +1295,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index a314eb9f9..1d1975902 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -168,6 +168,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, * side of the relayd does not have the concept of session. */ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + stream->in_stream_ht = true; DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, stream->stream_handle); @@ -225,20 +226,27 @@ unlock: } /* - * Only called from destroy. No stream lock needed, since there is a - * single user at this point. This is ensured by having the refcount - * reaching 0. + * Stream must be protected by holding the stream lock or by virtue of being + * called from stream_destroy, in which case it is guaranteed to be accessed + * from a single thread by the reflock. */ static void stream_unpublish(struct relay_stream *stream) { - if (!stream->published) { - return; + if (stream->in_stream_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(relay_streams_ht, &iter); + assert(!ret); + stream->in_stream_ht = false; + } + if (stream->published) { + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_del_rcu(&stream->stream_node); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + stream->published = false; } - pthread_mutex_lock(&stream->trace->stream_list_lock); - cds_list_del_rcu(&stream->stream_node); - pthread_mutex_unlock(&stream->trace->stream_list_lock); - - stream->published = false; } static void stream_destroy(struct relay_stream *stream) @@ -274,8 +282,6 @@ static void stream_release(struct urcu_ref *ref) struct relay_stream *stream = caa_container_of(ref, struct relay_stream, ref); struct relay_session *session; - int ret; - struct lttng_ht_iter iter; session = stream->trace->session; @@ -289,10 +295,6 @@ static void stream_release(struct urcu_ref *ref) } pthread_mutex_unlock(&session->recv_list_lock); - iter.iter.node = &stream->node.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - stream_unpublish(stream); if (stream->stream_fd) { @@ -340,6 +342,7 @@ void stream_close(struct relay_stream *stream) { DBG("closing stream %" PRIu64, stream->stream_handle); pthread_mutex_lock(&stream->lock); + stream_unpublish(stream); stream->closed = true; relay_index_close_all(stream); pthread_mutex_unlock(&stream->lock); diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index ca6be8133..542e05c0f 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -132,6 +132,7 @@ struct relay_stream { * Node of stream within global stream hash table. */ struct lttng_ht_node_u64 node; + bool in_stream_ht; /* is stream in stream hash table. */ struct rcu_head rcu_node; /* For call_rcu teardown. */ };