+ PERROR("Getting sub-buffer snapshot.");
+ }
+
+ return ret;
+}
+
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+
+ return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ int ret;
+ int infd = stream->wait_fd;
+
+ ret = kernctl_snapshot_get_produced(infd, pos);
+ if (ret != 0) {
+ PERROR("kernctl_snapshot_get_produced");
+ }
+
+ return ret;
+}
+
+/*
+ * Get the consumerd position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ int ret;
+ int infd = stream->wait_fd;
+
+ ret = kernctl_snapshot_get_consumed(infd, pos);
+ if (ret != 0) {
+ PERROR("kernctl_snapshot_get_consumed");
+ }
+
+ return ret;
+}
+
+/*
+ * Take a snapshot of all the stream of a channel
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
+ uint64_t relayd_id, uint64_t nb_packets_per_stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Kernel consumer snapshot channel %" PRIu64, key);
+
+ rcu_read_lock();
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("No channel found for key %" PRIu64, key);
+ ret = -1;
+ goto end;
+ }
+
+ /* Splice is not supported yet for channel snapshot. */
+ if (channel->output != CONSUMER_CHANNEL_MMAP) {
+ ERR("Unsupported output %d", channel->output);
+ ret = -1;
+ goto end;
+ }
+
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ /* Are we at a position _before_ the first available packet ? */
+ bool before_first_packet = true;
+ unsigned long consumed_pos, produced_pos;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Assign the received relayd ID so we can use it for streaming. The streams
+ * are not visible to anyone so this is OK to change it.
+ */
+ stream->net_seq_idx = relayd_id;
+ channel->relayd_id = relayd_id;
+ if (relayd_id != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(stream, path);
+ if (ret < 0) {
+ ERR("sending stream to relayd");
+ goto end_unlock;
+ }
+ } else {
+ ret = utils_create_stream_file(path, stream->name,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
+ stream->uid, stream->gid, NULL);
+ if (ret < 0) {
+ ERR("utils_create_stream_file");
+ goto end_unlock;
+ }
+
+ stream->out_fd = ret;
+ stream->tracefile_size_current = 0;
+
+ DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+ path, stream->name, stream->key);
+ }
+ if (relayd_id != -1ULL) {
+ ret = consumer_send_relayd_streams_sent(relayd_id);
+ if (ret < 0) {
+ ERR("sending streams sent to relayd");
+ goto end_unlock;
+ }
+ }
+
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ ERR("Taking kernel snapshot");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
+ if (ret < 0) {
+ ERR("Produced kernel snapshot position");
+ goto end_unlock;
+ }
+
+ ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumerd kernel snapshot position");
+ goto end_unlock;
+ }
+
+ if (stream->max_sb_size == 0) {
+ ret = kernctl_get_max_subbuf_size(stream->wait_fd,
+ &stream->max_sb_size);
+ if (ret < 0) {
+ ERR("Getting kernel max_sb_size");
+ goto end_unlock;
+ }
+ }
+
+ consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+ produced_pos, nb_packets_per_stream,
+ stream->max_sb_size);
+
+ while (consumed_pos < produced_pos) {
+ ssize_t read_len;
+ unsigned long len, padded_len;
+ int lost_packet = 0;
+
+ health_code_update();
+
+ DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
+
+ ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
+ if (ret < 0) {
+ if (ret != -EAGAIN) {
+ PERROR("kernctl_get_subbuf snapshot");
+ goto end_unlock;
+ }
+ DBG("Kernel consumer get subbuf failed. Skipping it.");
+ consumed_pos += stream->max_sb_size;
+
+ /*
+ * Start accounting lost packets only when we
+ * already have extracted packets (to match the
+ * content of the final snapshot).
+ */
+ if (!before_first_packet) {
+ lost_packet = 1;
+ }
+ continue;
+ }
+
+ ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_get_padded_subbuf_size");
+ goto error_put_subbuf;
+ }
+
+ read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+ padded_len - len, NULL);
+ /*
+ * We write the padded len in local tracefiles but the data len
+ * when using a relay. Display the error but continue processing
+ * to try to release the subbuffer.
+ */
+ if (relayd_id != (uint64_t) -1ULL) {
+ if (read_len != len) {
+ ERR("Error sending to the relay (ret: %zd != len: %lu)",
+ read_len, len);
+ }
+ } else {
+ if (read_len != padded_len) {
+ ERR("Error writing to tracefile (ret: %zd != len: %lu)",
+ read_len, padded_len);
+ }
+ }
+
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf");
+ goto end_unlock;
+ }
+ consumed_pos += stream->max_sb_size;
+
+ /*
+ * Only account lost packets located between
+ * succesfully extracted packets (do not account before
+ * and after since they are not visible in the
+ * resulting snapshot).
+ */
+ stream->chan->lost_packets += lost_packet;
+ lost_packet = 0;
+ before_first_packet = false;
+ }
+
+ if (relayd_id == (uint64_t) -1ULL) {
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot close out_fd");
+ goto end_unlock;
+ }
+ stream->out_fd = -1;
+ }
+ } else {
+ close_relayd_stream(stream);
+ stream->net_seq_idx = (uint64_t) -1ULL;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ /* All good! */
+ ret = 0;
+ goto end;
+
+error_put_subbuf:
+ ret = kernctl_put_subbuf(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf error path");
+ }
+end_unlock:
+ pthread_mutex_unlock(&stream->lock);
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Read the whole metadata available for a snapshot.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
+ uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+{
+ int ret, use_relayd = 0;
+ ssize_t ret_read;
+ struct lttng_consumer_channel *metadata_channel;
+ struct lttng_consumer_stream *metadata_stream;
+
+ assert(ctx);
+
+ DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
+ key, path);
+
+ rcu_read_lock();
+
+ metadata_channel = consumer_find_channel(key);
+ if (!metadata_channel) {
+ ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
+ ret = -1;
+ goto error;
+ }
+
+ metadata_stream = metadata_channel->metadata_stream;
+ assert(metadata_stream);
+
+ /* Flag once that we have a valid relayd for the stream. */
+ if (relayd_id != (uint64_t) -1ULL) {
+ use_relayd = 1;
+ }
+
+ if (use_relayd) {
+ ret = consumer_send_relayd_stream(metadata_stream, path);
+ if (ret < 0) {
+ goto error;
+ }
+ } else {
+ ret = utils_create_stream_file(path, metadata_stream->name,
+ metadata_stream->chan->tracefile_size,
+ metadata_stream->tracefile_count_current,
+ metadata_stream->uid, metadata_stream->gid, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+ metadata_stream->out_fd = ret;
+ }
+
+ do {
+ health_code_update();
+
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ if (ret_read < 0) {
+ if (ret_read != -EAGAIN) {
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+ ret_read);
+ goto error;
+ }
+ /* ret_read is negative at this point so we will exit the loop. */
+ continue;
+ }
+ } while (ret_read >= 0);
+
+ if (use_relayd) {
+ close_relayd_stream(metadata_stream);
+ metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+ } else {
+ if (metadata_stream->out_fd >= 0) {
+ ret = close(metadata_stream->out_fd);
+ if (ret < 0) {
+ PERROR("Kernel consumer snapshot metadata close out_fd");
+ /*
+ * Don't go on error here since the snapshot was successful at this
+ * point but somehow the close failed.
+ */
+ }
+ metadata_stream->out_fd = -1;
+ }
+ }
+
+ ret = 0;
+
+ cds_list_del(&metadata_stream->send_node);
+ consumer_stream_destroy(metadata_stream, NULL);
+ metadata_channel->metadata_stream = NULL;
+error:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
+int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+ int sock, struct pollfd *consumer_sockpoll)
+{
+ ssize_t ret;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttcomm_consumer_msg msg;
+
+ health_code_update();
+
+ ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
+ if (ret != sizeof(msg)) {
+ if (ret > 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ ret = -1;
+ }
+ return ret;
+ }
+
+ health_code_update();
+
+ /* Deprecated command */
+ assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
+
+ health_code_update();
+
+ /* relayd needs RCU read-side protection */
+ rcu_read_lock();
+
+ switch (msg.cmd_type) {
+ case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
+ {
+ /* Session daemon status message are handled in the following call. */
+ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id);
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_ADD_CHANNEL:
+ {
+ struct lttng_consumer_channel *new_channel;
+ int ret_recv;
+
+ health_code_update();
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ health_code_update();
+
+ DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
+ new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
+ msg.u.channel.session_id, msg.u.channel.pathname,
+ msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
+ msg.u.channel.relayd_id, msg.u.channel.output,
+ msg.u.channel.tracefile_size,
+ msg.u.channel.tracefile_count, 0,
+ msg.u.channel.monitor,
+ msg.u.channel.live_timer_interval,
+ NULL, NULL);
+ if (new_channel == NULL) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ goto end_nosignal;
+ }
+ new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+ switch (msg.u.channel.output) {
+ case LTTNG_EVENT_SPLICE:
+ new_channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ new_channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ ERR("Channel output unknown %d", msg.u.channel.output);
+ goto end_nosignal;
+ }
+
+ /* Translate and save channel type. */
+ switch (msg.u.channel.type) {
+ case CONSUMER_CHANNEL_TYPE_DATA:
+ case CONSUMER_CHANNEL_TYPE_METADATA:
+ new_channel->type = msg.u.channel.type;
+ break;
+ default:
+ assert(0);
+ goto end_nosignal;
+ };
+
+ health_code_update();
+
+ if (ctx->on_recv_channel != NULL) {
+ ret_recv = ctx->on_recv_channel(new_channel);
+ if (ret_recv == 0) {
+ ret = consumer_add_channel(new_channel, ctx);
+ } else if (ret_recv < 0) {
+ goto end_nosignal;
+ }
+ } else {
+ ret = consumer_add_channel(new_channel, ctx);
+ }
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ int monitor_start_ret;
+
+ DBG("Consumer starting monitor timer");
+ consumer_timer_live_start(new_channel,
+ msg.u.channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ new_channel,
+ msg.u.channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_nosignal;
+ }
+
+ }
+
+ health_code_update();
+
+ /* If we received an error in add_channel, we need to report it. */
+ if (ret < 0) {
+ ret = consumer_send_status_msg(sock, ret);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ goto end_nosignal;
+ }
+
+ goto end_nosignal;