From d4a1283e471c152cedf6cb5ebcc74d509009145d Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Fri, 6 May 2011 13:45:00 -0400 Subject: [PATCH] Initial import of kconsumerd and libkernelctl Import the new kernel consumer along with the kernel tracing control library. kernel-ioctl.h is taken from the lttng-modules tree. Missing: - Send back error to session daemon on splice error - kconsumerd clean tear down on error for both threads Signed-off-by: David Goulet --- .gitignore | 1 + Makefile.am | 1 + configure.ac | 1 + kconsumerd/Makefile.am | 9 + kconsumerd/kconsumerd.c | 861 ++++++++++++++++++++++++ kconsumerd/kconsumerd.h | 44 ++ libkernelctl/kernel-ioctl.h | 14 +- libkernelctl/libkernelctl.c | 18 +- libkernelctl/libkernelctl.h | 1 + liblttsessiondcomm/liblttsessiondcomm.c | 9 + liblttsessiondcomm/liblttsessiondcomm.h | 10 + 11 files changed, 957 insertions(+), 12 deletions(-) create mode 100644 kconsumerd/Makefile.am create mode 100644 kconsumerd/kconsumerd.c create mode 100644 kconsumerd/kconsumerd.h diff --git a/.gitignore b/.gitignore index 82001ff50..6b33473d1 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ libtool ltt-sessiond/ltt-sessiond lttng/lttng +kconsumerd/kconsumerd diff --git a/Makefile.am b/Makefile.am index dce10f836..5c27eabe3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5,4 +5,5 @@ SUBDIRS = liblttsessiondcomm \ liblttngctl \ lttng \ ltt-sessiond \ + kconsumerd \ tests diff --git a/configure.ac b/configure.ac index 43b2f3f73..5eb64a5f9 100644 --- a/configure.ac +++ b/configure.ac @@ -51,6 +51,7 @@ AC_CONFIG_FILES([ liblttsessiondcomm/Makefile ltt-sessiond/Makefile lttng/Makefile + kconsumerd/Makefile tests/Makefile tests/ltt-sessiond/Makefile ]) diff --git a/kconsumerd/Makefile.am b/kconsumerd/Makefile.am new file mode 100644 index 000000000..f96a55248 --- /dev/null +++ b/kconsumerd/Makefile.am @@ -0,0 +1,9 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/liblttsessiondcomm -I$(top_srcdir)/libkernelctl + +bin_PROGRAMS = kconsumerd + +kconsumerd_SOURCES = kconsumerd.c + +kconsumerd_LDADD = \ + $(top_builddir)/liblttsessiondcomm/liblttsessiondcomm.la \ + $(top_builddir)/libkernelctl/libkernelctl.la diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c new file mode 100644 index 000000000..7e166b8f1 --- /dev/null +++ b/kconsumerd/kconsumerd.c @@ -0,0 +1,861 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "lttngerr.h" +#include "libkernelctl.h" +#include "liblttsessiondcomm.h" +#include "kconsumerd.h" + +/* Init the list of FDs */ +static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = { + .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head), +}; + +/* Number of element for the list below. */ +static unsigned int fds_count; + +/* If the local array of FDs needs update in the poll function */ +static unsigned int update_fd_array = 1; + +/* lock the fd array and structures */ +static pthread_mutex_t kconsumerd_lock_fds; + +/* the two threads (receive fd and poll) */ +static pthread_t threads[2]; + +/* communication with splice */ +static int thread_pipe[2]; + +/* socket to communicate errors with sessiond */ +static int error_socket = -1; + +/* Argument variables */ +int opt_quiet; +int opt_verbose; +static int opt_daemon; +static const char *progname; +static char command_sock_path[PATH_MAX]; /* Global command socket path */ +static char error_sock_path[PATH_MAX]; /* Global error path */ + +/* + * cleanup + * + * Cleanup the daemon's socket on exit + */ +static void cleanup() +{ + unlink(command_sock_path); +} + +/* send_error + * + * send return code to ltt-sessiond + */ +static int send_error(enum lttcomm_return_code cmd) +{ + if (error_socket > 0) { + return lttcomm_send_unix_sock(error_socket, &cmd, + sizeof(enum lttcomm_sessiond_command)); + } else { + return 0; + } +} + +/* + * cleanup_kconsumerd_fd + * + * Close the FDs and frees a ltt_kconsumerd_fd struct + */ +static void cleanup_kconsumerd_fd(struct ltt_kconsumerd_fd *lcf) +{ + if (lcf != NULL) { + close(lcf->out_fd); + close(lcf->consumerd_fd); + free(lcf); + lcf = NULL; + } +} + +/* + * add_fd + * + * Add a fd to the global list protected by a mutex + */ +static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) +{ + struct ltt_kconsumerd_fd *tmp_fd; + int ret; + + tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd)); + tmp_fd->sessiond_fd = buf->fd; + tmp_fd->consumerd_fd = consumerd_fd; + tmp_fd->state = buf->state; + tmp_fd->max_sb_size = buf->max_sb_size; + strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); + + /* Opening the tracefile in write mode */ + DBG("Opening %s for writing", tmp_fd->path_name); + ret = open(tmp_fd->path_name, + O_WRONLY|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", tmp_fd->path_name); + perror("open"); + goto end; + } + tmp_fd->out_fd = ret; + tmp_fd->out_fd_offset = 0; + + DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, + tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); + + pthread_mutex_lock(&kconsumerd_lock_fds); + cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head); + fds_count++; + pthread_mutex_unlock(&kconsumerd_lock_fds); + +end: + return ret; +} + +/* + * del_fd + * + * Remove a fd from the global list protected by a mutex + */ +static void del_fd(struct ltt_kconsumerd_fd *lcf) +{ + pthread_mutex_lock(&kconsumerd_lock_fds); + cds_list_del(&lcf->list); + if (fds_count > 0) { + fds_count--; + DBG("Removed ltt_kconsumerd_fd"); + cleanup_kconsumerd_fd(lcf); + } + pthread_mutex_unlock(&kconsumerd_lock_fds); +} + +/* + * close_outfds + * + * Close all fds in the previous fd_list + * Must be used with kconsumerd_lock_fds lock held + */ +static void close_outfds() +{ + struct ltt_kconsumerd_fd *iter; + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + del_fd(iter); + } +} + +/* + * sighandler + * + * Signal handler for the daemon + */ +static void sighandler(int sig) +{ + /* unblock the threads */ + pthread_cancel(threads[0]); + pthread_cancel(threads[1]); + + close_outfds(); + cleanup(); + + return; +} + +/* + * set_signal_handler + * + * Setup signal handler for : + * SIGINT, SIGTERM, SIGPIPE + */ +static int set_signal_handler(void) +{ + int ret = 0; + struct sigaction sa; + sigset_t sigset; + + if ((ret = sigemptyset(&sigset)) < 0) { + perror("sigemptyset"); + return ret; + } + + sa.sa_handler = sighandler; + sa.sa_mask = sigset; + sa.sa_flags = 0; + if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) { + perror("sigaction"); + return ret; + } + + if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) { + perror("sigaction"); + return ret; + } + + if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { + perror("sigaction"); + return ret; + } + + return ret; +} + +/* + * on_read_subbuffer + * + * Splice the data from the ring buffer to the tracefile. + * Returns the number of bytes spliced + */ +static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd, + unsigned long len) +{ + long ret = 0; + loff_t offset = 0; + off_t orig_offset = kconsumerd_fd->out_fd_offset; + int fd = kconsumerd_fd->consumerd_fd; + int outfd = kconsumerd_fd->out_fd; + + while (len > 0) { + DBG("splice chan to pipe offset %lu (fd : %d)", + (unsigned long)offset, fd); + ret = splice(fd, &offset, thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe ret %ld", ret); + if (ret < 0) { + perror("Error in relay splice"); + goto write_end; + } + + ret = splice(thread_pipe[0], NULL, outfd, NULL, ret, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file %ld", ret); + if (ret < 0) { + perror("Error in file splice"); + goto write_end; + } + if (ret >= len) { + len = 0; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + kconsumerd_fd->out_fd_offset += ret; + } +write_end: + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near + * future after we write it. + * We need to call fadvise again after the file grows because + * the kernel does not seem to apply fadvise to non-existing + * parts of the file. + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not + * well defined. So it can be expected to lead to lower + * throughput in streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } + return ret; +} + +/* + * read_subbuffer + * + * Consume data on a file descriptor and write it on a trace file + */ +static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd) +{ + unsigned long len; + int err; + long ret = 0; + int infd = kconsumerd_fd->consumerd_fd; + + DBG("In read_subbuffer"); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = on_read_subbuffer(kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} + +/* + * change_fd_state + * + * Update a fd according to what we just received + */ +static void change_fd_state(int sessiond_fd, + enum lttcomm_kconsumerd_fd_state state) +{ + struct ltt_kconsumerd_fd *iter; + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + if (iter->sessiond_fd == sessiond_fd) { + iter->state = state; + break; + } + } +} + +/* + * consumerd_recv_fd + * + * Receives an array of file descriptors and the associated + * structures describing each fd (path name). + * Returns the size of received data + */ +static int consumerd_recv_fd(int sfd, int size, + enum lttcomm_consumerd_command cmd_type) +{ + struct msghdr msg; + struct iovec iov[1]; + int ret, i; + struct cmsghdr *cmsg; + int nb_fd; + char tmp[CMSG_SPACE(size)]; + struct lttcomm_kconsumerd_msg *buf; + /* the number of fds we are about to receive */ + nb_fd = size/sizeof(struct lttcomm_kconsumerd_msg); + + buf = malloc(size); + + memset(&msg, 0, sizeof(msg)); + + /* Prepare to receive the structures */ + iov[0].iov_base = buf; + iov[0].iov_len = size; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + msg.msg_control = tmp; + msg.msg_controllen = sizeof(tmp); + + DBG("Waiting to receive fds"); + if ((ret = recvmsg(sfd, &msg, 0)) < 0) { + perror("recvmsg"); + } + if (ret != size) { + ERR("Received only %d, expected %d", ret, size); + send_error(KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) { + ERR("Invalid control message header"); + ret = -1; + send_error(KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + /* if we received fds */ + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + DBG("Receive : expecting %d fds", nb_fd); + for (i = 0; i < nb_fd; i++) { + switch (cmd_type) { + case LTTCOMM_ADD_STREAM: + DBG("add_fd %s (%d)", buf[i].path_name, ((int *)CMSG_DATA(cmsg))[i]); + ret = add_fd(&buf[i], ((int *)CMSG_DATA(cmsg))[i]); + if (ret < 0) { + send_error(KCONSUMERD_OUTFD_ERROR); + goto end; + } + break; + case LTTCOMM_UPDATE_STREAM: + change_fd_state(buf[i].fd, buf[i].state); + break; + default: + break; + } + } + /* flag to tell the polling thread to update its fd array */ + update_fd_array = 1; + send_error(KCONSUMERD_SUCCESS_RECV_FD); + } else { + ERR("Didn't received any fd"); + send_error(KCONSUMERD_ERROR_RECV_FD); + ret = -1; + goto end; + } + +end: + if (buf != NULL) { + free(buf); + buf = NULL; + } + return ret; +} + +/* + * thread_receive_fds + * + * This thread listens on the consumerd socket and + * receives the file descriptors from ltt-sessiond + */ +static void *thread_receive_fds(void *data) +{ + int sock, client_socket, ret; + struct lttcomm_kconsumerd_header tmp; + + DBG("Creating command socket %s", command_sock_path); + unlink(command_sock_path); + client_socket = lttcomm_create_unix_sock(command_sock_path); + if (client_socket < 0) { + ERR("Cannot create command socket"); + goto error; + } + + ret = lttcomm_listen_unix_sock(client_socket); + if (ret < 0) { + goto error; + } + + DBG("Sending ready command to ltt-sessiond"); + ret = send_error(KCONSUMERD_COMMAND_SOCK_READY); + if (ret < 0) { + ERR("Error sending ready command to ltt-sessiond"); + goto error; + } + + while (1) { + /* Blocking call, waiting for transmission */ + sock = lttcomm_accept_unix_sock(client_socket); + if (sock <= 0) { + continue; + } + + /* We first get the number of fd we are about to receive */ + ret = lttcomm_recv_unix_sock(sock, &tmp, + sizeof(struct lttcomm_kconsumerd_header)); + if (ret < 0) { + ERR("Receiving the lttcomm_kconsumerd_header"); + continue; + } + ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); + if (ret < 0) { + continue; + } + } + +error: + return NULL; +} + +/* + * update_poll_array + * + * Allocate the pollfd structure and the local view of the out fds + * to avoid doing a lookup in the linked list and concurrency issues + * when writing is needed. + * Returns the number of fds in the structures + */ +static int update_poll_array(struct pollfd **pollfd, + struct ltt_kconsumerd_fd **local_kconsumerd_fd) +{ + struct ltt_kconsumerd_fd *iter; + int i = 0; + + if (*pollfd != NULL) { + free(*pollfd); + *pollfd = NULL; + } + + if (*local_kconsumerd_fd != NULL) { + free(*local_kconsumerd_fd); + *local_kconsumerd_fd = NULL; + } + + *pollfd = malloc(fds_count * sizeof(struct pollfd)); + if (*pollfd == NULL) { + perror("pollfd malloc"); + goto error_mem; + } + + *local_kconsumerd_fd = malloc(fds_count * sizeof(struct ltt_kconsumerd_fd)); + if (*local_kconsumerd_fd == NULL) { + perror("local_kconsumerd_fd malloc"); + goto error_mem; + } + + DBG("Updating poll fd array"); + pthread_mutex_lock(&kconsumerd_lock_fds); + + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + DBG("Inside for each"); + if (iter->state == ACTIVE_FD) { + DBG("Active FD %d", iter->consumerd_fd); + pollfd[i]->fd = iter->consumerd_fd; + pollfd[i]->events = POLLIN | POLLPRI; + local_kconsumerd_fd[i] = iter; + i++; + } else if (iter->state == DELETE_FD) { + del_fd(iter); + } + } + update_fd_array = 0; + pthread_mutex_unlock(&kconsumerd_lock_fds); + return i; + +error_mem: + return -ENOMEM; +} + +/* + * thread_poll_fds + * + * This thread polls the fds in the ltt_fd_list to consume the data + * and write it to tracefile if necessary. + */ +static void *thread_poll_fds(void *data) +{ + int num_rdy, num_hup, high_prio, ret, i; + struct pollfd *pollfd = NULL; + /* local view of the fds */ + struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL; + /* local view of fds_count */ + int nb_fd = 0; + + ret = pipe(thread_pipe); + if (ret < 0) { + perror("Error creating pipe"); + goto end; + } + + while (1) { + high_prio = 0; + num_hup = 0; + + /* + * the ltt_fd_list has been updated, we need to update our + * local array as well + */ + if (update_fd_array) { + ret = update_poll_array(&pollfd, &local_kconsumerd_fd); + if (ret < 0) { + ERR("Error in allocating pollfd or local_outfds"); + send_error(KCONSUMERD_POLL_ERROR); + goto end; + } + nb_fd = ret; + } + + /* poll on the array of fds */ + DBG("polling on %d fd", nb_fd); + num_rdy = poll(pollfd, nb_fd, POLL_TIMEOUT); + DBG("poll num_rdy : %d", num_rdy); + if (num_rdy == -1) { + perror("Poll error"); + send_error(KCONSUMERD_POLL_ERROR); + goto end; + } + + /* Take care of high priority channels first. */ + for (i = 0; i < nb_fd; i++) { + switch(pollfd[i].revents) { + case POLLERR: + ERR("Error returned in polling fd %d.", pollfd[i].fd); + num_hup++; + send_error(KCONSUMERD_POLL_ERROR); + break; + case POLLHUP: + ERR("Polling fd %d tells it has hung up.", pollfd[i].fd); + num_hup++; + break; + case POLLNVAL: + ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); + send_error(KCONSUMERD_POLL_NVAL); + num_hup++; + break; + case POLLPRI: + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = read_subbuffer(&local_kconsumerd_fd[i]); + /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */ + if (ret == EAGAIN) { + ret = 0; + } + break; + } + } + + /* If every buffer FD has hung up, we end the read loop here */ + if (nb_fd > 0 && num_hup == nb_fd) { + DBG("every buffer FD has hung up\n"); + send_error(KCONSUMERD_POLL_HUP); + continue; + } + + /* Take care of low priority channels. */ + if (!high_prio) { + for (i = 0; i < nb_fd; i++) { + switch(pollfd[i].revents) { + case POLLIN: + DBG("Normal read on fd %d", pollfd[i].fd); + ret = read_subbuffer(&local_kconsumerd_fd[i]); + /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */ + if (ret == EAGAIN) { + ret = 0; + } + break; + } + } + } + } +end: + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + return NULL; +} + +/* + * usage function on stderr + */ +static void usage(void) +{ + fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); + fprintf(stderr, " -h, --help " + "Display this usage.\n"); + fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH " + "Specify path for the command socket\n"); + fprintf(stderr, " -e, --kconsumerd-err-sock PATH " + "Specify path for the error socket\n"); + fprintf(stderr, " -d, --daemonize " + "Start as a daemon.\n"); + fprintf(stderr, " -q, --quiet " + "No output at all.\n"); + fprintf(stderr, " -v, --verbose " + "Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -V, --version " + "Show version number.\n"); +} + +/* + * daemon argument parsing + */ +static void parse_args(int argc, char **argv) +{ + int c; + + static struct option long_options[] = { + { "kconsumerd-cmd-sock", 1, 0, 'c' }, + { "kconsumerd-err-sock", 1, 0, 'e' }, + { "daemonize", 0, 0, 'd' }, + { "help", 0, 0, 'h' }, + { "quiet", 0, 0, 'q' }, + { "verbose", 0, 0, 'v' }, + { "version", 0, 0, 'V' }, + { NULL, 0, 0, 0 } + }; + + while (1) { + int option_index = 0; + c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index); + if (c == -1) { + break; + } + + switch (c) { + case 0: + fprintf(stderr, "option %s", long_options[option_index].name); + if (optarg) { + fprintf(stderr, " with arg %s\n", optarg); + } + break; + case 'c': + snprintf(command_sock_path, PATH_MAX, "%s", optarg); + break; + case 'e': + snprintf(error_sock_path, PATH_MAX, "%s", optarg); + break; + case 'd': + opt_daemon = 1; + break; + case 'h': + usage(); + exit(EXIT_FAILURE); + case 'q': + opt_quiet = 1; + break; + case 'v': + opt_verbose = 1; + break; + case 'V': + fprintf(stdout, "%s\n", VERSION); + exit(EXIT_SUCCESS); + default: + usage(); + exit(EXIT_FAILURE); + } + } +} + + +/* + * main + */ +int main(int argc, char **argv) +{ + int i; + int ret = 0; + void *status; + + /* Parse arguments */ + progname = argv[0]; + parse_args(argc, argv); + + /* Daemonize */ + if (opt_daemon) { + ret = daemon(0, 0); + if (ret < 0) { + perror("daemon"); + goto error; + } + } + + if (strlen(command_sock_path) == 0) { + snprintf(command_sock_path, PATH_MAX, + KCONSUMERD_CMD_SOCK_PATH); + } + if (strlen(error_sock_path) == 0) { + snprintf(error_sock_path, PATH_MAX, + KCONSUMERD_ERR_SOCK_PATH); + } + + if (set_signal_handler() < 0) { + goto error; + } + + /* Connect to the socket created by ltt-sessiond to report errors */ + DBG("Connecting to error socket %s", error_sock_path); + error_socket = lttcomm_connect_unix_sock(error_sock_path); + /* not a fatal error, but all communication with ltt-sessiond will fail */ + if (error_socket < 0) { + WARN("Cannot connect to error socket, is ltt-sessiond started ?"); + } + + /* Create the thread to manage the receive of fd */ + ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL); + if (ret != 0) { + perror("pthread_create"); + goto error; + } + + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL); + if (ret != 0) { + perror("pthread_create"); + goto error; + } + + for (i = 0; i < 2; i++) { + ret = pthread_join(threads[i], &status); + if (ret != 0) { + perror("pthread_join"); + goto error; + } + } + ret = EXIT_SUCCESS; + send_error(KCONSUMERD_EXIT_SUCCESS); + goto end; + +error: + ret = EXIT_FAILURE; + send_error(KCONSUMERD_EXIT_FAILURE); + +end: + cleanup(); + + return ret; +} diff --git a/kconsumerd/kconsumerd.h b/kconsumerd/kconsumerd.h new file mode 100644 index 000000000..f5ec288bc --- /dev/null +++ b/kconsumerd/kconsumerd.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTT_KCONSUMERD_H +#define _LTT_KCONSUMERD_H + +/* timeout of 1s on poll to ensure the fd list is updated when needed */ +#define POLL_TIMEOUT 1000 + +struct ltt_kconsumerd_fd_list { + struct cds_list_head head; +}; + +/* + * Internal representation of the FDs, + * sessiond_fd is used to identify uniquely a fd + */ +struct ltt_kconsumerd_fd { + struct cds_list_head list; + int sessiond_fd; /* used to identify uniquely a fd with sessiond */ + int consumerd_fd; /* fd to consume */ + int out_fd; /* output file to write the data */ + off_t out_fd_offset; /* write position in the output file descriptor */ + char path_name[PATH_MAX]; /* tracefile name */ + enum lttcomm_kconsumerd_fd_state state; + unsigned long max_sb_size; /* the subbuffer size for this channel */ +}; + +#endif /* _LTT_KCONSUMERD_H */ diff --git a/libkernelctl/kernel-ioctl.h b/libkernelctl/kernel-ioctl.h index bed6f5934..9aa0407b4 100644 --- a/libkernelctl/kernel-ioctl.h +++ b/libkernelctl/kernel-ioctl.h @@ -47,13 +47,15 @@ #define RING_BUFFER_GET_MMAP_READ_OFFSET _IOR(0xF6, 0x0B, unsigned long) /* Create session LTTNG_SESSION */ -#define KERNEL_IO_CREATE_SESSION _IO(0xF6, 0x40) -#define KERNEL_IO_SESSION_START _IO(0xF6, 0x41) -#define KERNEL_IO_SESSION_STOP _IO(0xF6, 0x42) +#define KERNEL_CREATE_SESSION _IO(0xF6, 0x40) +#define KERNEL_SESSION_START _IO(0xF6, 0x41) +#define KERNEL_SESSION_STOP _IO(0xF6, 0x42) /* Create channel LTTNG_CHANNEL */ -#define KERNEL_IO_CREATE_CHANNEL _IOW(0xF6, 0x43, struct lttng_channel) -#define KERNEL_IO_CREATE_STREAM _IO(0xF6, 0x44) -#define KERNEL_IO_CREATE_EVENT _IOW(0xF6, 0x45, struct lttng_event) +#define KERNEL_CREATE_CHANNEL _IOW(0xF6, 0x43, struct lttng_channel) +#define KERNEL_CREATE_STREAM _IO(0xF6, 0x44) +#define KERNEL_CREATE_EVENT _IOW(0xF6, 0x45, struct lttng_event) + +#define KERNEL_OPEN_METADATA _IOW(0xF6, 0x46, struct lttng_channel) #endif /* _LTT_KERNEL_IOCTL_H */ diff --git a/libkernelctl/libkernelctl.c b/libkernelctl/libkernelctl.c index e99e777c3..e15dd8da6 100644 --- a/libkernelctl/libkernelctl.c +++ b/libkernelctl/libkernelctl.c @@ -25,22 +25,22 @@ int kernctl_create_channel(int fd, struct lttng_channel *chops) { - return ioctl(fd, KERNEL_IO_CREATE_CHANNEL, chops); + return ioctl(fd, KERNEL_CREATE_CHANNEL, chops); } int kernctl_create_event(int fd, struct lttng_event *ev) { - return ioctl(fd, KERNEL_IO_CREATE_EVENT, ev); + return ioctl(fd, KERNEL_CREATE_EVENT, ev); } int kernctl_create_session(int fd) { - return ioctl(fd, KERNEL_IO_CREATE_SESSION); + return ioctl(fd, KERNEL_CREATE_SESSION); } int kernctl_create_stream(int fd) { - return ioctl(fd, KERNEL_IO_CREATE_STREAM); + return ioctl(fd, KERNEL_CREATE_STREAM); } /* returns the maximum size for sub-buffers. */ @@ -85,6 +85,12 @@ int kernctl_get_subbuf_size(int fd, unsigned long *len) return ioctl(fd, RING_BUFFER_GET_SUBBUF_SIZE, len); } +/* open the metadata global channel */ +int kernctl_open_metadata(int fd, struct lttng_channel *chops) +{ + return ioctl(fd, KERNEL_OPEN_METADATA, chops); +} + /* Release exclusive sub-buffer access, move consumer forward. */ int kernctl_put_next_subbuf(int fd) { @@ -117,11 +123,11 @@ int kernctl_snapshot_get_produced(int fd, unsigned long *pos) int kernctl_start_session(int fd) { - return ioctl(fd, KERNEL_IO_SESSION_START); + return ioctl(fd, KERNEL_SESSION_START); } int kernctl_stop_session(int fd) { - return ioctl(fd, KERNEL_IO_SESSION_STOP); + return ioctl(fd, KERNEL_SESSION_STOP); } diff --git a/libkernelctl/libkernelctl.h b/libkernelctl/libkernelctl.h index 2c03e1088..eb0c7800e 100644 --- a/libkernelctl/libkernelctl.h +++ b/libkernelctl/libkernelctl.h @@ -33,6 +33,7 @@ int kernctl_get_next_subbuf(int fd); int kernctl_get_padded_subbuf_size(int fd, unsigned long *len); int kernctl_get_subbuf(int fd, unsigned long *len); int kernctl_get_subbuf_size(int fd, unsigned long *len); +int kernctl_open_metadata(int fd, struct lttng_channel *chops); int kernctl_put_next_subbuf(int fd); int kernctl_put_subbuf(int fd); int kernctl_snapshot(int fd); diff --git a/liblttsessiondcomm/liblttsessiondcomm.c b/liblttsessiondcomm/liblttsessiondcomm.c index 2c46f0027..785d8e827 100644 --- a/liblttsessiondcomm/liblttsessiondcomm.c +++ b/liblttsessiondcomm/liblttsessiondcomm.c @@ -48,6 +48,15 @@ static const char *lttcomm_readable_code[] = { [ LTTCOMM_ERR_INDEX(LTTCOMM_NO_TRACEABLE) ] = "App is not traceable", [ LTTCOMM_ERR_INDEX(LTTCOMM_SELECT_SESS) ] = "A session MUST be selected", [ LTTCOMM_ERR_INDEX(LTTCOMM_EXIST_SESS) ] = "Session name already exist", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_COMMAND_SOCK_READY) ] = "Kconsumerd command socket ready", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_SUCCESS_RECV_FD) ] = "Kconsumerd success on receiving fds", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_ERROR_RECV_FD) ] = "Kconsumerd error on receiving fds", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_ERROR) ] = "Kconsumerd error in polling thread", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_NVAL) ] = "Kconsumerd polling on closed fd", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_HUP) ] = "Kconsumerd all fd hung up", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_SUCCESS) ] = "Kconsumerd exiting normally", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_FAILURE) ] = "Kconsumerd exiting on error", + [ LTTCOMM_ERR_INDEX(KCONSUMERD_OUTFD_ERROR) ] = "Kconsumerd error opening the tracefile", }; /* diff --git a/liblttsessiondcomm/liblttsessiondcomm.h b/liblttsessiondcomm/liblttsessiondcomm.h index 4a7fdfcc4..8b505eb98 100644 --- a/liblttsessiondcomm/liblttsessiondcomm.h +++ b/liblttsessiondcomm/liblttsessiondcomm.h @@ -94,6 +94,15 @@ enum lttcomm_return_code { LTTCOMM_SELECT_SESS, /* Must select a session */ LTTCOMM_EXIST_SESS, /* Session name already exist */ LTTCOMM_NR, /* Last element */ + KCONSUMERD_COMMAND_SOCK_READY, /* when kconsumerd command socket ready */ + KCONSUMERD_SUCCESS_RECV_FD, /* success on receiving fds */ + KCONSUMERD_ERROR_RECV_FD, /* error on receiving fds */ + KCONSUMERD_POLL_ERROR, /* Error in polling thread in kconsumerd */ + KCONSUMERD_POLL_NVAL, /* Poll on closed fd */ + KCONSUMERD_POLL_HUP, /* All fds have hungup */ + KCONSUMERD_EXIT_SUCCESS, /* kconsumerd exiting normally */ + KCONSUMERD_EXIT_FAILURE, /* kconsumerd exiting on error */ + KCONSUMERD_OUTFD_ERROR, /* error opening the tracefile */ }; /* commands for kconsumerd */ @@ -178,6 +187,7 @@ struct lttcomm_kconsumerd_msg { char path_name[PATH_MAX]; int fd; u32 state; /* enum lttcomm_kconsumerd_fd_state */ + unsigned long max_sb_size; /* the subbuffer size for this channel */ }; extern int lttcomm_create_unix_sock(const char *pathname); -- 2.34.1