+ PERROR("kernctl_snapshot_get_produced");
+ }
+
+ return ret;
+}
+
+/*
+ * Get the consumerd position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ int ret;
+ int infd = stream->wait_fd;
+
+ ret = kernctl_snapshot_get_consumed(infd, pos);
+ if (ret != 0) {
+ PERROR("kernctl_snapshot_get_consumed");
+ }
+
+ return ret;
+}
+
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+ const char **addr)
+{
+ int ret;
+ unsigned long mmap_offset;
+ const char *mmap_base = stream->mmap_base;
+
+ ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ if (ret < 0) {
+ PERROR("Failed to get mmap read offset");
+ goto error;
+ }
+
+ *addr = mmap_base + mmap_offset;
+error:
+ return ret;
+}
+
+/*
+ * Take a snapshot of all the stream of a channel
+ * RCU read-side lock must be held across this function to ensure existence of
+ * channel. The channel lock must be held by the caller.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int lttng_kconsumer_snapshot_channel(
+ struct lttng_consumer_channel *channel,
+ uint64_t key, char *path, uint64_t relayd_id,
+ uint64_t nb_packets_per_stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Kernel consumer snapshot channel %" PRIu64, key);
+
+ rcu_read_lock();
+
+ /* Splice is not supported yet for channel snapshot. */
+ if (channel->output != CONSUMER_CHANNEL_MMAP) {
+ ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
+ channel->name);
+ ret = -1;
+ goto end;
+ }
+
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ unsigned long consumed_pos, produced_pos;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ assert(channel->trace_chunk);
+ if (!lttng_trace_chunk_get(channel->trace_chunk)) {
+ /*
+ * Can't happen barring an internal error as the channel
+ * holds a reference to the trace chunk.
+ */
+ ERR("Failed to acquire reference to channel's trace chunk");
+ ret = -1;
+ goto end_unlock;
+ }
+ assert(!stream->trace_chunk);
+ stream->trace_chunk = channel->trace_chunk;
+
+ /*
+ * Assign the received relayd ID so we can use it for streaming. The streams
+ * are not visible to anyone so this is OK to change it.
+ */
+ stream->net_seq_idx = relayd_id;
+ channel->relayd_id = relayd_id;
+ if (relayd_id != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(stream, path);
+ if (ret < 0) {
+ ERR("sending stream to relayd");
+ goto end_unlock;
+ }
+ } else {
+ ret = consumer_stream_create_output_files(stream,
+ false);
+ if (ret < 0) {
+ goto end_unlock;
+ }
+ DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
+ stream->key);
+ }
+
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
+ if (ret < 0) {
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end_unlock;
+ }
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ ERR("Taking kernel snapshot");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
+ if (ret < 0) {
+ ERR("Produced kernel snapshot position");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumerd kernel snapshot position");
+ goto end_unlock;
+ }
+
+ consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+ produced_pos, nb_packets_per_stream,
+ stream->max_sb_size);
+
+ while ((long) (consumed_pos - produced_pos) < 0) {
+ ssize_t read_len;
+ unsigned long len, padded_len;
+ const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
+
+ health_code_update();
+ DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
+
+ ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
+ if (ret < 0) {
+ if (ret != -EAGAIN) {
+ PERROR("kernctl_get_subbuf snapshot");
+ goto end_unlock;
+ }
+ DBG("Kernel consumer get subbuf failed. Skipping it.");
+ consumed_pos += stream->max_sb_size;
+ stream->chan->lost_packets++;
+ continue;
+ }
+
+ ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_padded_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ ret = get_current_subbuf_addr(stream, &subbuf_addr);
+ if (ret) {
+ goto error_put_subbuf;
+ }
+
+ subbuf_view = lttng_buffer_view_init(
+ subbuf_addr, 0, padded_len);
+ read_len = lttng_consumer_on_read_subbuffer_mmap(
+ stream, &subbuf_view,
+ padded_len - len);
+ /*
+ * We write the padded len in local tracefiles but the data len
+ * when using a relay. Display the error but continue processing
+ * to try to release the subbuffer.
+ */
+ if (relayd_id != (uint64_t) -1ULL) {
+ if (read_len != len) {
+ ERR("Error sending to the relay (ret: %zd != len: %lu)",
+ read_len, len);
+ }
+ } else {
+ if (read_len != padded_len) {
+ ERR("Error writing to tracefile (ret: %zd != len: %lu)",
+ read_len, padded_len);
+ }
+ }
+
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf");
+ goto end_unlock;
+ }
+ consumed_pos += stream->max_sb_size;
+ }
+
+ if (relayd_id == (uint64_t) -1ULL) {
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot close out_fd");
+ goto end_unlock;
+ }
+ stream->out_fd = -1;
+ }
+ } else {
+ close_relayd_stream(stream);
+ stream->net_seq_idx = (uint64_t) -1ULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ /* All good! */
+ ret = 0;
+ goto end;
+
+error_put_subbuf:
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf error path");
+ }
+end_unlock:
+ pthread_mutex_unlock(&stream->lock);
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Read the whole metadata available for a snapshot.
+ * RCU read-side lock must be held across this function to ensure existence of
+ * metadata_channel. The channel lock must be held by the caller.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+static int lttng_kconsumer_snapshot_metadata(
+ struct lttng_consumer_channel *metadata_channel,
+ uint64_t key, char *path, uint64_t relayd_id,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret, use_relayd = 0;
+ ssize_t ret_read;
+ struct lttng_consumer_stream *metadata_stream;
+
+ assert(ctx);
+
+ DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
+ key, path);
+
+ rcu_read_lock();
+
+ metadata_stream = metadata_channel->metadata_stream;
+ assert(metadata_stream);
+
+ pthread_mutex_lock(&metadata_stream->lock);
+ assert(metadata_channel->trace_chunk);
+ assert(metadata_stream->trace_chunk);
+
+ /* Flag once that we have a valid relayd for the stream. */
+ if (relayd_id != (uint64_t) -1ULL) {
+ use_relayd = 1;
+ }
+
+ if (use_relayd) {
+ ret = consumer_send_relayd_stream(metadata_stream, path);
+ if (ret < 0) {
+ goto error_snapshot;
+ }
+ } else {
+ ret = consumer_stream_create_output_files(metadata_stream,
+ false);
+ if (ret < 0) {
+ goto error_snapshot;
+ }
+ }
+
+ do {
+ health_code_update();
+
+ ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
+ if (ret_read < 0) {
+ if (ret_read != -EAGAIN) {
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+ ret_read);
+ ret = ret_read;
+ goto error_snapshot;
+ }
+ /* ret_read is negative at this point so we will exit the loop. */
+ continue;
+ }
+ } while (ret_read >= 0);
+
+ if (use_relayd) {
+ close_relayd_stream(metadata_stream);
+ metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+ } else {
+ if (metadata_stream->out_fd >= 0) {
+ ret = close(metadata_stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot metadata close out_fd");
+ /*
+ * Don't go on error here since the snapshot was successful at this
+ * point but somehow the close failed.
+ */
+ }
+ metadata_stream->out_fd = -1;
+ lttng_trace_chunk_put(metadata_stream->trace_chunk);
+ metadata_stream->trace_chunk = NULL;
+ }