+ ret_func = -1;
+ goto end;
+
+end:
+ rcu_read_unlock();
+ health_code_update();
+ return ret_func;
+}
+
+int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer_active)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_flush_buffer(stream->ustream, producer_active);
+}
+
+/*
+ * Take a snapshot for a specific stream.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_snapshot(stream->ustream);
+}
+
+/*
+ * Sample consumed and produced positions for a specific stream.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_ustconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_snapshot_sample_positions(stream->ustream);
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_produced_snapshot(
+ struct lttng_consumer_stream *stream, unsigned long *pos)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+ LTTNG_ASSERT(pos);
+
+ return lttng_ust_ctl_snapshot_get_produced(stream->ustream, pos);
+}
+
+/*
+ * Get the consumed position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_ustconsumer_get_consumed_snapshot(
+ struct lttng_consumer_stream *stream, unsigned long *pos)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+ LTTNG_ASSERT(pos);
+
+ return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
+}
+
+int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_flush_buffer(stream->ustream, producer);
+}
+
+int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_clear_buffer(stream->ustream);
+}
+
+int lttng_ustconsumer_get_current_timestamp(
+ struct lttng_consumer_stream *stream, uint64_t *ts)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+ LTTNG_ASSERT(ts);
+
+ return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts);
+}
+
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+ LTTNG_ASSERT(seq);
+
+ return lttng_ust_ctl_get_sequence_number(stream->ustream, seq);
+}
+
+/*
+ * Called when the stream signals the consumer that it has hung up.
+ */
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->quiescent) {
+ if (lttng_ust_ctl_flush_buffer(stream->ustream, 0) < 0) {
+ ERR("Failed to flush buffer on stream hang-up");
+ } else {
+ stream->quiescent = true;
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream->hangup_flush_done = 1;
+}
+
+void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
+{
+ int i;
+
+ LTTNG_ASSERT(chan);
+ LTTNG_ASSERT(chan->uchan);
+ LTTNG_ASSERT(chan->buffer_credentials.is_set);
+
+ if (chan->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(chan);
+ }
+ for (i = 0; i < chan->nr_stream_fds; i++) {
+ int ret;
+
+ ret = close(chan->stream_fds[i]);
+ if (ret) {
+ PERROR("close");
+ }
+ if (chan->shm_path[0]) {
+ char shm_path[PATH_MAX];
+
+ ret = get_stream_shm_path(shm_path, chan->shm_path, i);
+ if (ret) {
+ ERR("Cannot get stream shm path");
+ }
+ ret = run_as_unlink(shm_path,
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)));
+ if (ret) {
+ PERROR("unlink %s", shm_path);
+ }
+ }
+ }
+}
+
+void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
+{
+ LTTNG_ASSERT(chan);
+ LTTNG_ASSERT(chan->uchan);
+ LTTNG_ASSERT(chan->buffer_credentials.is_set);
+
+ consumer_metadata_cache_destroy(chan);
+ lttng_ust_ctl_destroy_channel(chan->uchan);
+ /* Try to rmdir all directories under shm_path root. */
+ if (chan->root_shm_path[0]) {
+ (void) run_as_rmdir_recursive(chan->root_shm_path,
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)),
+ LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
+ }
+ free(chan->stream_fds);
+}
+
+void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ if (stream->chan->switch_timer_enabled == 1) {
+ consumer_timer_switch_stop(stream->chan);
+ }
+ lttng_ust_ctl_destroy_stream(stream->ustream);
+}
+
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_stream_get_wakeup_fd(stream->ustream);
+}
+
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+ LTTNG_ASSERT(stream);
+ LTTNG_ASSERT(stream->ustream);
+
+ return lttng_ust_ctl_stream_close_wakeup_fd(stream->ustream);
+}
+
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed from the cache into the ring buffer, or a
+ * negative value on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ if (stream->chan->metadata_cache->contents.size ==
+ stream->ust_metadata_pushed) {
+ /*
+ * In the context of a user space metadata channel, a
+ * change in version can be detected in two ways:
+ * 1) During the pre-consume of the `read_subbuffer` loop,
+ * 2) When populating the metadata ring buffer (i.e. here).
+ *
+ * This function is invoked when there is no metadata
+ * available in the ring-buffer. If all data was consumed
+ * up to the size of the metadata cache, there is no metadata
+ * to insert in the ring-buffer.
+ *
+ * However, the metadata version could still have changed (a
+ * regeneration without any new data will yield the same cache
+ * size).
+ *
+ * The cache's version is checked for a version change and the
+ * consumed position is reset if one occurred.
+ *
+ * This check is only necessary for the user space domain as
+ * it has to manage the cache explicitly. If this reset was not
+ * performed, no metadata would be consumed (and no reset would
+ * occur as part of the pre-consume) until the metadata size
+ * exceeded the cache size.
+ */
+ if (stream->metadata_version !=
+ stream->chan->metadata_cache->version) {
+ metadata_stream_reset_cache_consumed_position(stream);
+ consumer_stream_metadata_set_version(stream,
+ stream->chan->metadata_cache->version);
+ } else {
+ ret = 0;
+ goto end;
+ }
+ }
+
+ write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->contents.size -
+ stream->ust_metadata_pushed);
+ LTTNG_ASSERT(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = write_len;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+ /*
+ * Switch packet (but don't open the next one) on every commit of
+ * a metadata packet. Since the subbuffer is fully filled (with padding,
+ * if needed), the stream is "quiescent" after this commit.
+ */
+ if (lttng_ust_ctl_flush_buffer(stream->ustream, 1)) {
+ ERR("Failed to flush buffer while commiting one metadata packet");
+ ret = -EIO;
+ } else {
+ stream->quiescent = true;
+ }
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with sessiond, else we cause a deadlock with live
+ * awaiting on metadata to be pushed out.
+ *
+ * The RCU read side lock must be held by the caller.
+ */
+enum sync_metadata_status lttng_ustconsumer_sync_metadata(
+ struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata_stream)
+{
+ int ret;
+ enum sync_metadata_status status;
+ struct lttng_consumer_channel *metadata_channel;
+
+ LTTNG_ASSERT(ctx);
+ LTTNG_ASSERT(metadata_stream);
+
+ metadata_channel = metadata_stream->chan;
+ pthread_mutex_unlock(&metadata_stream->lock);
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ pthread_mutex_lock(&metadata_stream->lock);
+ if (ret < 0) {
+ status = SYNC_METADATA_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * The metadata stream and channel can be deleted while the
+ * metadata stream lock was released. The streamed is checked
+ * for deletion before we use it further.
+ *
+ * Note that it is safe to access a logically-deleted stream since its
+ * existence is still guaranteed by the RCU read side lock. However,
+ * it should no longer be used. The close/deletion of the metadata
+ * channel and stream already guarantees that all metadata has been
+ * consumed. Therefore, there is nothing left to do in this function.
+ */
+ if (consumer_stream_is_deleted(metadata_stream)) {
+ DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
+ metadata_stream->key);
+ status = SYNC_METADATA_STATUS_NO_DATA;
+ goto end;
+ }
+
+ ret = commit_one_metadata_packet(metadata_stream);
+ if (ret < 0) {
+ status = SYNC_METADATA_STATUS_ERROR;
+ goto end;
+ } else if (ret > 0) {
+ status = SYNC_METADATA_STATUS_NEW_DATA;
+ } else /* ret == 0 */ {
+ status = SYNC_METADATA_STATUS_NO_DATA;
+ goto end;
+ }
+
+ ret = lttng_ust_ctl_snapshot(metadata_stream->ustream);
+ if (ret < 0) {
+ ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret);
+ status = SYNC_METADATA_STATUS_ERROR;
+ goto end;
+ }
+
+end:
+ return status;