X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-kconsumer%2Flttng-kconsumer.c;fp=liblttng-kconsumer%2Flttng-kconsumer.c;h=c4ebb013e315cbbd8fe554bcb042dda348af88f4;hp=0000000000000000000000000000000000000000;hb=3bd1e0819b577ffcb44acd7c2f8e02ff09654b7b;hpb=f84efadf55274918ca038a4e06e0a8af1a320654 diff --git a/liblttng-kconsumer/lttng-kconsumer.c b/liblttng-kconsumer/lttng-kconsumer.c new file mode 100644 index 000000000..c4ebb013e --- /dev/null +++ b/liblttng-kconsumer/lttng-kconsumer.c @@ -0,0 +1,305 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +extern struct lttng_consumer_global_data consumer_data; +extern int consumer_poll_timeout; +extern volatile int consumer_quit; + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written + */ +int lttng_kconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + unsigned long mmap_offset; + long ret = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + int outfd = stream->out_fd; + + /* get the offset inside the fd to mmap */ + ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); + if (ret != 0) { + ret = -errno; + perror("kernctl_get_mmap_read_offset"); + goto end; + } + + while (len > 0) { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + if (ret >= len) { + len = 0; + } else if (ret < 0) { + ret = -errno; + perror("Error in file write"); + goto end; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + + lttng_consumer_sync_trace_file(stream, orig_offset); + + goto end; + +end: + return ret; +} + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +int lttng_kconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + long ret = 0; + loff_t offset = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + int outfd = stream->out_fd; + + while (len > 0) { + DBG("splice chan to pipe offset %lu (fd : %d)", + (unsigned long)offset, fd); + ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe ret %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in relay splice"); + goto splice_error; + } + + ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in file splice"); + goto splice_error; + } + len -= ret; + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch(ret) { + case EBADF: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + return ret; +} + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret = 0; + int infd = stream->wait_fd; + + ret = kernctl_snapshot(infd); + if (ret != 0) { + ret = errno; + perror("Getting sub-buffer snapshot."); + } + + return ret; +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_produced(infd, pos); + if (ret != 0) { + ret = errno; + perror("kernctl_snapshot_get_produced"); + } + + return ret; +} + +int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll) +{ + ssize_t ret; + struct lttcomm_consumer_msg msg; + + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); + if (ret != sizeof(msg)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + return -ENOENT; + } + + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_CHANNEL: + { + struct lttng_consumer_channel *new_channel; + + DBG("consumer_add_channel %d", msg.u.channel.channel_key); + new_channel = consumer_allocate_channel(msg.u.channel.channel_key, + -1, -1, + msg.u.channel.mmap_len, + msg.u.channel.max_sb_size); + if (new_channel == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + if (ctx->on_recv_channel != NULL) { + ret = ctx->on_recv_channel(new_channel); + if (ret == 0) { + consumer_add_channel(new_channel); + } else if (ret < 0) { + goto end_nosignal; + } + } else { + consumer_add_channel(new_channel); + } + goto end_nosignal; + } + case LTTNG_CONSUMER_ADD_STREAM: + { + struct lttng_consumer_stream *new_stream; + int fds[1]; + size_t nb_fd = 1; + + /* block */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); + if (ret != sizeof(fds)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + if (nb_fd < 2) + fds[1] = fds[0]; /* duplicate same fd if recv only one */ + + DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, + fds[0], fds[1]); + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + msg.u.stream.stream_key, + fds[0], fds[1], + msg.u.stream.state, + msg.u.stream.mmap_len, + msg.u.stream.output, + msg.u.stream.path_name); + if (new_stream == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end; + } + if (ctx->on_recv_stream != NULL) { + ret = ctx->on_recv_stream(new_stream); + if (ret == 0) { + consumer_add_stream(new_stream); + } else if (ret < 0) { + goto end; + } + } else { + consumer_add_stream(new_stream); + } + break; + } + case LTTNG_CONSUMER_UPDATE_STREAM: + { + if (ctx->on_update_stream != NULL) { + ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); + if (ret == 0) { + consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); + } else if (ret < 0) { + goto end; + } + } else { + consumer_change_stream_state(msg.u.stream.stream_key, + msg.u.stream.state); + } + break; + } + default: + break; + } +end: + /* signal the poll thread */ + ret = write(ctx->consumer_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("write consumer poll"); + } +end_nosignal: + return 0; +}