+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
+ uint64_t session_id,
+ const char *pathname,
+ const char *name,
+ uid_t uid,
+ gid_t gid,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = zmalloc(sizeof(*channel));
+ if (channel == NULL) {
+ PERROR("malloc struct lttng_consumer_channel");
+ goto end;
+ }
+
+ channel->key = key;
+ channel->refcount = 0;
+ channel->session_id = session_id;
+ channel->session_id_per_pid = session_id_per_pid;
+ channel->uid = uid;
+ channel->gid = gid;
+ channel->relayd_id = relayd_id;
+ channel->output = output;
+ channel->tracefile_size = tracefile_size;
+ channel->tracefile_count = tracefile_count;
+ channel->monitor = monitor;
+ channel->live_timer_interval = live_timer_interval;
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
+
+ /*
+ * In monitor mode, the streams associated with the channel will be put in
+ * a special list ONLY owned by this channel. So, the refcount is set to 1
+ * here meaning that the channel itself has streams that are referenced.
+ *
+ * On a channel deletion, once the channel is no longer visible, the
+ * refcount is decremented and checked for a zero value to delete it. With
+ * streams in no monitor mode, it will now be safe to destroy the channel.
+ */
+ if (!channel->monitor) {
+ channel->refcount = 1;
+ }
+
+ strncpy(channel->pathname, pathname, sizeof(channel->pathname));
+ channel->pathname[sizeof(channel->pathname) - 1] = '\0';
+
+ strncpy(channel->name, name, sizeof(channel->name));
+ channel->name[sizeof(channel->name) - 1] = '\0';
+
+ lttng_ht_node_init_u64(&channel->node, channel->key);
+
+ channel->wait_fd = -1;
+
+ CDS_INIT_LIST_HEAD(&channel->streams.head);
+
+ DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+
+end:
+ return channel;
+}
+
+/*
+ * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
+ */
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+ struct lttng_ht_node_u64 *node;
+ struct lttng_ht_iter iter;
+
+ pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
+ rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node != NULL) {
+ /* Channel already exist. Ignore the insertion */
+ ERR("Consumer add channel key %" PRIu64 " already exists!",
+ channel->key);
+ ret = -EEXIST;
+ goto end;
+ }
+
+ lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+
+end:
+ rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
+ pthread_mutex_unlock(&channel->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
+
+ if (!ret && channel->wait_fd != -1 &&
+ channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+ notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
+ }
+ return ret;
+}
+
+/*
+ * Allocate the pollfd structure and the local view of the out fds to avoid
+ * doing a lookup in the linked list and concurrency issues when writing is
+ * needed. Called with consumer_data.lock held.
+ *
+ * Returns the number of fds in the structures.
+ */
+static int update_poll_array(struct lttng_consumer_local_data *ctx,
+ struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht)
+{
+ int i = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ assert(ctx);
+ assert(ht);
+ assert(pollfd);
+ assert(local_stream);
+
+ DBG("Updating poll fd array");
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ */
+ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
+ stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ continue;
+ }
+ /*
+ * This clobbers way too much the debug output. Uncomment that if you
+ * need it for debugging purposes.
+ *
+ * DBG("Active FD %d", stream->wait_fd);
+ */
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
+ }
+ rcu_read_unlock();
+
+ /*
+ * Insert the consumer_data_pipe at the end of the array and don't
+ * increment i so nb_fd is the number of real FD.
+ */
+ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ return i;
+}
+
+/*
+ * Poll on the should_quit pipe and the command socket return -1 on error and
+ * should exit, 0 if data is available on the command socket
+ */
+int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
+{
+ int num_rdy;
+
+restart:
+ num_rdy = poll(consumer_sockpoll, 2, -1);
+ if (num_rdy == -1) {
+ /*
+ * Restart interrupted system call.
+ */
+ if (errno == EINTR) {
+ goto restart;
+ }
+ PERROR("Poll error");
+ goto exit;
+ }
+ if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
+ DBG("consumer_should_quit wake up");
+ goto exit;
+ }
+ return 0;
+
+exit:
+ return -1;
+}
+
+/*
+ * Set the error socket.
+ */
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
+ int sock)
+{
+ ctx->consumer_error_socket = sock;
+}
+
+/*
+ * Set the command socket path.
+ */
+void lttng_consumer_set_command_sock_path(
+ struct lttng_consumer_local_data *ctx, char *sock)
+{
+ ctx->consumer_command_sock_path = sock;
+}
+
+/*
+ * Send return code to the session daemon.
+ * If the socket is not defined, we return 0, it is not a fatal error
+ */
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+{
+ if (ctx->consumer_error_socket > 0) {
+ return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
+ sizeof(enum lttcomm_sessiond_command));
+ }
+
+ return 0;
+}
+
+/*
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
+ */
+void lttng_consumer_cleanup(void)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+ node.node) {
+ consumer_del_channel(channel);
+ }
+
+ rcu_read_unlock();
+
+ lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
+}
+
+/*
+ * Called from signal handler.
+ */
+void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ consumer_quit = 1;
+ do {
+ ret = write(ctx->consumer_should_quit[1], "4", 1);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != 1) {
+ PERROR("write consumer quit");
+ }
+
+ DBG("Consumer flag that it should quit");
+}
+
+void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
+ off_t orig_offset)
+{
+ int outfd = stream->out_fd;
+
+ /*
+ * This does a blocking write-and-wait on any page that belongs to the
+ * subbuffer prior to the one we just wrote.
+ * Don't care about error values, as these are just hints and ways to
+ * limit the amount of page cache used.
+ */
+ if (orig_offset < stream->max_sb_size) {
+ return;
+ }
+ lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size,
+ SYNC_FILE_RANGE_WAIT_BEFORE
+ | SYNC_FILE_RANGE_WRITE
+ | SYNC_FILE_RANGE_WAIT_AFTER);
+ /*
+ * Give hints to the kernel about how we access the file:
+ * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
+ * we write it.
+ *
+ * We need to call fadvise again after the file grows because the
+ * kernel does not seem to apply fadvise to non-existing parts of the
+ * file.
+ *
+ * Call fadvise _after_ having waited for the page writeback to
+ * complete because the dirty page writeback semantic is not well
+ * defined. So it can be expected to lead to lower throughput in
+ * streaming.
+ */
+ posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size, POSIX_FADV_DONTNEED);
+}
+
+/*
+ * Initialise the necessary environnement :
+ * - create a new context
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
+ * - create the thread pipe (for splice)
+ *
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ *
+ * Returns a pointer to the new context or NULL on error.
+ */
+struct lttng_consumer_local_data *lttng_consumer_create(
+ enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
+{
+ int ret;
+ struct lttng_consumer_local_data *ctx;
+
+ assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
+ consumer_data.type == type);
+ consumer_data.type = type;
+
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
+ if (ctx == NULL) {
+ PERROR("allocating context");
+ goto error;
+ }
+
+ ctx->consumer_error_socket = -1;
+ ctx->consumer_metadata_socket = -1;
+ pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
+ /* assign the callbacks */
+ ctx->on_buffer_ready = buffer_ready;
+ ctx->on_recv_channel = recv_channel;
+ ctx->on_recv_stream = recv_stream;
+ ctx->on_update_stream = update_stream;
+
+ ctx->consumer_data_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_pipe) {
+ goto error_poll_pipe;
+ }
+
+ ret = pipe(ctx->consumer_should_quit);
+ if (ret < 0) {
+ PERROR("Error creating recv pipe");
+ goto error_quit_pipe;
+ }
+
+ ret = pipe(ctx->consumer_thread_pipe);
+ if (ret < 0) {
+ PERROR("Error creating thread pipe");
+ goto error_thread_pipe;
+ }
+
+ ret = pipe(ctx->consumer_channel_pipe);
+ if (ret < 0) {
+ PERROR("Error creating channel pipe");
+ goto error_channel_pipe;
+ }
+
+ ctx->consumer_metadata_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_metadata_pipe) {
+ goto error_metadata_pipe;
+ }
+
+ ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
+ if (ret < 0) {
+ goto error_splice_pipe;
+ }
+
+ return ctx;
+
+error_splice_pipe:
+ lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+error_metadata_pipe:
+ utils_close_pipe(ctx->consumer_channel_pipe);
+error_channel_pipe:
+ utils_close_pipe(ctx->consumer_thread_pipe);
+error_thread_pipe:
+ utils_close_pipe(ctx->consumer_should_quit);
+error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
+error_poll_pipe:
+ free(ctx);
+error:
+ return NULL;
+}
+
+/*
+ * Close all fds associated with the instance and free the context.
+ */
+void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+
+ DBG("Consumer destroying it. Closing everything.");
+
+ ret = close(ctx->consumer_error_socket);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_metadata_socket);
+ if (ret) {
+ PERROR("close");
+ }
+ utils_close_pipe(ctx->consumer_thread_pipe);
+ utils_close_pipe(ctx->consumer_channel_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+ utils_close_pipe(ctx->consumer_should_quit);
+ utils_close_pipe(ctx->consumer_splice_metadata_pipe);
+
+ unlink(ctx->consumer_command_sock_path);
+ free(ctx);
+}
+
+/*
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+ struct lttng_consumer_stream *stream,
+ struct consumer_relayd_sock_pair *relayd, unsigned long padding)
+{
+ int ret;
+ struct lttcomm_relayd_metadata_payload hdr;
+
+ hdr.stream_id = htobe64(stream->relayd_stream_id);
+ hdr.padding_size = htobe32(padding);
+ do {
+ ret = write(fd, (void *) &hdr, sizeof(hdr));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != sizeof(hdr)) {
+ /*
+ * This error means that the fd's end is closed so ignore the perror
+ * not to clubber the error output since this can happen in a normal
+ * code path.
+ */
+ if (errno != EPIPE) {
+ PERROR("write metadata stream id");
+ }
+ DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+ /*
+ * Set ret to a negative value because if ret != sizeof(hdr), we don't
+ * handle writting the missing part so report that as an error and
+ * don't lie to the caller.
+ */
+ ret = -1;
+ goto end;
+ }
+ DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
+ stream->relayd_stream_id, padding);
+
+end:
+ return ret;
+}
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile. This is a
+ * core function for writing trace buffers to either the local filesystem or
+ * the network.
+ *
+ * It must be called with the stream lock held.
+ *
+ * Careful review MUST be put if any changes occur!
+ *
+ * Returns the number of bytes written
+ */
+ssize_t lttng_consumer_on_read_subbuffer_mmap(
+ struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding,
+ struct lttng_packet_index *index)
+{
+ unsigned long mmap_offset;
+ void *mmap_base;
+ ssize_t ret = 0, written = 0;
+ off_t orig_offset = stream->out_fd_offset;
+ /* Default is on the disk */
+ int outfd = stream->out_fd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ unsigned int relayd_hang_up = 0;
+
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
+ /* Flag that the current stream if set for network streaming. */
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ ret = -EPIPE;
+ goto end;
+ }
+ }
+
+ /* get the offset inside the fd to mmap */
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ mmap_base = stream->mmap_base;
+ ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ if (ret != 0) {
+ PERROR("tracer ctl get_mmap_read_offset");
+ written = -errno;
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ mmap_base = lttng_ustctl_get_mmap_base(stream);
+ if (!mmap_base) {
+ ERR("read mmap get mmap base for stream %s", stream->name);
+ written = -EPERM;
+ goto end;
+ }
+ ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
+ if (ret != 0) {
+ PERROR("tracer ctl get_mmap_read_offset");
+ written = ret;
+ goto end;
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ unsigned long netlen = len;
+
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ if (stream->metadata_flag) {
+ /* Metadata requires the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ netlen += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, netlen, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag) {
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ if (ret < 0) {
+ written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+ }
+ } else {
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ /* Else, use the default set before which is the filesystem. */
+ }
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
+
+ /*
+ * Check if we need to change the tracefile before writing the packet.
+ */
+ if (stream->chan->tracefile_size > 0 &&
+ (stream->tracefile_size_current + len) >
+ stream->chan->tracefile_size) {
+ ret = utils_rotate_stream_file(stream->chan->pathname,
+ stream->name, stream->chan->tracefile_size,
+ stream->chan->tracefile_count, stream->uid, stream->gid,
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
+ if (ret < 0) {
+ ERR("Rotating output file");
+ goto end;
+ }
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
+ }
+ stream->tracefile_size_current += len;
+ if (index) {
+ index->offset = htobe64(stream->out_fd_offset);
+ }
+ }
+
+ while (len > 0) {
+ do {
+ ret = write(outfd, mmap_base + mmap_offset, len);
+ } while (ret < 0 && errno == EINTR);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+ if (ret < 0) {
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
+ if (written == 0) {
+ written = -errno;
+ }
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EPIPE || errno == EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ } else if (ret > len) {
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
+ written += ret;
+ goto end;
+ } else {
+ len -= ret;
+ mmap_offset += ret;
+ }
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret;
+ }
+ stream->output_written += ret;
+ written += ret;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ }
+
+end:
+ /* Unlock only if ctrl socket used */
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
+}
+
+/*
+ * Splice the data from the ring buffer to the tracefile.
+ *
+ * It must be called with the stream lock held.
+ *
+ * Returns the number of bytes spliced.
+ */
+ssize_t lttng_consumer_on_read_subbuffer_splice(
+ struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding,
+ struct lttng_packet_index *index)
+{
+ ssize_t ret = 0, written = 0, ret_splice = 0;
+ loff_t offset = 0;
+ off_t orig_offset = stream->out_fd_offset;
+ int fd = stream->wait_fd;
+ /* Default is on the disk */
+ int outfd = stream->out_fd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ int *splice_pipe;
+ unsigned int relayd_hang_up = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Not supported for user space tracing */
+ return -ENOSYS;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
+ /* Flag that the current stream if set for network streaming. */
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd == NULL) {
+ ret = -EPIPE;
+ goto end;
+ }
+ }
+
+ /*
+ * Choose right pipe for splice. Metadata and trace data are handled by
+ * different threads hence the use of two pipes in order not to race or
+ * corrupt the written data.
+ */
+ if (stream->metadata_flag) {
+ splice_pipe = ctx->consumer_splice_metadata_pipe;
+ } else {
+ splice_pipe = ctx->consumer_thread_pipe;
+ }
+
+ /* Write metadata stream id before payload */
+ if (relayd) {
+ int total_len = len;
+
+ if (stream->metadata_flag) {
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ padding);
+ if (ret < 0) {
+ written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+
+ total_len += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, total_len, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+ } else {
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ goto end;
+ }
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
+
+ /*
+ * Check if we need to change the tracefile before writing the packet.
+ */
+ if (stream->chan->tracefile_size > 0 &&
+ (stream->tracefile_size_current + len) >
+ stream->chan->tracefile_size) {
+ ret = utils_rotate_stream_file(stream->chan->pathname,
+ stream->name, stream->chan->tracefile_size,
+ stream->chan->tracefile_count, stream->uid, stream->gid,
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
+ if (ret < 0) {
+ ERR("Rotating output file");
+ goto end;
+ }
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
+ }
+ stream->tracefile_size_current += len;
+ index->offset = htobe64(stream->out_fd_offset);
+ }
+
+ while (len > 0) {
+ DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
+ (unsigned long)offset, len, fd, splice_pipe[1]);
+ ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
+ SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("splice chan to pipe, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in relay splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ ret = errno;
+ goto splice_error;
+ }
+
+ /* Handle stream on the relayd if the output is on the network */
+ if (relayd) {
+ if (stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
+
+ /* Update counter to fit the spliced data */
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
+ /*
+ * We do this so the return value can match the len passed as
+ * argument to this function.
+ */
+ written -= metadata_payload_size;
+ }
+ }
+
+ /* Splice data out */
+ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
+ ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+ if (ret_splice < 0) {
+ PERROR("Error in file splice");
+ if (written == 0) {
+ written = ret_splice;
+ }
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EBADF || errno == EPIPE) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ ret = errno;
+ goto splice_error;
+ } else if (ret_splice > len) {
+ errno = EINVAL;
+ PERROR("Wrote more data than requested %zd (len: %lu)",
+ ret_splice, len);
+ written += ret_splice;
+ ret = errno;
+ goto splice_error;
+ }
+ len -= ret_splice;
+
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += ret_splice;
+ }
+ stream->output_written += ret_splice;
+ written += ret_splice;
+ }
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+
+ ret = ret_splice;
+
+ goto end;
+
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ /* Skip splice error so the consumer does not fail */
+ goto end;
+ }
+
+splice_error:
+ /* send the appropriate error description to sessiond */
+ switch (ret) {
+ case EINVAL:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
+ break;
+ case ENOMEM:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
+ break;
+ case ESPIPE:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
+ break;
+ }
+
+end:
+ if (relayd && stream->metadata_flag) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+
+ rcu_read_unlock();
+ return written;
+}
+
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_take_snapshot(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_take_snapshot(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_produced_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_produced_snapshot(stream, pos);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
+ int sock, struct pollfd *consumer_sockpoll)