Fix: unpublish stream on close
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 4 Sep 2015 19:44:23 +0000 (15:44 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 5 Sep 2015 02:48:47 +0000 (22:48 -0400)
Fixes a race where data connection can still add indexes after close,
preventing graceful teardown of the stream.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h

index 7b385b49f5264e2ace8823ffbc0f8fe4747630a5..fe52702c77489209dcb43208206bb6e7dd1fb04c 100644 (file)
@@ -1266,9 +1266,17 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
        pthread_mutex_lock(&stream->lock);
-       stream->closed = true;
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       stream_close(stream);
+
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -1287,7 +1295,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        viewer_stream_put(vstream);
                }
        }
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 
 end:
index a314eb9f984155561bf64b91ffd920c902fa0379..1d19759023264cbd6d4817cf989a669cef3ac217 100644 (file)
@@ -168,6 +168,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
         * side of the relayd does not have the concept of session.
         */
        lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+       stream->in_stream_ht = true;
 
        DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
                        stream->stream_handle);
@@ -225,20 +226,27 @@ unlock:
 }
 
 /*
- * Only called from destroy. No stream lock needed, since there is a
- * single user at this point. This is ensured by having the refcount
- * reaching 0.
+ * Stream must be protected by holding the stream lock or by virtue of being
+ * called from stream_destroy, in which case it is guaranteed to be accessed
+ * from a single thread by the reflock.
  */
 static void stream_unpublish(struct relay_stream *stream)
 {
-       if (!stream->published) {
-               return;
+       if (stream->in_stream_ht) {
+               struct lttng_ht_iter iter;
+               int ret;
+
+               iter.iter.node = &stream->node.node;
+               ret = lttng_ht_del(relay_streams_ht, &iter);
+               assert(!ret);
+               stream->in_stream_ht = false;
+       }
+       if (stream->published) {
+               pthread_mutex_lock(&stream->trace->stream_list_lock);
+               cds_list_del_rcu(&stream->stream_node);
+               pthread_mutex_unlock(&stream->trace->stream_list_lock);
+               stream->published = false;
        }
-       pthread_mutex_lock(&stream->trace->stream_list_lock);
-       cds_list_del_rcu(&stream->stream_node);
-       pthread_mutex_unlock(&stream->trace->stream_list_lock);
-
-       stream->published = false;
 }
 
 static void stream_destroy(struct relay_stream *stream)
@@ -274,8 +282,6 @@ static void stream_release(struct urcu_ref *ref)
        struct relay_stream *stream =
                caa_container_of(ref, struct relay_stream, ref);
        struct relay_session *session;
-       int ret;
-       struct lttng_ht_iter iter;
 
        session = stream->trace->session;
 
@@ -289,10 +295,6 @@ static void stream_release(struct urcu_ref *ref)
        }
        pthread_mutex_unlock(&session->recv_list_lock);
 
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(relay_streams_ht, &iter);
-       assert(!ret);
-
        stream_unpublish(stream);
 
        if (stream->stream_fd) {
@@ -340,6 +342,7 @@ void stream_close(struct relay_stream *stream)
 {
        DBG("closing stream %" PRIu64, stream->stream_handle);
        pthread_mutex_lock(&stream->lock);
+       stream_unpublish(stream);
        stream->closed = true;
        relay_index_close_all(stream);
        pthread_mutex_unlock(&stream->lock);
index ca6be8133dc9cd963f5fcdbdae379b813119ef84..542e05c0fb9e36b8fca3ec6b0995d8d20edb1b18 100644 (file)
@@ -132,6 +132,7 @@ struct relay_stream {
         * Node of stream within global stream hash table.
         */
        struct lttng_ht_node_u64 node;
+       bool in_stream_ht;              /* is stream in stream hash table. */
        struct rcu_head rcu_node;       /* For call_rcu teardown. */
 };
 
This page took 0.038745 seconds and 4 git commands to generate.