+/*
+ * Get the consumerd position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ int ret;
+ int infd = stream->wait_fd;
+
+ ret = kernctl_snapshot_get_consumed(infd, pos);
+ if (ret != 0) {
+ errno = -ret;
+ perror("kernctl_snapshot_get_consumed");
+ }
+
+ return ret;
+}
+
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int send_relayd_stream(struct lttng_consumer_stream *stream,
+ char *path)
+{
+ int ret = 0;
+ const char *stream_path;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ assert(stream->net_seq_idx != -1ULL);
+
+ if (path != NULL) {
+ stream_path = path;
+ } else {
+ stream_path = stream->chan->pathname;
+ }
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ stream_path, &stream->relayd_stream_id,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ uatomic_inc(&relayd->refcount);
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+ stream->key, stream->net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+static void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+ struct consumer_relayd_sock_pair *relayd;
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd) {
+ consumer_stream_relayd_close(stream, relayd);
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Take a snapshot of all the stream of a channel
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
+ uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ unsigned long consumed_pos, produced_pos;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Kernel consumer snapshot channel %lu", key);
+
+ rcu_read_lock();
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("No channel found for key %lu", key);
+ ret = -1;
+ goto end;
+ }
+
+ /* Splice is not supported yet for channel snapshot. */
+ if (channel->output != CONSUMER_CHANNEL_MMAP) {
+ ERR("Unsupported output %d", channel->output);
+ ret = -1;
+ goto end;
+ }
+
+ cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
+ no_monitor_node) {
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Assign the received relayd ID so we can use it for streaming. The streams
+ * are not visible to anyone so this is OK to change it.
+ */
+ stream->net_seq_idx = relayd_id;
+ channel->relayd_id = relayd_id;
+ if (relayd_id != (uint64_t) -1ULL) {
+ ret = send_relayd_stream(stream, path);
+ if (ret < 0) {
+ ERR("sending stream to relayd");
+ goto end_unlock;
+ }
+ } else {
+ ret = utils_create_stream_file(path, stream->name,
+ stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->uid, stream->gid);
+ if (ret < 0) {
+ ERR("utils_create_stream_file");
+ goto end_unlock;
+ }
+
+ stream->out_fd = ret;
+ stream->tracefile_size_current = 0;
+
+ DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
+ stream->name, stream->key);
+ }
+
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel metadata stream");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ ERR("Taking kernel snapshot");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
+ if (ret < 0) {
+ ERR("Produced kernel snapshot position");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumerd kernel snapshot position");
+ goto end_unlock;
+ }
+
+ if (stream->max_sb_size == 0) {
+ ret = kernctl_get_max_subbuf_size(stream->wait_fd,
+ &stream->max_sb_size);
+ if (ret < 0) {
+ ERR("Getting kernel max_sb_size");
+ goto end_unlock;
+ }
+ }
+
+ while (consumed_pos < produced_pos) {
+ ssize_t read_len;
+ unsigned long len, padded_len;
+
+ DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
+
+ ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ PERROR("kernctl_get_subbuf snapshot");
+ goto end_unlock;
+ }
+ DBG("Kernel consumer get subbuf failed. Skipping it.");
+ consumed_pos += stream->max_sb_size;
+ continue;
+ }
+
+ ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_padded_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+ padded_len - len);
+ /*
+ * We write the padded len in local tracefiles but the data len
+ * when using a relay. Display the error but continue processing
+ * to try to release the subbuffer.
+ */
+ if (relayd_id != (uint64_t) -1ULL) {
+ if (read_len != len) {
+ ERR("Error sending to the relay (ret: %zd != len: %lu)",
+ read_len, len);
+ }
+ } else {
+ if (read_len != padded_len) {
+ ERR("Error writing to tracefile (ret: %zd != len: %lu)",
+ read_len, padded_len);
+ }
+ }
+
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf");
+ goto end_unlock;
+ }
+ consumed_pos += stream->max_sb_size;
+ }
+
+ if (relayd_id == (uint64_t) -1ULL) {
+ ret = close(stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot close out_fd");
+ goto end_unlock;
+ }
+ stream->out_fd = -1;
+ } else {
+ close_relayd_stream(stream);
+ stream->net_seq_idx = (uint64_t) -1ULL;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ /* All good! */
+ ret = 0;
+ goto end;
+
+error_put_subbuf:
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf error path");
+ }
+end_unlock:
+ pthread_mutex_unlock(&stream->lock);
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Read the whole metadata available for a snapshot.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
+ uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+{
+ int ret, use_relayd = 0;
+ ssize_t ret_read;
+ struct lttng_consumer_channel *metadata_channel;
+ struct lttng_consumer_stream *metadata_stream;
+
+ assert(ctx);
+
+ DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
+ key, path);
+
+ rcu_read_lock();
+
+ metadata_channel = consumer_find_channel(key);
+ if (!metadata_channel) {
+ ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
+ ret = -1;
+ goto error;
+ }
+
+ metadata_stream = metadata_channel->metadata_stream;
+ assert(metadata_stream);
+
+ /* Flag once that we have a valid relayd for the stream. */
+ if (relayd_id != (uint64_t) -1ULL) {
+ use_relayd = 1;
+ }
+
+ if (use_relayd) {
+ ret = send_relayd_stream(metadata_stream, path);
+ if (ret < 0) {
+ goto error;
+ }
+ } else {
+ ret = utils_create_stream_file(path, metadata_stream->name,
+ metadata_stream->chan->tracefile_size,
+ metadata_stream->tracefile_count_current,
+ metadata_stream->uid, metadata_stream->gid);
+ if (ret < 0) {
+ goto error;
+ }
+ metadata_stream->out_fd = ret;
+ }
+
+ do {
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ if (ret_read < 0) {
+ if (ret_read != -EPERM) {
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)",
+ ret_read);
+ goto error;
+ }
+ /* ret_read is negative at this point so we will exit the loop. */
+ continue;
+ }
+ } while (ret_read >= 0);
+
+ if (use_relayd) {
+ close_relayd_stream(metadata_stream);
+ metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+ } else {
+ ret = close(metadata_stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot metadata close out_fd");
+ /*
+ * Don't go on error here since the snapshot was successful at this
+ * point but somehow the close failed.
+ */
+ }
+ metadata_stream->out_fd = -1;
+ }
+
+ ret = 0;
+
+error:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */