- long ret = 0;
- loff_t offset = 0;
- off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
- int outfd = stream->out_fd;
-
- while (len > 0) {
- DBG("splice chan to pipe offset %lu (fd : %d)",
- (unsigned long)offset, fd);
- ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
- SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("splice chan to pipe ret %ld", ret);
+ int ret;
+ unsigned long consumed_pos, produced_pos;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Kernel consumer snapshot channel %" PRIu64, key);
+
+ rcu_read_lock();
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("No channel found for key %" PRIu64, key);
+ ret = -1;
+ goto end;
+ }
+
+ /* Splice is not supported yet for channel snapshot. */
+ if (channel->output != CONSUMER_CHANNEL_MMAP) {
+ ERR("Unsupported output %d", channel->output);
+ ret = -1;
+ goto end;
+ }
+
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Assign the received relayd ID so we can use it for streaming. The streams
+ * are not visible to anyone so this is OK to change it.
+ */
+ stream->net_seq_idx = relayd_id;
+ channel->relayd_id = relayd_id;
+ if (relayd_id != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(stream, path);
+ if (ret < 0) {
+ ERR("sending stream to relayd");
+ goto end_unlock;
+ }
+ } else {
+ ret = utils_create_stream_file(path, stream->name,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
+ stream->uid, stream->gid, NULL);
+ if (ret < 0) {
+ ERR("utils_create_stream_file");
+ goto end_unlock;
+ }
+
+ stream->out_fd = ret;
+ stream->tracefile_size_current = 0;
+
+ DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+ path, stream->name, stream->key);
+ }
+ if (relayd_id != -1ULL) {
+ ret = consumer_send_relayd_streams_sent(relayd_id);
+ if (ret < 0) {
+ ERR("sending streams sent to relayd");
+ goto end_unlock;
+ }
+ }
+
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ ret = -errno;
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ ERR("Taking kernel snapshot");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);