X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.cpp;fp=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.cpp;h=aa443027effe021529816ad6a1ea7ffbd8dc3163;hp=0000000000000000000000000000000000000000;hb=97535efaa975ca52bf02c2d5e76351bfd2e3defa;hpb=7fe0498a9173fca00dcd45a41847e629b70cd941 diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp new file mode 100644 index 000000000..aa443027e --- /dev/null +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -0,0 +1,1920 @@ +/* + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2017 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#define _LGPL_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kernel-consumer.h" + +extern struct lttng_consumer_global_data the_consumer_data; +extern int consumer_poll_timeout; + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) +{ + int ret = 0; + int infd = stream->wait_fd; + + ret = kernctl_snapshot(infd); + /* + * -EAGAIN is not an error, it just means that there is no data to + * be read. + */ + if (ret != 0 && ret != -EAGAIN) { + PERROR("Getting sub-buffer snapshot."); + } + + return ret; +} + +/* + * Sample consumed and produced positions for a specific fd. + * + * Returns 0 on success, < 0 on error. + */ +int lttng_kconsumer_sample_snapshot_positions( + struct lttng_consumer_stream *stream) +{ + LTTNG_ASSERT(stream); + + return kernctl_snapshot_sample_positions(stream->wait_fd); +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_produced(infd, pos); + if (ret != 0) { + PERROR("kernctl_snapshot_get_produced"); + } + + return ret; +} + +/* + * Get the consumerd position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_consumed(infd, pos); + if (ret != 0) { + PERROR("kernctl_snapshot_get_consumed"); + } + + return ret; +} + +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = (const char *) stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + +/* + * Take a snapshot of all the stream of a channel + * RCU read-side lock must be held across this function to ensure existence of + * channel. The channel lock must be held by the caller. 
+ * + * Returns 0 on success, < 0 on error + */ +static int lttng_kconsumer_snapshot_channel( + struct lttng_consumer_channel *channel, + uint64_t key, char *path, uint64_t relayd_id, + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_stream *stream; + + DBG("Kernel consumer snapshot channel %" PRIu64, key); + + rcu_read_lock(); + + /* Splice is not supported yet for channel snapshot. */ + if (channel->output != CONSUMER_CHANNEL_MMAP) { + ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot", + channel->name); + ret = -1; + goto end; + } + + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + unsigned long consumed_pos, produced_pos; + + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + LTTNG_ASSERT(channel->trace_chunk); + if (!lttng_trace_chunk_get(channel->trace_chunk)) { + /* + * Can't happen barring an internal error as the channel + * holds a reference to the trace chunk. + */ + ERR("Failed to acquire reference to channel's trace chunk"); + ret = -1; + goto end_unlock; + } + LTTNG_ASSERT(!stream->trace_chunk); + stream->trace_chunk = channel->trace_chunk; + + /* + * Assign the received relayd ID so we can use it for streaming. The streams + * are not visible to anyone so this is OK to change it. + */ + stream->net_seq_idx = relayd_id; + channel->relayd_id = relayd_id; + if (relayd_id != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(stream, path); + if (ret < 0) { + ERR("sending stream to relayd"); + goto end_unlock; + } + } else { + ret = consumer_stream_create_output_files(stream, + false); + if (ret < 0) { + goto end_unlock; + } + DBG("Kernel consumer snapshot stream (%" PRIu64 ")", + stream->key); + } + + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect + * for stream intersection, but required as a + * fall-back when "flush_empty" is not + * implemented by lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end_unlock; + } + goto end_unlock; + } + + ret = lttng_kconsumer_take_snapshot(stream); + if (ret < 0) { + ERR("Taking kernel snapshot"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumerd kernel snapshot position"); + goto end_unlock; + } + + consumed_pos = consumer_get_consume_start_pos(consumed_pos, + produced_pos, nb_packets_per_stream, + stream->max_sb_size); + + while ((long) (consumed_pos - produced_pos) < 0) { + ssize_t read_len; + unsigned long len, padded_len; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; + + health_code_update(); + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); + + ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); + if (ret < 0) { + if (ret != -EAGAIN) { + PERROR("kernctl_get_subbuf snapshot"); + goto end_unlock; + } + DBG("Kernel consumer get subbuf failed. 
Skipping it."); + consumed_pos += stream->max_sb_size; + stream->chan->lost_packets++; + continue; + } + + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); + if (ret < 0) { + ERR("Snapshot kernctl_get_subbuf_size"); + goto error_put_subbuf; + } + + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); + if (ret < 0) { + ERR("Snapshot kernctl_get_padded_subbuf_size"); + goto error_put_subbuf; + } + + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); + read_len = lttng_consumer_on_read_subbuffer_mmap( + stream, &subbuf_view, + padded_len - len); + /* + * We write the padded len in local tracefiles but the data len + * when using a relay. Display the error but continue processing + * to try to release the subbuffer. + */ + if (relayd_id != (uint64_t) -1ULL) { + if (read_len != len) { + ERR("Error sending to the relay (ret: %zd != len: %lu)", + read_len, len); + } + } else { + if (read_len != padded_len) { + ERR("Error writing to tracefile (ret: %zd != len: %lu)", + read_len, padded_len); + } + } + + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + goto end_unlock; + } + consumed_pos += stream->max_sb_size; + } + + if (relayd_id == (uint64_t) -1ULL) { + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; + } + } else { + close_relayd_stream(stream); + stream->net_seq_idx = (uint64_t) -1ULL; + } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; + pthread_mutex_unlock(&stream->lock); + } + + /* All good! */ + ret = 0; + goto end; + +error_put_subbuf: + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf error path"); + } +end_unlock: + pthread_mutex_unlock(&stream->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Read the whole metadata available for a snapshot. + * RCU read-side lock must be held across this function to ensure existence of + * metadata_channel. The channel lock must be held by the caller. + * + * Returns 0 on success, < 0 on error + */ +static int lttng_kconsumer_snapshot_metadata( + struct lttng_consumer_channel *metadata_channel, + uint64_t key, char *path, uint64_t relayd_id, + struct lttng_consumer_local_data *ctx) +{ + int ret, use_relayd = 0; + ssize_t ret_read; + struct lttng_consumer_stream *metadata_stream; + + LTTNG_ASSERT(ctx); + + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", + key, path); + + rcu_read_lock(); + + metadata_stream = metadata_channel->metadata_stream; + LTTNG_ASSERT(metadata_stream); + + pthread_mutex_lock(&metadata_stream->lock); + LTTNG_ASSERT(metadata_channel->trace_chunk); + LTTNG_ASSERT(metadata_stream->trace_chunk); + + /* Flag once that we have a valid relayd for the stream. 
*/ + if (relayd_id != (uint64_t) -1ULL) { + use_relayd = 1; + } + + if (use_relayd) { + ret = consumer_send_relayd_stream(metadata_stream, path); + if (ret < 0) { + goto error_snapshot; + } + } else { + ret = consumer_stream_create_output_files(metadata_stream, + false); + if (ret < 0) { + goto error_snapshot; + } + } + + do { + health_code_update(); + + ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); + if (ret_read < 0) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", + ret_read); + ret = ret_read; + goto error_snapshot; + } + } while (ret_read > 0); + + if (use_relayd) { + close_relayd_stream(metadata_stream); + metadata_stream->net_seq_idx = (uint64_t) -1ULL; + } else { + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; + lttng_trace_chunk_put(metadata_stream->trace_chunk); + metadata_stream->trace_chunk = NULL; + } + } + + ret = 0; +error_snapshot: + pthread_mutex_unlock(&metadata_stream->lock); + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; + rcu_read_unlock(); + return ret; +} + +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ +int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll) +{ + int ret_func; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + struct lttcomm_consumer_msg msg; + + health_code_update(); + + { + ssize_t ret_recv; + + ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); + if (ret_recv != sizeof(msg)) { + if (ret_recv > 0) { + lttng_consumer_send_error(ctx, + LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + ret_recv = -1; + } + return ret_recv; + } + } + + health_code_update(); + + /* Deprecated command */ + LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP); + + health_code_update(); + + /* relayd needs RCU read-side protection */ + rcu_read_lock(); + + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + /* Session daemon status message are handled in the following call. */ + consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); + goto end_nosignal; + } + case LTTNG_CONSUMER_ADD_CHANNEL: + { + struct lttng_consumer_channel *new_channel; + int ret_send_status, ret_add_channel = 0; + const uint64_t chunk_id = msg.u.channel.chunk_id.value; + + health_code_update(); + + /* First send a status message before receiving the fds. */ + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_fatal; + } + + health_code_update(); + + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); + new_channel = consumer_allocate_channel(msg.u.channel.channel_key, + msg.u.channel.session_id, + msg.u.channel.chunk_id.is_set ? 
+ &chunk_id : NULL, + msg.u.channel.pathname, + msg.u.channel.name, + msg.u.channel.relayd_id, msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count, 0, + msg.u.channel.monitor, + msg.u.channel.live_timer_interval, + msg.u.channel.is_live, + NULL, NULL); + if (new_channel == NULL) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + switch (msg.u.channel.output) { + case LTTNG_EVENT_SPLICE: + new_channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + new_channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Channel output unknown %d", msg.u.channel.output); + goto end_nosignal; + } + + /* Translate and save channel type. */ + switch (msg.u.channel.type) { + case CONSUMER_CHANNEL_TYPE_DATA: + case CONSUMER_CHANNEL_TYPE_METADATA: + new_channel->type = (consumer_channel_type) msg.u.channel.type; + break; + default: + abort(); + goto end_nosignal; + }; + + health_code_update(); + + if (ctx->on_recv_channel != NULL) { + int ret_recv_channel = + ctx->on_recv_channel(new_channel); + if (ret_recv_channel == 0) { + ret_add_channel = consumer_add_channel( + new_channel, ctx); + } else if (ret_recv_channel < 0) { + goto end_nosignal; + } + } else { + ret_add_channel = + consumer_add_channel(new_channel, ctx); + } + if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && + !ret_add_channel) { + int monitor_start_ret; + + DBG("Consumer starting monitor timer"); + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + monitor_start_ret = consumer_timer_monitor_start( + new_channel, + msg.u.channel.monitor_timer_interval); + if (monitor_start_ret < 0) { + ERR("Starting channel monitoring timer failed"); + goto end_nosignal; + } + } + + health_code_update(); + + /* If we received an error in add_channel, we need to report it. */ + if (ret_add_channel < 0) { + ret_send_status = consumer_send_status_msg( + sock, ret_add_channel); + if (ret_send_status < 0) { + goto error_fatal; + } + goto end_nosignal; + } + + goto end_nosignal; + } + case LTTNG_CONSUMER_ADD_STREAM: + { + int fd; + struct lttng_pipe *stream_pipe; + struct lttng_consumer_stream *new_stream; + struct lttng_consumer_channel *channel; + int alloc_ret = 0; + int ret_send_status, ret_poll, ret_get_max_subbuf_size; + ssize_t ret_pipe_write, ret_recv; + + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.stream.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + /* First send a status message before receiving the fds. */ + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_add_stream_fatal; + } + + health_code_update(); + + if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { + /* Channel was not found. 
*/ + goto error_add_stream_nosignal; + } + + /* Blocking call */ + health_poll_entry(); + ret_poll = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret_poll) { + goto error_add_stream_fatal; + } + + health_code_update(); + + /* Get stream file descriptor from socket */ + ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret_recv != sizeof(fd)) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + ret_func = ret_recv; + goto end; + } + + health_code_update(); + + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_add_stream_nosignal; + } + + health_code_update(); + + pthread_mutex_lock(&channel->lock); + new_stream = consumer_stream_create( + channel, + channel->key, + fd, + channel->name, + channel->relayd_id, + channel->session_id, + channel->trace_chunk, + msg.u.stream.cpu, + &alloc_ret, + channel->type, + channel->monitor); + if (new_stream == NULL) { + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + } + pthread_mutex_unlock(&channel->lock); + goto error_add_stream_nosignal; + } + + new_stream->wait_fd = fd; + ret_get_max_subbuf_size = kernctl_get_max_subbuf_size( + new_stream->wait_fd, &new_stream->max_sb_size); + if (ret_get_max_subbuf_size < 0) { + pthread_mutex_unlock(&channel->lock); + ERR("Failed to get kernel maximal subbuffer size"); + goto error_add_stream_nosignal; + } + + consumer_stream_update_channel_attributes(new_stream, + channel); + + /* + * We've just assigned the channel to the stream so increment the + * refcount right now. We don't need to increment the refcount for + * streams in no monitor because we handle manually the cleanup of + * those. It is very important to make sure there is NO prior + * consumer_del_stream() calls or else the refcount will be unbalanced. + */ + if (channel->monitor) { + uatomic_inc(&new_stream->chan->refcount); + } + + /* + * The buffer flush is done on the session daemon side for the kernel + * so no need for the stream "hangup_flush_done" variable to be + * tracked. This is important for a kernel stream since we don't rely + * on the flush state of the stream to read data. It's not the case for + * user space tracing. + */ + new_stream->hangup_flush_done = 0; + + health_code_update(); + + pthread_mutex_lock(&new_stream->lock); + if (ctx->on_recv_stream) { + int ret_recv_stream = ctx->on_recv_stream(new_stream); + if (ret_recv_stream < 0) { + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); + consumer_stream_free(new_stream); + goto error_add_stream_nosignal; + } + } + health_code_update(); + + if (new_stream->metadata_flag) { + channel->metadata_stream = new_stream; + } + + /* Do not monitor this stream. */ + if (!channel->monitor) { + DBG("Kernel consumer add stream %s in no monitor mode with " + "relayd id %" PRIu64, new_stream->name, + new_stream->net_seq_idx); + cds_list_add(&new_stream->send_node, &channel->streams.head); + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); + goto end_add_stream; + } + + /* Send stream to relayd if the stream has an ID. 
*/ + if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + int ret_send_relayd_stream; + + ret_send_relayd_stream = consumer_send_relayd_stream( + new_stream, new_stream->chan->pathname); + if (ret_send_relayd_stream < 0) { + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); + consumer_stream_free(new_stream); + goto error_add_stream_nosignal; + } + + /* + * If adding an extra stream to an already + * existing channel (e.g. cpu hotplug), we need + * to send the "streams_sent" command to relayd. + */ + if (channel->streams_sent_to_relayd) { + int ret_send_relayd_streams_sent; + + ret_send_relayd_streams_sent = + consumer_send_relayd_streams_sent( + new_stream->net_seq_idx); + if (ret_send_relayd_streams_sent < 0) { + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); + goto error_add_stream_nosignal; + } + } + } + pthread_mutex_unlock(&new_stream->lock); + pthread_mutex_unlock(&channel->lock); + + /* Get the right pipe where the stream will be sent. */ + if (new_stream->metadata_flag) { + consumer_add_metadata_stream(new_stream); + stream_pipe = ctx->consumer_metadata_pipe; + } else { + consumer_add_data_stream(new_stream); + stream_pipe = ctx->consumer_data_pipe; + } + + /* Visible to other threads */ + new_stream->globally_visible = 1; + + health_code_update(); + + ret_pipe_write = lttng_pipe_write( + stream_pipe, &new_stream, sizeof(new_stream)); + if (ret_pipe_write < 0) { + ERR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } + goto error_add_stream_nosignal; + } + + DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, + new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); +end_add_stream: + break; +error_add_stream_nosignal: + goto end_nosignal; +error_add_stream_fatal: + goto error_fatal; + } + case LTTNG_CONSUMER_STREAMS_SENT: + { + struct lttng_consumer_channel *channel; + int ret_send_status; + + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.sent_streams.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, + msg.u.sent_streams.channel_key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + /* + * Send status code to session daemon. + */ + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0 || + ret_code != LTTCOMM_CONSUMERD_SUCCESS) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_streams_sent_nosignal; + } + + health_code_update(); + + /* + * We should not send this message if we don't monitor the + * streams in this channel. + */ + if (!channel->monitor) { + goto end_error_streams_sent; + } + + health_code_update(); + /* Send stream to relayd if the stream has an ID. 
*/ + if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) { + int ret_send_relay_streams; + + ret_send_relay_streams = consumer_send_relayd_streams_sent( + msg.u.sent_streams.net_seq_idx); + if (ret_send_relay_streams < 0) { + goto error_streams_sent_nosignal; + } + channel->streams_sent_to_relayd = true; + } +end_error_streams_sent: + break; +error_streams_sent_nosignal: + goto end_nosignal; + } + case LTTNG_CONSUMER_UPDATE_STREAM: + { + rcu_read_unlock(); + return -ENOSYS; + } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; + struct consumer_relayd_sock_pair *relayd; + int ret_send_status; + + DBG("Kernel consumer destroying relayd %" PRIu64, index); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(index); + if (relayd == NULL) { + DBG("Unable to find relayd %" PRIu64, index); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; + } + + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + health_code_update(); + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_fatal; + } + + goto end_nosignal; + } + case LTTNG_CONSUMER_DATA_PENDING: + { + int32_t ret_data_pending; + uint64_t id = msg.u.data_pending.session_id; + ssize_t ret_send; + + DBG("Kernel consumer data pending command for id %" PRIu64, id); + + ret_data_pending = consumer_data_pending(id); + + health_code_update(); + + /* Send back returned value to session daemon */ + ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending, + sizeof(ret_data_pending)); + if (ret_send < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ + break; + } + case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: + { + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.snapshot_channel.key; + int ret_send_status; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + pthread_mutex_lock(&channel->lock); + if (msg.u.snapshot_channel.metadata == 1) { + int ret_snapshot; + + ret_snapshot = lttng_kconsumer_snapshot_metadata( + channel, key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); + if (ret_snapshot < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; + } + } else { + int ret_snapshot; + + ret_snapshot = lttng_kconsumer_snapshot_channel( + channel, key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel + .nb_packets_per_stream, + ctx); + if (ret_snapshot < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; + } + } + pthread_mutex_unlock(&channel->lock); + } + health_code_update(); + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. 
*/ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + int ret_send_status; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer destroy channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_destroy_channel; + } + + health_code_update(); + + /* Stop right now if no channel was found. */ + if (!channel) { + goto end_destroy_channel; + } + + /* + * This command should ONLY be issued for channel with streams set in + * no monitor mode. + */ + LTTNG_ASSERT(!channel->monitor); + + /* + * The refcount should ALWAYS be 0 in the case of a channel in no + * monitor mode. + */ + LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1)); + + consumer_del_channel(channel); +end_destroy_channel: + goto end_nosignal; + } + case LTTNG_CONSUMER_DISCARDED_EVENTS: + { + ssize_t ret; + uint64_t count; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.discarded_events.session_id; + uint64_t key = msg.u.discarded_events.channel_key; + + DBG("Kernel consumer discarded events command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer discarded events channel %" + PRIu64 " not found", key); + count = 0; + } else { + count = channel->discarded_events; + } + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &count, sizeof(count)); + if (ret < 0) { + PERROR("send discarded events"); + goto error_fatal; + } + + break; + } + case LTTNG_CONSUMER_LOST_PACKETS: + { + ssize_t ret; + uint64_t count; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.lost_packets.session_id; + uint64_t key = msg.u.lost_packets.channel_key; + + DBG("Kernel consumer lost packets command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer lost packets channel %" + PRIu64 " not found", key); + count = 0; + } else { + count = channel->lost_packets; + } + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &count, sizeof(count)); + if (ret < 0) { + PERROR("send lost packets"); + goto error_fatal; + } + + break; + } + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + { + int channel_monitor_pipe; + int ret_send_status, ret_set_channel_monitor_pipe; + ssize_t ret_recv; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + goto error_fatal; + } + + ret_recv = lttcomm_recv_fds_unix_sock( + sock, &channel_monitor_pipe, 1); + if (ret_recv != sizeof(channel_monitor_pipe)) { + ERR("Failed to receive channel monitor pipe"); + goto error_fatal; + } + + DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); + ret_set_channel_monitor_pipe = + consumer_timer_thread_set_channel_monitor_pipe( + channel_monitor_pipe); + if (!ret_set_channel_monitor_pipe) { + int flags; + int ret_fcntl; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Set the pipe as non-blocking. 
*/ + ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0); + if (ret_fcntl == -1) { + PERROR("fcntl get flags of the channel monitoring pipe"); + goto error_fatal; + } + flags = ret_fcntl; + + ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, + flags | O_NONBLOCK); + if (ret_fcntl == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); + goto error_fatal; + } + DBG("Channel monitor pipe set as non-blocking"); + } else { + ret_code = LTTCOMM_CONSUMERD_ALREADY_SET; + } + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + goto error_fatal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_CHANNEL: + { + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.rotate_channel.key; + int ret_send_status; + + DBG("Consumer rotate channel %" PRIu64, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + /* + * Sample the rotate position of all the streams in this channel. + */ + int ret_rotate_channel; + + ret_rotate_channel = lttng_consumer_rotate_channel( + channel, key, + msg.u.rotate_channel.relayd_id, + msg.u.rotate_channel.metadata, ctx); + if (ret_rotate_channel < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; + } + + health_code_update(); + } + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_rotate_channel; + } + if (channel) { + /* Rotate the streams that are ready right now. */ + int ret_rotate; + + ret_rotate = lttng_consumer_rotate_ready_streams( + channel, key, ctx); + if (ret_rotate < 0) { + ERR("Rotate ready streams failed"); + } + } + break; +error_rotate_channel: + goto end_nosignal; + } + case LTTNG_CONSUMER_CLEAR_CHANNEL: + { + struct lttng_consumer_channel *channel; + uint64_t key = msg.u.clear_channel.key; + int ret_send_status; + + channel = consumer_find_channel(key); + if (!channel) { + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + int ret_clear_channel; + + ret_clear_channel = + lttng_consumer_clear_channel(channel); + if (ret_clear_channel) { + ERR("Clear channel failed"); + ret_code = (lttcomm_return_code) ret_clear_channel; + } + + health_code_update(); + } + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + break; + } + case LTTNG_CONSUMER_INIT: + { + int ret_send_status; + + ret_code = lttng_consumer_init_command(ctx, + msg.u.init.sessiond_uuid); + health_code_update(); + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: + { + const struct lttng_credentials credentials = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), + }; + const bool is_local_trace = + !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = + msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = + *msg.u.create_trace_chunk.override_name ? 
+ msg.u.create_trace_chunk.override_name : + NULL; + struct lttng_directory_handle *chunk_directory_handle = NULL; + + /* + * The session daemon will only provide a chunk directory file + * descriptor for local traces. + */ + if (is_local_trace) { + int chunk_dirfd; + int ret_send_status; + ssize_t ret_recv; + + /* Acnowledge the reception of the command. */ + ret_send_status = consumer_send_status_msg( + sock, LTTCOMM_CONSUMERD_SUCCESS); + if (ret_send_status < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + ret_recv = lttcomm_recv_fds_unix_sock( + sock, &chunk_dirfd, 1); + if (ret_recv != sizeof(chunk_dirfd)) { + ERR("Failed to receive trace chunk directory file descriptor"); + goto error_fatal; + } + + DBG("Received trace chunk directory fd (%d)", + chunk_dirfd); + chunk_directory_handle = lttng_directory_handle_create_from_dirfd( + chunk_dirfd); + if (!chunk_directory_handle) { + ERR("Failed to initialize chunk directory handle from directory file descriptor"); + if (close(chunk_dirfd)) { + PERROR("Failed to close chunk directory file descriptor"); + } + goto error_fatal; + } + } + + ret_code = lttng_consumer_create_trace_chunk( + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk + .creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? + &credentials : + NULL, + chunk_directory_handle); + lttng_directory_handle_put(chunk_directory_handle); + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: + { + enum lttng_trace_chunk_command_type close_command = + (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = + msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char path[LTTNG_PATH_MAX]; + ssize_t ret_send; + + ret_code = lttng_consumer_close_trace_chunk( + msg.u.close_trace_chunk.relayd_id.is_set ? + &relayd_id : + NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL, path); + reply.ret_code = ret_code; + reply.path_length = strlen(path) + 1; + ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret_send != sizeof(reply)) { + goto error_fatal; + } + ret_send = lttcomm_send_unix_sock( + sock, path, reply.path_length); + if (ret_send != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; + } + case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: + { + const uint64_t relayd_id = + msg.u.trace_chunk_exists.relayd_id.value; + + ret_code = lttng_consumer_trace_chunk_exists( + msg.u.trace_chunk_exists.relayd_id.is_set ? 
+ &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); + goto end_msg_sessiond; + } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + WARN("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } + default: + goto end_nosignal; + } + +end_nosignal: + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + ret_func = 1; + goto end; +error_fatal: + /* This will issue a consumer stop. */ + ret_func = -1; + goto end; +end_msg_sessiond: + /* + * The returned value here is not useful since either way we'll return 1 to + * the caller because the session daemon socket management is done + * elsewhere. Returning a negative code or 0 will shutdown the consumer. + */ + { + int ret_send_status; + + ret_send_status = consumer_send_status_msg(sock, ret_code); + if (ret_send_status < 0) { + goto error_fatal; + } + } + + ret_func = 1; + +end: + health_code_update(); + rcu_read_unlock(); + return ret_func; +} + +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + */ +enum sync_metadata_status lttng_kconsumer_sync_metadata( + struct lttng_consumer_stream *metadata) +{ + int ret; + enum sync_metadata_status status; + + LTTNG_ASSERT(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + status = SYNC_METADATA_STATUS_ERROR; + goto end; + } + + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (errno == EAGAIN) { + /* No new metadata, exit. 
*/ + DBG("Sync metadata, no new kernel metadata"); + status = SYNC_METADATA_STATUS_NO_DATA; + } else { + ERR("Sync metadata, taking kernel snapshot failed."); + status = SYNC_METADATA_STATUS_ERROR; + } + } else { + status = SYNC_METADATA_STATUS_NEW_DATA; + } + +end: + return status; +} + +static +int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = kernctl_get_subbuf_size( + stream->wait_fd, &subbuf->info.data.subbuf_size); + if (ret) { + goto end; + } + + ret = kernctl_get_padded_subbuf_size( + stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + if (ret) { + goto end; + } + +end: + return ret; +} + +static +int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } + + ret = kernctl_get_metadata_version( + stream->wait_fd, &subbuf->info.metadata.version); + if (ret) { + goto end; + } + +end: + return ret; +} + +static +int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } + + ret = kernctl_get_packet_size( + stream->wait_fd, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; + } + + ret = kernctl_get_content_size( + stream->wait_fd, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); + goto end; + } + + ret = kernctl_get_timestamp_begin( + stream->wait_fd, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; + } + + ret = kernctl_get_timestamp_end( + stream->wait_fd, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = kernctl_get_events_discarded( + stream->wait_fd, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = kernctl_get_sequence_number(stream->wait_fd, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); + goto end; + } + } else { + subbuf->info.data.sequence_number.is_set = true; + } + + ret = kernctl_get_stream_id( + stream->wait_fd, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } + + ret = kernctl_get_instance_id(stream->wait_fd, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. 
*/ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); + goto end; + } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} + +static +enum get_next_subbuffer_status get_subbuffer_common( + struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + enum get_next_subbuffer_status status; + + ret = kernctl_get_next_subbuf(stream->wait_fd); + switch (ret) { + case 0: + status = GET_NEXT_SUBBUFFER_STATUS_OK; + break; + case -ENODATA: + case -EAGAIN: + /* + * The caller only expects -ENODATA when there is no data to + * read, but the kernel tracer returns -EAGAIN when there is + * currently no data for a non-finalized stream, and -ENODATA + * when there is no data for a finalized stream. Those can be + * combined into a -ENODATA return value. + */ + status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA; + goto end; + default: + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + } +end: + return status; +} + +static +enum get_next_subbuffer_status get_next_subbuffer_splice( + struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const enum get_next_subbuffer_status status = + get_subbuffer_common(stream, subbuffer); + + if (status != GET_NEXT_SUBBUFFER_STATUS_OK) { + goto end; + } + + subbuffer->buffer.fd = stream->wait_fd; +end: + return status; +} + +static +enum get_next_subbuffer_status get_next_subbuffer_mmap( + struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + enum get_next_subbuffer_status status; + const char *addr; + + status = get_subbuffer_common(stream, subbuffer); + if (status != GET_NEXT_SUBBUFFER_STATUS_OK) { + goto end; + } + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); +end: + return status; +} + +static +enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + bool coherent; + enum get_next_subbuffer_status status; + + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, + &coherent); + if (ret) { + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", + subbuffer->info.metadata.padded_subbuf_size, + coherent ? "true" : "false"); +end: + /* + * The caller only expects -ENODATA when there is no data to read, but + * the kernel tracer returns -EAGAIN when there is currently no data + * for a non-finalized stream, and -ENODATA when there is no data for a + * finalized stream. Those can be combined into a -ENODATA return value. 
+ */ + switch (ret) { + case 0: + status = GET_NEXT_SUBBUFFER_STATUS_OK; + break; + case -ENODATA: + case -EAGAIN: + /* + * The caller only expects -ENODATA when there is no data to + * read, but the kernel tracer returns -EAGAIN when there is + * currently no data for a non-finalized stream, and -ENODATA + * when there is no data for a finalized stream. Those can be + * combined into a -ENODATA return value. + */ + status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA; + break; + default: + status = GET_NEXT_SUBBUFFER_STATUS_ERROR; + break; + } + + return status; +} + +static +int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = kernctl_put_next_subbuf(stream->wait_fd); + + if (ret) { + if (ret == -EFAULT) { + PERROR("Error in unreserving sub buffer"); + } else if (ret == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted"); + } + } + + return ret; +} + +static +bool is_get_next_check_metadata_available(int tracer_fd) +{ + const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL); + const bool available = ret != -ENOTTY; + + if (ret == 0) { + /* get succeeded, make sure to put the subbuffer. */ + kernctl_put_subbuf(tracer_fd); + } + + return available; +} + +static +int signal_metadata(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + ASSERT_LOCKED(stream->metadata_rdv_lock); + return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; +} + +static +int lttng_kconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + if (stream->metadata_flag && stream->chan->is_live) { + DBG("Attempting to enable metadata bucketization for live consumers"); + if (is_get_next_check_metadata_available(stream->wait_fd)) { + DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } else { + /* + * The kernel tracer version is too old to indicate + * when the metadata stream has reached a "coherent" + * (parseable) point. + * + * This means that a live viewer may see an incoherent + * sequence of metadata and fail to parse it. 
+ */ + WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream"); + metadata_bucket_destroy(stream->metadata_bucket); + stream->metadata_bucket = NULL; + } + + stream->read_subbuffer_ops.on_sleep = signal_metadata; + } + + if (!stream->read_subbuffer_ops.get_next_subbuffer) { + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; + } + } + + if (stream->metadata_flag) { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + } else { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_kernel_index; + } + } + + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; +} + +int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + LTTNG_ASSERT(stream); + + /* + * Don't create anything if this is set for streaming or if there is + * no current trace chunk on the parent channel. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && + stream->chan->trace_chunk) { + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + goto error; + } + } + + if (stream->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + unsigned long mmap_len; + + ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); + if (ret != 0) { + PERROR("kernctl_get_mmap_len"); + goto error_close_fd; + } + stream->mmap_len = (size_t) mmap_len; + + stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ, + MAP_PRIVATE, stream->wait_fd, 0); + if (stream->mmap_base == MAP_FAILED) { + PERROR("Error mmaping"); + ret = -1; + goto error_close_fd; + } + } + + ret = lttng_kconsumer_set_stream_ops(stream); + if (ret) { + goto error_close_fd; + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error_close_fd: + if (stream->out_fd >= 0) { + int err; + + err = close(stream->out_fd); + LTTNG_ASSERT(!err); + stream->out_fd = -1; + } +error: + return ret; +} + +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. + * + * Return 1 if the traced data are still getting read else 0 meaning that the + * data is available for trace viewer reading. + */ +int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) +{ + int ret; + + LTTNG_ASSERT(stream); + + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = kernctl_put_subbuf(stream->wait_fd); + LTTNG_ASSERT(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } + + /* Data is NOT pending and ready to be read. */ + ret = 0; + +end: + return ret; +}