+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_get_wakeup_fd(stream->ustream);
+}
+
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_stream_close_wakeup_fd(stream->ustream);
+}
+
+/*
+ * Populate index values of a UST stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct ctf_packet_index *index,
+ struct ustctl_consumer_stream *ustream)
+{
+ int ret;
+
+ ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
+ if (ret < 0) {
+ PERROR("ustctl_get_timestamp_begin");
+ goto error;
+ }
+ index->timestamp_begin = htobe64(index->timestamp_begin);
+
+ ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
+ if (ret < 0) {
+ PERROR("ustctl_get_timestamp_end");
+ goto error;
+ }
+ index->timestamp_end = htobe64(index->timestamp_end);
+
+ ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
+ if (ret < 0) {
+ PERROR("ustctl_get_events_discarded");
+ goto error;
+ }
+ index->events_discarded = htobe64(index->events_discarded);
+
+ ret = ustctl_get_content_size(ustream, &index->content_size);
+ if (ret < 0) {
+ PERROR("ustctl_get_content_size");
+ goto error;
+ }
+ index->content_size = htobe64(index->content_size);
+
+ ret = ustctl_get_packet_size(ustream, &index->packet_size);
+ if (ret < 0) {
+ PERROR("ustctl_get_packet_size");
+ goto error;
+ }
+ index->packet_size = htobe64(index->packet_size);
+
+ ret = ustctl_get_stream_id(ustream, &index->stream_id);
+ if (ret < 0) {
+ PERROR("ustctl_get_stream_id");
+ goto error;
+ }
+ index->stream_id = htobe64(index->stream_id);
+
+error:
+ return ret;
+}
+
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ if (stream->chan->metadata_cache->max_offset
+ == stream->ust_metadata_pushed) {
+ ret = 0;
+ goto end;
+ }
+
+ write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->max_offset
+ - stream->ust_metadata_pushed);
+ assert(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = -1;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ assert(stream->chan->metadata_cache->max_offset >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with sessiond, else we cause a deadlock with live
+ * awaiting on metadata to be pushed out.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ int ret;
+ int retry = 0;
+
+ assert(ctx);
+ assert(metadata);
+
+ pthread_mutex_unlock(&metadata->lock);
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ if (ret < 0) {
+ goto end;
+ }
+ pthread_mutex_lock(&metadata->lock);
+
+ ret = commit_one_metadata_packet(metadata);
+ if (ret <= 0) {
+ goto end;
+ } else if (ret > 0) {
+ retry = 1;
+ }
+
+ ustctl_flush_buffer(metadata->ustream, 1);
+ ret = ustctl_snapshot(metadata->ustream);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking UST snapshot");
+ goto end;
+ }
+ DBG("No new metadata when syncing them.");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+ /*
+ * After this flush, we still need to extract metadata.
+ */
+ if (retry) {
+ ret = EAGAIN;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct ustctl_consumer_stream *ustream;
+
+ assert(stream);
+ assert(ctx);
+
+ ustream = stream->ustream;
+
+ /*
+ * First, we are going to check if there is a new subbuffer available
+ * before reading the stream wait_fd.
+ */
+ /* Get the next subbuffer */
+ ret = ustctl_get_next_subbuf(ustream);
+ if (ret) {
+ /* No more data found, flag the stream. */
+ stream->has_data = 0;
+ ret = 0;
+ goto end;
+ }
+
+ ret = ustctl_put_subbuf(ustream);
+ assert(!ret);
+
+ /* This stream still has data. Flag it and wake up the data thread. */
+ stream->has_data = 1;
+
+ if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+ ssize_t writelen;
+
+ writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+ if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ ret = writelen;
+ goto end;
+ }
+
+ /* The wake up pipe has been notified. */
+ ctx->has_wakeup = 1;
+ }
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */