+/*
+ * Update the end point status of all streams having the given network sequence
+ * index (relayd index).
+ *
+ * It's atomically set without having the stream mutex locked which is fine
+ * because we handle the write/read race with a pipe wakeup for each thread.
+ */
+static void update_endpoint_status_by_netidx(int net_seq_idx,
+ enum consumer_endpoint_status status)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+
+ rcu_read_lock();
+
+ /* Let's begin with metadata */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to metadata stream %d", stream->wait_fd);
+ }
+ }
+
+ /* Follow up by the data streams */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to data stream %d", stream->wait_fd);
+ }
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Cleanup a relayd object by flagging every associated streams for deletion,
+ * destroying the object meaning removing it from the relayd hash table,
+ * closing the sockets and freeing the memory in a RCU call.
+ *
+ * If a local data context is available, notify the threads that the streams'
+ * state have changed.
+ */
+static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
+ struct lttng_consumer_local_data *ctx)
+{
+ int netidx;
+
+ assert(relayd);
+
+ DBG("Cleaning up relayd sockets");
+
+ /* Save the net sequence index before destroying the object */
+ netidx = relayd->net_seq_idx;
+
+ /*
+ * Delete the relayd from the relayd hash table, close the sockets and free
+ * the object in a RCU call.
+ */
+ destroy_relayd(relayd);
+
+ /* Set inactive endpoint to all streams */
+ update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+ /*
+ * With a local data context, notify the threads that the streams' state
+ * have changed. The write() action on the pipe acts as an "implicit"
+ * memory barrier ordering the updates of the end point status from the
+ * read of this status which happens AFTER receiving this notify.
+ */
+ if (ctx) {
+ notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+ }
+}
+