+ call_rcu(&relayd->node.head, free_relayd_rcu);
+}
+
+/*
+ * Remove a channel from the global list protected by a mutex. This function is
+ * also responsible for freeing its data structures.
+ */
+void consumer_del_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ DBG("Consumer delete channel key %" PRIu64, channel->key);
+
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_del_channel(channel);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ goto end;
+ }
+
+ rcu_read_lock();
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
+ call_rcu(&channel->node.head, free_channel_rcu);
+end:
+ pthread_mutex_unlock(&consumer_data.lock);
+}
+
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ destroy_relayd(relayd);
+ }
+
+ rcu_read_unlock();
+
+ lttng_ht_destroy(consumer_data.relayd_ht);
+}
+
+/*
+ * Update the end point status of all streams having the given network sequence
+ * index (relayd index).
+ *
+ * It's atomically set without having the stream mutex locked which is fine
+ * because we handle the write/read race with a pipe wakeup for each thread.
+ */
+static void update_endpoint_status_by_netidx(int net_seq_idx,
+ enum consumer_endpoint_status status)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+
+ rcu_read_lock();
+
+ /* Let's begin with metadata */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to metadata stream %d", stream->wait_fd);
+ }
+ }
+
+ /* Follow up by the data streams */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to data stream %d", stream->wait_fd);
+ }
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Cleanup a relayd object by flagging every associated streams for deletion,
+ * destroying the object meaning removing it from the relayd hash table,
+ * closing the sockets and freeing the memory in a RCU call.
+ *
+ * If a local data context is available, notify the threads that the streams'
+ * state have changed.
+ */
+static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
+ struct lttng_consumer_local_data *ctx)
+{
+ int netidx;
+
+ assert(relayd);
+
+ DBG("Cleaning up relayd sockets");
+
+ /* Save the net sequence index before destroying the object */
+ netidx = relayd->net_seq_idx;
+
+ /*
+ * Delete the relayd from the relayd hash table, close the sockets and free
+ * the object in a RCU call.
+ */
+ destroy_relayd(relayd);
+
+ /* Set inactive endpoint to all streams */
+ update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+ /*
+ * With a local data context, notify the threads that the streams' state
+ * have changed. The write() action on the pipe acts as an "implicit"
+ * memory barrier ordering the updates of the end point status from the
+ * read of this status which happens AFTER receiving this notify.
+ */
+ if (ctx) {
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
+ }