+
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_stream *stream =
+ caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+ stream_destroy(stream);
+}
+
+/*
+ * No need to take stream->lock since this is only called on the final
+ * stream_put which ensures that a single thread may act on the stream.
+ *
+ * At that point, the object is also protected by the reflock which
+ * guarantees that no other thread may share ownership of this stream.
+ */
+static void stream_release(struct urcu_ref *ref)
+{
+ struct relay_stream *stream =
+ caa_container_of(ref, struct relay_stream, ref);
+ struct relay_session *session;
+
+ session = stream->trace->session;
+
+ DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->recv_list_lock);
+ session->stream_count--;
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ stream_unpublish(stream);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+void stream_put(struct relay_stream *stream)
+{
+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for stream unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+ DBG("closing stream %" PRIu64, stream->stream_handle);
+ pthread_mutex_lock(&stream->lock);
+ stream_unpublish(stream);
+ stream->closed = true;
+ relay_index_close_all(stream);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+}
+
+void print_relay_streams(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ stream,
+ stream->ref.refcount,
+ stream->stream_handle,
+ stream->trace->id,
+ stream->trace->session->id);
+ stream_put(stream);
+ }
+ rcu_read_unlock();
+}