+ assert(metadata);
+ assert(metadata_str);
+
+ DBG("UST consumer writing metadata to channel %s", metadata->name);
+
+ assert(target_offset <= metadata->metadata_cache->max_offset);
+ ret = ustctl_write_metadata_to_channel(metadata->uchan,
+ metadata_str + target_offset, len);
+ if (ret < 0) {
+ ERR("ustctl write metadata fail with ret %d, len %" PRIu64, ret, len);
+ goto error;
+ }
+
+ ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
+
+error:
+ return ret;
+}
+
+/*
+ * Flush channel's streams using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int flush_channel(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht *ht;
+ struct lttng_ht_iter iter;
+
+ DBG("UST consumer flush channel key %" PRIu64, chan_key);
+
+ rcu_read_lock();
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ht = consumer_data.stream_per_chan_id_ht;
+
+ /* For each stream of the channel id, flush it. */
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+ &channel->key, &iter.iter, stream, node_channel_id.node) {
+ ustctl_flush_buffer(stream->ustream, 1);
+ }
+error:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+
+ DBG("UST consumer close metadata key %" PRIu64, chan_key);
+
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer close metadata %" PRIu64 " not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ pthread_mutex_lock(&consumer_data.lock);
+ if (!cds_lfht_is_node_deleted(&channel->node.node)) {
+ if (channel->switch_timer_enabled == 1) {
+ DBG("Deleting timer on metadata channel");
+ consumer_timer_switch_stop(channel);
+ }
+ ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error_unlock;
+ }
+ }
+
+error_unlock:
+ pthread_mutex_unlock(&consumer_data.lock);
+error:
+ return ret;
+}
+
+/*
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
+{
+ int ret;
+ struct lttng_consumer_channel *metadata;
+
+ DBG("UST consumer setup metadata key %" PRIu64, key);
+
+ metadata = consumer_find_channel(key);
+ if (!metadata) {
+ ERR("UST consumer push metadata %" PRIu64 " not found", key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ /*
+ * Send metadata stream to relayd if one available. Availability is
+ * known if the stream is still in the list of the channel.
+ */
+ if (cds_list_empty(&metadata->streams.head)) {
+ ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ /* Send metadata stream to relayd if needed. */
+ ret = send_stream_to_relayd(metadata->metadata_stream);
+ if (ret < 0) {
+ ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto error;
+ }
+
+ ret = send_streams_to_thread(metadata, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&metadata->streams.head));
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Receive the metadata updates from the sessiond.
+ */
+int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
+ uint64_t len, struct lttng_consumer_channel *channel)
+{
+ int ret, ret_code = LTTNG_OK;
+ char *metadata_str;
+
+ DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
+
+ metadata_str = zmalloc(len * sizeof(char));
+ if (!metadata_str) {
+ PERROR("zmalloc metadata string");
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ goto end;
+ }
+
+ /* Receive metadata string. */
+ ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+ if (ret < 0) {
+ /* Session daemon is dead so return gracefully. */
+ ret_code = ret;
+ goto end_free;
+ }
+
+ pthread_mutex_lock(&channel->metadata_cache->lock);
+ ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+ if (ret < 0) {
+ /* Unable to handle metadata. Notify session daemon. */
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;