From 3bd1e0819b577ffcb44acd7c2f8e02ff09654b7b Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Sat, 22 Oct 2011 21:06:11 -0400 Subject: [PATCH] UST 2.0 support Refactoring of the consumer/sessiond interaction so the consumer supports applications instrumented with libust (UST 2.0). At this point, more testing of interaction between libust and sessiond is required. Signed-off-by: Mathieu Desnoyers --- Makefile.am | 7 +- README | 10 +- configure.ac | 24 +- include/Makefile.am | 7 +- .../{ltt-kconsumerd.h => lttng-consumerd.h} | 11 +- include/lttng-kernel-ctl.h | 6 +- include/lttng-sessiond-comm.h | 84 +- include/lttng-ust.h | 109 -- include/lttng/lttng-consumer.h | 297 +++++ include/lttng/lttng-kconsumer.h | 64 ++ include/lttng/lttng-kconsumerd.h | 229 ---- include/lttng/lttng-ustconsumer.h | 132 +++ include/lttng/lttng.h | 2 +- liblttng-consumer/Makefile.am | 15 + liblttng-consumer/lttng-consumer.c | 1002 ++++++++++++++++ liblttng-kconsumer/Makefile.am | 8 + liblttng-kconsumer/lttng-kconsumer.c | 305 +++++ liblttng-sessiond-comm/lttng-sessiond-comm.c | 102 +- liblttng-ustconsumer/Makefile.am | 10 + liblttng-ustconsumer/lttng-ustconsumer.c | 307 +++++ liblttngkconsumerd/Makefile.am | 9 - liblttngkconsumerd/lttngkconsumerd.c | 1017 ----------------- libustctl/Makefile.am | 9 - ltt-kconsumerd/Makefile.am | 10 - ltt-sessiond/Makefile.am | 14 +- ltt-sessiond/compat/compat-epoll.c | 1 + ltt-sessiond/kernel-ctl.c | 2 +- ltt-sessiond/lttng-ust-abi.h | 137 +++ ltt-sessiond/main.c | 466 ++++---- ltt-sessiond/trace-kernel.h | 2 +- ltt-sessiond/trace-ust.c | 3 +- ltt-sessiond/trace-ust.h | 82 +- ltt-sessiond/ust-app.h | 48 +- ltt-sessiond/ust-comm.c | 3 +- ltt-sessiond/ust-ctl.c | 1 + ltt-sessiond/ust-ctl.h | 40 + lttng-consumerd/Makefile.am | 14 + .../lttng-consumerd.c | 99 +- lttng/commands/version.c | 1 + lttng/lttng.c | 1 + tests/test_kernel_data_trace.c | 2 +- 41 files changed, 2948 insertions(+), 1744 deletions(-) rename include/{ltt-kconsumerd.h => lttng-consumerd.h} (76%) delete mode 100644 include/lttng-ust.h create mode 100644 include/lttng/lttng-consumer.h create mode 100644 include/lttng/lttng-kconsumer.h delete mode 100644 include/lttng/lttng-kconsumerd.h create mode 100644 include/lttng/lttng-ustconsumer.h create mode 100644 liblttng-consumer/Makefile.am create mode 100644 liblttng-consumer/lttng-consumer.c create mode 100644 liblttng-kconsumer/Makefile.am create mode 100644 liblttng-kconsumer/lttng-kconsumer.c create mode 100644 liblttng-ustconsumer/Makefile.am create mode 100644 liblttng-ustconsumer/lttng-ustconsumer.c delete mode 100644 liblttngkconsumerd/Makefile.am delete mode 100644 liblttngkconsumerd/lttngkconsumerd.c delete mode 100644 libustctl/Makefile.am delete mode 100644 ltt-kconsumerd/Makefile.am create mode 100644 ltt-sessiond/lttng-ust-abi.h create mode 100644 lttng-consumerd/Makefile.am rename ltt-kconsumerd/ltt-kconsumerd.c => lttng-consumerd/lttng-consumerd.c (77%) diff --git a/Makefile.am b/Makefile.am index ec8b7fe95..9bafeb481 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,12 +2,13 @@ ACLOCAL_AMFLAGS = -I config SUBDIRS = liblttng-sessiond-comm \ libkernelctl \ - liblttngkconsumerd \ + liblttng-kconsumer \ + liblttng-ustconsumer \ + liblttng-consumer \ + lttng-consumerd \ liblttngctl \ libustcomm \ - libustctl \ lttng \ - ltt-kconsumerd \ ltt-sessiond \ tests \ include \ diff --git a/README b/README index 16e1154bd..dc7281562 100644 --- a/README +++ b/README @@ -51,16 +51,16 @@ PACKAGE CONTENTS: - liblttsessiondcomm The ltt-sessiond communication library. In order to talk with ltt-sessiond, - thiis library must be used. + this library must be used. - libkernelctl Kernel tracer control and ioctl definitions. - - liblttngkconsumerd - Library for Kernel trace consumer. + - liblttng-consumer + Library for Kernel and (optionally) UST trace consumer. - - ltt-kconsumerd - The Kernel consumer daemon which uses liblttngkconsumerd. + - lttng-consumerd + The consumer daemon which uses liblttng-consumer. - ltt-sessiond The LTTng session daemon binary. diff --git a/configure.ac b/configure.ac index c7dd21ec3..052c5a33d 100644 --- a/configure.ac +++ b/configure.ac @@ -6,6 +6,10 @@ AC_CONFIG_MACRO_DIR([config]) AM_INIT_AUTOMAKE([foreign dist-bzip2 no-dist-gzip]) AM_SILENT_RULES([yes]) +AC_CONFIG_HEADERS([include/config.h]) + +AH_TEMPLATE([CONFIG_LTTNG_TOOLS_HAVE_UST], [Defined on systems where UST headers can be found.]) + AC_CHECK_HEADERS([ \ sys/types.h unistd.h fcntl.h string.h pthread.h limits.h \ signal.h stdlib.h sys/un.h sys/socket.h stdlib.h stdio.h \ @@ -41,6 +45,19 @@ AC_CHECK_DECL([rcu_thread_offline], [], AC_CHECK_DECL([rcu_thread_online], [], [AC_MSG_ERROR([liburcu $liburcu_version or newer is needed])], [[#include ]] ) +AC_CHECK_DECL([ustctl_create_session], + [ + AC_DEFINE([CONFIG_LTTNG_TOOLS_HAVE_UST], 1) + have_ust_test=1 + ], + [ + AC_MSG_WARN([UST header not found. Building without UST support.]) + have_ust_test=0 + ], + [[#include ]] +) + +AM_CONDITIONAL([LTTNG_TOOLS_HAVE_UST], [ test "x$have_ust_test" = "x1" ]) # Epoll check. If not present, the build will fallback on poll() API AX_HAVE_EPOLL( @@ -70,12 +87,13 @@ AC_CONFIG_FILES([ Makefile include/Makefile libkernelctl/Makefile - liblttngkconsumerd/Makefile + liblttng-consumer/Makefile + liblttng-kconsumer/Makefile + liblttng-ustconsumer/Makefile liblttngctl/Makefile liblttng-sessiond-comm/Makefile - libustctl/Makefile libustcomm/Makefile - ltt-kconsumerd/Makefile + lttng-consumerd/Makefile ltt-sessiond/Makefile lttng/Makefile tests/Makefile diff --git a/include/Makefile.am b/include/Makefile.am index 171eefc9d..d53444f4e 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,4 +1,5 @@ -lttnginclude_HEADERS = lttng/lttng.h lttng/lttng-kconsumerd.h +lttnginclude_HEADERS = lttng/lttng.h lttng/lttng-kconsumer.h \ + lttng/lttng-ustconsumer.h lttng/lttng-consumer.h -noinst_HEADERS = lttngerr.h lttng-kernel.h ltt-kconsumerd.h lttng-share.h \ - lttng-sessiond-comm.h lttng-ust.h lttng-kernel-ctl.h +noinst_HEADERS = lttngerr.h lttng-kernel.h lttng-consumerd.h lttng-share.h \ + lttng-sessiond-comm.h lttng-kernel-ctl.h diff --git a/include/ltt-kconsumerd.h b/include/lttng-consumerd.h similarity index 76% rename from include/ltt-kconsumerd.h rename to include/lttng-consumerd.h index 95c500524..8c240309e 100644 --- a/include/ltt-kconsumerd.h +++ b/include/lttng-consumerd.h @@ -17,12 +17,17 @@ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ -#ifndef _LTT_KCONSUMERD_H -#define _LTT_KCONSUMERD_H +#ifndef _LTTNG_CONSUMERD_H +#define _LTTNG_CONSUMERD_H /* Kernel consumer path */ #define KCONSUMERD_PATH LTTNG_RUNDIR "/kconsumerd" #define KCONSUMERD_CMD_SOCK_PATH KCONSUMERD_PATH "/command" #define KCONSUMERD_ERR_SOCK_PATH KCONSUMERD_PATH "/error" -#endif /* _LTT_KCONSUMERD_H */ +/* UST consumer path */ +#define USTCONSUMERD_PATH LTTNG_RUNDIR "/ustconsumerd" +#define USTCONSUMERD_CMD_SOCK_PATH USTCONSUMERD_PATH "/command" +#define USTCONSUMERD_ERR_SOCK_PATH USTCONSUMERD_PATH "/error" + +#endif /* _LTTNG_CONSUMERD_H */ diff --git a/include/lttng-kernel-ctl.h b/include/lttng-kernel-ctl.h index b51273fac..fa307e446 100644 --- a/include/lttng-kernel-ctl.h +++ b/include/lttng-kernel-ctl.h @@ -17,8 +17,8 @@ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ -#ifndef _LTT_LIBKERNELCTL_H -#define _LTT_LIBKERNELCTL_H +#ifndef _LTTNG_KERNEL_CTL_H +#define _LTTNG_KERNEL_CTL_H #include #include @@ -67,4 +67,4 @@ int kernctl_put_subbuf(int fd); int kernctl_buffer_flush(int fd); -#endif /* _LTT_LIBKERNELCTL_H */ +#endif /* _LTTNG_KERNEL_CTL_H */ diff --git a/include/lttng-sessiond-comm.h b/include/lttng-sessiond-comm.h index fcdc5a2cf..d9919add7 100644 --- a/include/lttng-sessiond-comm.h +++ b/include/lttng-sessiond-comm.h @@ -28,7 +28,6 @@ #include #include -#include #define LTTNG_RUNDIR "/var/run/lttng" @@ -117,19 +116,19 @@ enum lttcomm_return_code { LTTCOMM_UST_SESS_FAIL, /* UST create session failed */ LTTCOMM_UST_CHAN_NOT_FOUND, /* UST channel not found */ LTTCOMM_UST_CHAN_FAIL, /* UST create channel failed */ - KCONSUMERD_COMMAND_SOCK_READY, /* when kconsumerd command socket ready */ - KCONSUMERD_SUCCESS_RECV_FD, /* success on receiving fds */ - KCONSUMERD_ERROR_RECV_FD, /* error on receiving fds */ - KCONSUMERD_POLL_ERROR, /* Error in polling thread in kconsumerd */ - KCONSUMERD_POLL_NVAL, /* Poll on closed fd */ - KCONSUMERD_POLL_HUP, /* All fds have hungup */ - KCONSUMERD_EXIT_SUCCESS, /* kconsumerd exiting normally */ - KCONSUMERD_EXIT_FAILURE, /* kconsumerd exiting on error */ - KCONSUMERD_OUTFD_ERROR, /* error opening the tracefile */ - KCONSUMERD_SPLICE_EBADF, /* EBADF from splice(2) */ - KCONSUMERD_SPLICE_EINVAL, /* EINVAL from splice(2) */ - KCONSUMERD_SPLICE_ENOMEM, /* ENOMEM from splice(2) */ - KCONSUMERD_SPLICE_ESPIPE, /* ESPIPE from splice(2) */ + CONSUMERD_COMMAND_SOCK_READY, /* when consumerd command socket ready */ + CONSUMERD_SUCCESS_RECV_FD, /* success on receiving fds */ + CONSUMERD_ERROR_RECV_FD, /* error on receiving fds */ + CONSUMERD_POLL_ERROR, /* Error in polling thread in kconsumerd */ + CONSUMERD_POLL_NVAL, /* Poll on closed fd */ + CONSUMERD_POLL_HUP, /* All fds have hungup */ + CONSUMERD_EXIT_SUCCESS, /* kconsumerd exiting normally */ + CONSUMERD_EXIT_FAILURE, /* kconsumerd exiting on error */ + CONSUMERD_OUTFD_ERROR, /* error opening the tracefile */ + CONSUMERD_SPLICE_EBADF, /* EBADF from splice(2) */ + CONSUMERD_SPLICE_EINVAL, /* EINVAL from splice(2) */ + CONSUMERD_SPLICE_ENOMEM, /* ENOMEM from splice(2) */ + CONSUMERD_SPLICE_ESPIPE, /* ESPIPE from splice(2) */ /* MUST be last element */ LTTCOMM_NR, /* Last element */ }; @@ -186,26 +185,34 @@ struct lttcomm_lttng_msg { }; /* - * Data structures for the kconsumerd communications - * - * The header structure is sent to the kconsumerd daemon to inform - * how many lttcomm_kconsumerd_msg it is about to receive + * lttcomm_consumer_msg is the message sent from sessiond to consumerd + * to either add a channel, add a stream, update a stream, or stop + * operation. */ -struct lttcomm_kconsumerd_header { - uint32_t payload_size; - uint32_t cmd_type; /* enum kconsumerd_command */ +struct lttcomm_consumer_msg { + uint32_t cmd_type; /* enum consumerd_command */ + union { + struct { + int channel_key; + uint64_t max_sb_size; /* the subbuffer size for this channel */ + /* shm_fd and wait_fd are sent as ancillary data */ + uint64_t mmap_len; + } channel; + struct { + int channel_key; + int stream_key; + /* shm_fd and wait_fd are sent as ancillary data */ + uint32_t state; /* enum lttcomm_consumer_fd_state */ + enum lttng_event_output output; /* use splice or mmap to consume this fd */ + uint64_t mmap_len; + char path_name[PATH_MAX]; + } stream; + } u; }; -/* lttcomm_kconsumerd_msg represents a file descriptor to consume the - * data and a path name to write it - */ -struct lttcomm_kconsumerd_msg { - char path_name[PATH_MAX]; - int fd; - uint32_t state; /* enum lttcomm_kconsumerd_fd_state */ - unsigned long max_sb_size; /* the subbuffer size for this channel */ - enum lttng_event_output output; /* use splice or mmap to consume this fd */ -}; +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + +#include /* * Data structure for the commands sent from sessiond to UST. @@ -240,17 +247,20 @@ struct lttcomm_ust_reply { } u; }; +#endif /* CONFIG_LTTNG_TOOLS_HAVE_UST */ + extern int lttcomm_create_unix_sock(const char *pathname); extern int lttcomm_connect_unix_sock(const char *pathname); extern int lttcomm_accept_unix_sock(int sock); extern int lttcomm_listen_unix_sock(int sock); extern int lttcomm_close_unix_sock(int sock); -/* Send fd(s) over a unix socket. */ -extern ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds, - size_t nb_fd, size_t len); -/* Recv fd(s) over a unix socket */ -extern ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds, - size_t nb_fd, size_t len); + +#define LTTCOMM_MAX_SEND_FDS 4 +/* Send a message accompanied by fd(s) over a unix socket. */ +extern ssize_t lttcomm_send_fds_unix_sock(int sock, int *fds, size_t nb_fd); +/* Recv a message accompanied by fd(s) from a unix socket */ +extern ssize_t lttcomm_recv_fds_unix_sock(int sock, int *fds, size_t nb_fd); + extern ssize_t lttcomm_recv_unix_sock(int sock, void *buf, size_t len); extern ssize_t lttcomm_send_unix_sock(int sock, void *buf, size_t len); extern const char *lttcomm_get_readable_code(enum lttcomm_return_code code); diff --git a/include/lttng-ust.h b/include/lttng-ust.h deleted file mode 100644 index b7143f871..000000000 --- a/include/lttng-ust.h +++ /dev/null @@ -1,109 +0,0 @@ -#ifndef _LTTNG_UST_H -#define _LTTNG_UST_H - -/* - * Taken from the lttng-ust-abi.h in the UST 2.0 git tree - * - * Copyright 2010-2011 - Mathieu Desnoyers - * Copyright 2011 - David Goulet - * - * LTTng-UST ABI header - * - * Dual LGPL v2.1/GPL v2 license. - */ - -#include - -#define LTTNG_UST_SYM_NAME_LEN 128 - -#define LTTNG_UST_COMM_VERSION_MAJOR 0 -#define LTTNG_UST_COMM_VERSION_MINOR 1 - -enum lttng_ust_instrumentation { - LTTNG_UST_TRACEPOINT = 0, - LTTNG_UST_PROBE = 1, - LTTNG_UST_FUNCTION = 2, -}; - -enum lttng_ust_output { - LTTNG_UST_MMAP = 0, -}; - -struct lttng_ust_tracer_version { - uint32_t version; - uint32_t patchlevel; - uint32_t sublevel; -}; - -struct lttng_ust_channel { - int overwrite; /* 1: overwrite, 0: discard */ - uint64_t subbuf_size; /* in bytes */ - uint64_t num_subbuf; - unsigned int switch_timer_interval; /* usecs */ - unsigned int read_timer_interval; /* usecs */ - enum lttng_ust_output output; /* output mode */ - /* The following fields are used internally within UST. */ - int shm_fd; - int wait_fd; - uint64_t memory_map_size; -}; - -struct lttng_ust_event { - char name[LTTNG_UST_SYM_NAME_LEN]; /* event name */ - enum lttng_ust_instrumentation instrumentation; - /* Per instrumentation type configuration */ - union { - } u; -}; - -enum lttng_ust_context_type { - LTTNG_UST_CONTEXT_VTID = 0, -}; - -struct lttng_ust_context { - enum lttng_ust_context_type ctx; - union { - } u; -}; - -#define _UST_CMD(minor) (minor) -#define _UST_CMDR(minor, type) (minor) -#define _UST_CMDW(minor, type) (minor) - -/* Handled by object descriptor */ -#define LTTNG_UST_RELEASE _UST_CMD(0x1) - -/* Handled by object cmd */ - -/* LTTng-UST commands */ -#define LTTNG_UST_SESSION _UST_CMD(0x40) -#define LTTNG_UST_TRACER_VERSION \ - _UST_CMDR(0x41, struct lttng_ust_tracer_version) -#define LTTNG_UST_TRACEPOINT_LIST _UST_CMD(0x42) -#define LTTNG_UST_WAIT_QUIESCENT _UST_CMD(0x43) -#define LTTNG_UST_REGISTER_DONE _UST_CMD(0x44) - -/* Session FD ioctl */ -#define LTTNG_UST_METADATA \ - _UST_CMDW(0x50, struct lttng_ust_channel) -#define LTTNG_UST_CHANNEL \ - _UST_CMDW(0x51, struct lttng_ust_channel) -#define LTTNG_UST_SESSION_START _UST_CMD(0x52) -#define LTTNG_UST_SESSION_STOP _UST_CMD(0x53) - -/* Channel FD ioctl */ -#define LTTNG_UST_STREAM _UST_CMD(0x60) -#define LTTNG_UST_EVENT \ - _UST_CMDW(0x61, struct lttng_ust_event) - -/* Event and Channel FD ioctl */ -#define LTTNG_UST_CONTEXT \ - _UST_CMDW(0x70, struct lttng_ust_context) - -/* Event, Channel and Session ioctl */ -#define LTTNG_UST_ENABLE _UST_CMD(0x80) -#define LTTNG_UST_DISABLE _UST_CMD(0x81) - -#define LTTNG_UST_ROOT_HANDLE 0 - -#endif /* _LTTNG_UST_H */ diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h new file mode 100644 index 000000000..7ca94cc1d --- /dev/null +++ b/include/lttng/lttng-consumer.h @@ -0,0 +1,297 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Copyright (C) 2011 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTTNG_CONSUMER_H +#define _LTTNG_CONSUMER_H + +#include +#include +#include +#include + +/* + * When the receiving thread dies, we need to have a way to make the polling + * thread exit eventually. If all FDs hang up (normal case when the + * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for + * whatever reason some FDs remain open, the consumer should still exit + * eventually. + * + * If the timeout is reached, it means that during this period no events + * occurred on the FDs so we need to force an exit. This case should not happen + * but it is a safety to ensure we won't block the consumer indefinitely. + * + * The value of 2 seconds is an arbitrary choice. + */ +#define LTTNG_CONSUMER_POLL_TIMEOUT 2000 + +/* Commands for consumer */ +enum lttng_consumer_command { + LTTNG_CONSUMER_ADD_CHANNEL, + LTTNG_CONSUMER_ADD_STREAM, + /* pause, delete, active depending on fd state */ + LTTNG_CONSUMER_UPDATE_STREAM, + /* inform the consumer to quit when all fd has hang up */ + LTTNG_CONSUMER_STOP, +}; + +/* State of each fd in consumer */ +enum lttng_consumer_stream_state { + LTTNG_CONSUMER_ACTIVE_STREAM, + LTTNG_CONSUMER_PAUSE_STREAM, + LTTNG_CONSUMER_DELETE_STREAM, +}; + +struct lttng_consumer_channel_list { + struct cds_list_head head; +}; + +struct lttng_consumer_stream_list { + struct cds_list_head head; +}; + +enum lttng_consumer_type { + LTTNG_CONSUMER_UNKNOWN = 0, + LTTNG_CONSUMER_KERNEL, + LTTNG_CONSUMER_UST, +}; + +struct lttng_consumer_channel { + struct cds_list_head list; + int key; + uint64_t max_sb_size; /* the subbuffer size for this channel */ + int refcount; /* Number of streams referencing this channel */ + /* For UST */ + int shm_fd; + int wait_fd; + void *mmap_base; + size_t mmap_len; + struct shm_handle *handle; + int nr_streams; +}; + +/* Forward declaration for UST. */ +struct lib_ring_buffer; + +/* + * Internal representation of the streams, sessiond_key is used to identify + * uniquely a stream. + */ +struct lttng_consumer_stream { + struct cds_list_head list; + struct lttng_consumer_channel *chan; /* associated channel */ + /* + * key is the key used by the session daemon to refer to the + * object in the consumer daemon. + */ + int key; + int shm_fd; + int wait_fd; + int out_fd; /* output file to write the data */ + off_t out_fd_offset; /* write position in the output file descriptor */ + char path_name[PATH_MAX]; /* tracefile name */ + enum lttng_consumer_stream_state state; + size_t shm_len; + void *mmap_base; + size_t mmap_len; + enum lttng_event_output output; /* splice or mmap */ + /* For UST */ + struct lib_ring_buffer *buf; + int cpu; +}; + +/* + * UST consumer local data to the program. One or more instance per + * process. + */ +struct lttng_consumer_local_data { + /* function to call when data is available on a buffer */ + int (*on_buffer_ready)(struct lttng_consumer_stream *stream); + /* + * function to call when we receive a new channel, it receives a + * newly allocated channel, depending on the return code of this + * function, the new channel will be handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_recv_channel)(struct lttng_consumer_channel *channel); + /* + * function to call when we receive a new stream, it receives a + * newly allocated stream, depending on the return code of this + * function, the new stream will be handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_recv_stream)(struct lttng_consumer_stream *stream); + /* + * function to call when a stream is getting updated by the session + * daemon, this function receives the sessiond key and the new + * state, depending on the return code of this function the + * update of state for the stream is handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_update_stream)(int sessiond_key, uint32_t state); + /* socket to communicate errors with sessiond */ + int consumer_error_socket; + /* socket to exchange commands with sessiond */ + char *consumer_command_sock_path; + /* communication with splice */ + int consumer_thread_pipe[2]; + /* pipe to wake the poll thread when necessary */ + int consumer_poll_pipe[2]; + /* to let the signal handler wake up the fd receiver thread */ + int consumer_should_quit[2]; +}; + +/* + * Library-level data. One instance per process. + */ +struct lttng_consumer_global_data { + /* + * consumer_data.lock protects consumer_data.fd_list, + * consumer_data.stream_count, and consumer_data.need_update. It + * ensures the count matches the number of items in the fd_list. + * It ensures the list updates *always* trigger an fd_array + * update (therefore need to make list update vs + * consumer_data.need_update flag update atomic, and also flag + * read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of streams in the list below. Protected by + * consumer_data.lock. + */ + int stream_count; + /* + * Lists of streams and channels. Protected by consumer_data.lock. + */ + struct lttng_consumer_stream_list stream_list; + struct lttng_consumer_channel_list channel_list; + /* + * Flag specifying if the local array of FDs needs update in the + * poll function. Protected by consumer_data.lock. + */ + unsigned int need_update; + enum lttng_consumer_type type; +}; + +/* + * Set the error socket for communication with a session daemon. + */ +extern void lttng_consumer_set_error_sock( + struct lttng_consumer_local_data *ctx, int sock); + +/* + * Set the command socket path for communication with a session daemon. + */ +extern void lttng_consumer_set_command_sock_path( + struct lttng_consumer_local_data *ctx, char *sock); + +/* + * Send return code to session daemon. + * + * Returns the return code of sendmsg : the number of bytes transmitted or -1 + * on error. + */ +extern int lttng_consumer_send_error( + struct lttng_consumer_local_data *ctx, int cmd); + +/* + * Called from signal handler to ensure a clean exit. + */ +extern void lttng_consumer_should_exit( + struct lttng_consumer_local_data *ctx); + +/* + * Cleanup the daemon's socket on exit. + */ +extern void lttng_consumer_cleanup(void); + +/* + * Flush pending writes to trace output disk file. + */ +extern void lttng_consumer_sync_trace_file( + struct lttng_consumer_stream *stream, off_t orig_offset); + +/* + * Poll on the should_quit pipe and the command socket return -1 on error and + * should exit, 0 if data is available on the command socket + */ +extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); + +extern int consumer_update_poll_array( + struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, + struct lttng_consumer_stream **local_consumer_streams); + +extern struct lttng_consumer_stream *consumer_allocate_stream( + int channel_key, int stream_key, + int shm_fd, int wait_fd, + enum lttng_consumer_stream_state state, + uint64_t mmap_len, + enum lttng_event_output output, + const char *path_name); +extern int consumer_add_stream(struct lttng_consumer_stream *stream); +extern void consumer_del_stream(struct lttng_consumer_stream *stream); +extern void consumer_change_stream_state(int stream_key, + enum lttng_consumer_stream_state state); +extern void consumer_del_channel(struct lttng_consumer_channel *channel); +extern struct lttng_consumer_channel *consumer_allocate_channel( + int channel_key, + int shm_fd, int wait_fd, + uint64_t mmap_len, + uint64_t max_sb_size); +int consumer_add_channel(struct lttng_consumer_channel *channel); + +extern struct lttng_consumer_local_data *lttng_consumer_create( + enum lttng_consumer_type type, + int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(int sessiond_key, uint32_t state)); +extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); +extern int lttng_consumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); +extern int lttng_consumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); +extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); +extern int lttng_consumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos); +extern void *lttng_consumer_thread_poll_fds(void *data); +extern void *lttng_consumer_thread_receive_fds(void *data); +extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll); + +#endif /* _LTTNG_CONSUMER_H */ diff --git a/include/lttng/lttng-kconsumer.h b/include/lttng/lttng-kconsumer.h new file mode 100644 index 000000000..764a3ef43 --- /dev/null +++ b/include/lttng/lttng-kconsumer.h @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Copyright (C) 2011 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTTNG_KCONSUMER_H +#define _LTTNG_KCONSUMER_H + +#include + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written. + */ +extern int lttng_kconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +extern int lttng_kconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos); + +int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll); + +#endif /* _LTTNG_KCONSUMER_H */ diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h deleted file mode 100644 index 98771de3c..000000000 --- a/include/lttng/lttng-kconsumerd.h +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#ifndef _LTTNG_KCONSUMERD_H -#define _LTTNG_KCONSUMERD_H - -#include -#include -#include -#include - -/* - * When the receiving thread dies, we need to have a way to make the polling - * thread exit eventually. If all FDs hang up (normal case when the - * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for - * whatever reason some FDs remain open, the consumer should still exit - * eventually. - * - * If the timeout is reached, it means that during this period no events - * occurred on the FDs so we need to force an exit. This case should not happen - * but it is a safety to ensure we won't block the consumer indefinitely. - * - * The value of 2 seconds is an arbitrary choice. - */ -#define LTTNG_KCONSUMERD_POLL_GRACE_PERIOD 2000 - -/* Commands for kconsumerd */ -enum lttng_kconsumerd_command { - ADD_STREAM, - UPDATE_STREAM, /* pause, delete, active depending on fd state */ - STOP, /* inform the kconsumerd to quit when all fd has hang up */ -}; - -/* State of each fd in consumerd */ -enum lttng_kconsumerd_fd_state { - ACTIVE_FD, - PAUSE_FD, - DELETE_FD, -}; - -struct lttng_kconsumerd_fd_list { - struct cds_list_head head; -}; - -/* - * Internal representation of the FDs, sessiond_fd is used to identify uniquely - * a fd - */ -struct lttng_kconsumerd_fd { - struct cds_list_head list; - int sessiond_fd; /* used to identify uniquely a fd with sessiond */ - int consumerd_fd; /* fd to consume */ - int out_fd; /* output file to write the data */ - off_t out_fd_offset; /* write position in the output file descriptor */ - char path_name[PATH_MAX]; /* tracefile name */ - enum lttng_kconsumerd_fd_state state; - unsigned long max_sb_size; /* the subbuffer size for this channel */ - void *mmap_base; - size_t mmap_len; - enum lttng_event_output output; /* splice or mmap */ -}; - -/* - * Kernel consumer local data to the program. - */ -struct lttng_kconsumerd_local_data { - /* function to call when data is available on a buffer */ - int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd); - /* - * function to call when we receive a new fd, it receives a - * newly allocated kconsumerd_fd, depending on the return code - * of this function, the new FD will be handled by the - * application or the library. - * - * Returns: - * > 0 (success, FD is kept by application) - * == 0 (success, FD is left to library) - * < 0 (error) - */ - int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd); - /* - * function to call when a FD is getting updated by the session - * daemon, this function receives the FD as seen by the session - * daemon (sessiond_fd) and the new state, depending on the - * return code of this function the update of state for the FD - * is handled by the application or the library. - * - * Returns: - * > 0 (success, FD is kept by application) - * == 0 (success, FD is left to library) - * < 0 (error) - */ - int (*on_update_fd)(int sessiond_fd, uint32_t state); - /* socket to communicate errors with sessiond */ - int kconsumerd_error_socket; - /* socket to exchange commands with sessiond */ - char *kconsumerd_command_sock_path; - /* communication with splice */ - int kconsumerd_thread_pipe[2]; - /* pipe to wake the poll thread when necessary */ - int kconsumerd_poll_pipe[2]; - /* to let the signal handler wake up the fd receiver thread */ - int kconsumerd_should_quit[2]; -}; - -/* - * Initialise the necessary environnement: - * - create a new context - * - create the poll_pipe - * - create the should_quit pipe (for signal handler) - * - create the thread pipe (for splice) - * - * Takes the function pointers to the on_buffer_ready, on_recv_fd, and - * on_update_fd callbacks. - * - * Returns a pointer to the new context or NULL on error. - */ -extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), - int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), - int (*update_fd)(int sessiond_fd, uint32_t state)); - -/* - * Close all fds associated with the instance and free the context. - */ -extern void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx); - -/* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written. - */ -extern int lttng_kconsumerd_on_read_subbuffer_mmap( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len); - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. - */ -extern int lttng_kconsumerd_on_read_subbuffer_splice( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len); - -/* - * Take a snapshot for a specific fd - * - * Returns 0 on success, < 0 on error - */ -int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd); - -/* - * Get the produced position - * - * Returns 0 on success, < 0 on error - */ -int lttng_kconsumerd_get_produced_snapshot( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, - unsigned long *pos); - -/* - * Send return code to session daemon. - * - * Returns the return code of sendmsg : the number of bytes transmitted or -1 - * on error. - */ -extern int lttng_kconsumerd_send_error( - struct lttng_kconsumerd_local_data *ctx, int cmd); - -/* - * Poll on the should_quit pipe and the command socket return -1 on error and - * should exit, 0 if data is available on the command socket. - */ -extern int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); - -/* - * This thread polls the fds in the ltt_fd_list to consume the data and write - * it to tracefile if necessary. - */ -extern void *lttng_kconsumerd_thread_poll_fds(void *data); - -/* - * This thread listens on the consumerd socket and receives the file - * descriptors from ltt-sessiond. - */ -extern void *lttng_kconsumerd_thread_receive_fds(void *data); - -/* - * Called from signal handler to ensure a clean exit. - */ -extern void lttng_kconsumerd_should_exit( - struct lttng_kconsumerd_local_data *ctx); - -/* - * Cleanup the daemon's socket on exit. - */ -extern void lttng_kconsumerd_cleanup(void); - -/* - * Set the error socket for communication with a session daemon. - */ -extern void lttng_kconsumerd_set_error_sock( - struct lttng_kconsumerd_local_data *ctx, int sock); - -/* - * Set the command socket path for communication with a session daemon. - */ -extern void lttng_kconsumerd_set_command_sock_path( - struct lttng_kconsumerd_local_data *ctx, char *sock); - -#endif /* _LTTNG_KCONSUMERD_H */ diff --git a/include/lttng/lttng-ustconsumer.h b/include/lttng/lttng-ustconsumer.h new file mode 100644 index 000000000..0d77a8939 --- /dev/null +++ b/include/lttng/lttng-ustconsumer.h @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Copyright (C) 2011 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTTNG_USTCONSUMER_H +#define _LTTNG_USTCONSUMER_H + +#include +#include +#include + +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written. + */ +extern int lttng_ustconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); + +/* Not implemented */ +extern int lttng_ustconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_ustconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos); + +int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll); + +extern int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan); +extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan); +extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream); +extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream); + + +#else /* CONFIG_LTTNG_TOOLS_HAVE_UST */ + +static inline +int lttng_ustconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + return -ENOSYS; +} + +static inline +int lttng_ustconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *uststream, unsigned long len) +{ + return -ENOSYS; +} + +static inline +int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + return -ENOSYS; +} + +static inline +int lttng_ustconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + return -ENOSYS; +} + +static inline +int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll); + +static inline +int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) +{ + return -ENOSYS; +} + +static inline +void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) +{ +} + +static inline +int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) +{ + return -ENOSYS; +} + +static inline +void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) +{ +} + +#endif /* CONFIG_LTTNG_TOOLS_HAVE_UST */ + +#endif /* _LTTNG_USTCONSUMER_H */ diff --git a/include/lttng/lttng.h b/include/lttng/lttng.h index 84f814793..62b3ca7c8 100644 --- a/include/lttng/lttng.h +++ b/include/lttng/lttng.h @@ -300,7 +300,7 @@ extern const char *lttng_get_readable_code(int code); * domain. No consumer will be spawned and all fds/commands will go through the * socket path given (socket_path). * - * NOTE: At the moment, if you use the liblttngkconsumerd, you can only use the + * NOTE: At the moment, if you use the liblttng-kconsumer, you can only use the * command socket. The error socket is not supported yet for roaming consumers. */ extern int lttng_register_consumer(struct lttng_handle *handle, diff --git a/liblttng-consumer/Makefile.am b/liblttng-consumer/Makefile.am new file mode 100644 index 000000000..e3aa02a22 --- /dev/null +++ b/liblttng-consumer/Makefile.am @@ -0,0 +1,15 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include + +lib_LTLIBRARIES = liblttng-consumer.la + +liblttng_consumer_la_SOURCES = lttng-consumer.c + +liblttng_consumer_la_LIBADD = \ + $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ + $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la + + +if LTTNG_TOOLS_HAVE_UST +liblttng_consumer_la_LIBADD += \ + $(top_builddir)/liblttng-ustconsumer/liblttng-ustconsumer.la +endif diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c new file mode 100644 index 000000000..f031d5a67 --- /dev/null +++ b/liblttng-consumer/lttng-consumer.c @@ -0,0 +1,1002 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +struct lttng_consumer_global_data consumer_data = { + .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head), + .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head), + .stream_count = 0, + .need_update = 1, + .type = LTTNG_CONSUMER_UNKNOWN, +}; + +/* timeout parameter, to control the polling thread grace period. */ +int consumer_poll_timeout = -1; + +/* + * Flag to inform the polling thread to quit when all fd hung up. Updated by + * the consumer_thread_receive_fds when it notices that all fds has hung up. + * Also updated by the signal handler (consumer_should_exit()). Read by the + * polling threads. + */ +volatile int consumer_quit = 0; + +/* + * Find a stream. The consumer_data.lock must be locked during this + * call. + */ +static struct lttng_consumer_stream *consumer_find_stream(int key) +{ + struct lttng_consumer_stream *iter; + + cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { + if (iter->key == key) { + DBG("Found stream key %d", key); + return iter; + } + } + return NULL; +} + +static struct lttng_consumer_channel *consumer_find_channel(int key) +{ + struct lttng_consumer_channel *iter; + + cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) { + if (iter->key == key) { + DBG("Found channel key %d", key); + return iter; + } + } + return NULL; +} + +/* + * Remove a stream from the global list protected by a mutex. This + * function is also responsible for freeing its data structures. + */ +void consumer_del_stream(struct lttng_consumer_stream *stream) +{ + int ret; + struct lttng_consumer_channel *free_chan = NULL; + + pthread_mutex_lock(&consumer_data.lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + if (stream->mmap_base != NULL) { + ret = munmap(stream->mmap_base, stream->mmap_len); + if (ret != 0) { + perror("munmap"); + } + } + break; + case LTTNG_CONSUMER_UST: + lttng_ustconsumer_del_stream(stream); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + goto end; + } + + cds_list_del(&stream->list); + if (consumer_data.stream_count <= 0) { + goto end; + } + consumer_data.stream_count--; + if (!stream) { + goto end; + } + if (stream->out_fd >= 0) { + close(stream->out_fd); + } + if (stream->wait_fd >= 0) { + close(stream->wait_fd); + } + if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) { + close(stream->shm_fd); + } + if (!--stream->chan->refcount) + free_chan = stream->chan; + free(stream); +end: + consumer_data.need_update = 1; + pthread_mutex_unlock(&consumer_data.lock); + + if (free_chan) + consumer_del_channel(free_chan); +} + +struct lttng_consumer_stream *consumer_allocate_stream( + int channel_key, int stream_key, + int shm_fd, int wait_fd, + enum lttng_consumer_stream_state state, + uint64_t mmap_len, + enum lttng_event_output output, + const char *path_name) +{ + struct lttng_consumer_stream *stream; + int ret; + + stream = malloc(sizeof(*stream)); + if (stream == NULL) { + perror("malloc struct lttng_consumer_stream"); + goto end; + } + stream->chan = consumer_find_channel(channel_key); + if (!stream->chan) { + perror("Unable to find channel key"); + goto end; + } + stream->chan->refcount++; + stream->key = stream_key; + stream->shm_fd = shm_fd; + stream->wait_fd = wait_fd; + stream->out_fd = -1; + stream->out_fd_offset = 0; + stream->state = state; + stream->mmap_len = mmap_len; + stream->mmap_base = NULL; + stream->output = output; + strncpy(stream->path_name, path_name, PATH_MAX - 1); + stream->path_name[PATH_MAX - 1] = '\0'; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER_UST: + ret = lttng_ustconsumer_allocate_stream(stream); + if (ret) { + free(stream); + return NULL; + } + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + goto end; + } + DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)", + stream->path_name, stream->key, + stream->shm_fd, + stream->wait_fd, + (unsigned long long) stream->mmap_len, + stream->out_fd); +end: + return stream; +} + +/* + * Add a stream to the global list protected by a mutex. + */ +int consumer_add_stream(struct lttng_consumer_stream *stream) +{ + int ret = 0; + + pthread_mutex_lock(&consumer_data.lock); + /* Check if already exist */ + if (consumer_find_stream(stream->key)) { + ret = -1; + goto end; + } + cds_list_add(&stream->list, &consumer_data.stream_list.head); + consumer_data.stream_count++; + consumer_data.need_update = 1; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER_UST: + /* Streams are in CPU number order (we rely on this) */ + stream->cpu = stream->chan->nr_streams++; + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + goto end; + } + +end: + pthread_mutex_unlock(&consumer_data.lock); + return ret; +} + +/* + * Update a stream according to what we just received. + */ +void consumer_change_stream_state(int stream_key, + enum lttng_consumer_stream_state state) +{ + struct lttng_consumer_stream *stream; + + pthread_mutex_lock(&consumer_data.lock); + stream = consumer_find_stream(stream_key); + if (stream) { + stream->state = state; + } + consumer_data.need_update = 1; + pthread_mutex_unlock(&consumer_data.lock); +} + +/* + * Remove a channel from the global list protected by a mutex. This + * function is also responsible for freeing its data structures. + */ +void consumer_del_channel(struct lttng_consumer_channel *channel) +{ + int ret; + + pthread_mutex_lock(&consumer_data.lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER_UST: + lttng_ustconsumer_del_channel(channel); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + goto end; + } + + cds_list_del(&channel->list); + if (channel->mmap_base != NULL) { + ret = munmap(channel->mmap_base, channel->mmap_len); + if (ret != 0) { + perror("munmap"); + } + } + if (channel->wait_fd >= 0) { + close(channel->wait_fd); + } + if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) { + close(channel->shm_fd); + } + free(channel); +end: + pthread_mutex_unlock(&consumer_data.lock); +} + +struct lttng_consumer_channel *consumer_allocate_channel( + int channel_key, + int shm_fd, int wait_fd, + uint64_t mmap_len, + uint64_t max_sb_size) +{ + struct lttng_consumer_channel *channel; + int ret; + + channel = malloc(sizeof(*channel)); + if (channel == NULL) { + perror("malloc struct lttng_consumer_channel"); + goto end; + } + channel->key = channel_key; + channel->shm_fd = shm_fd; + channel->wait_fd = wait_fd; + channel->mmap_len = mmap_len; + channel->max_sb_size = max_sb_size; + channel->refcount = 0; + channel->nr_streams = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + channel->mmap_base = NULL; + channel->mmap_len = 0; + break; + case LTTNG_CONSUMER_UST: + ret = lttng_ustconsumer_allocate_channel(channel); + if (ret) { + free(channel); + return NULL; + } + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + goto end; + } + DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)", + channel->key, + channel->shm_fd, + channel->wait_fd, + (unsigned long long) channel->mmap_len, + (unsigned long long) channel->max_sb_size); +end: + return channel; +} + +/* + * Add a channel to the global list protected by a mutex. + */ +int consumer_add_channel(struct lttng_consumer_channel *channel) +{ + int ret = 0; + + pthread_mutex_lock(&consumer_data.lock); + /* Check if already exist */ + if (consumer_find_channel(channel->key)) { + ret = -1; + goto end; + } + cds_list_add(&channel->list, &consumer_data.channel_list.head); +end: + pthread_mutex_unlock(&consumer_data.lock); + return ret; +} + +/* + * Allocate the pollfd structure and the local view of the out fds to avoid + * doing a lookup in the linked list and concurrency issues when writing is + * needed. Called with consumer_data.lock held. + * + * Returns the number of fds in the structures. + */ +int consumer_update_poll_array( + struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, + struct lttng_consumer_stream **local_stream) +{ + struct lttng_consumer_stream *iter; + int i = 0; + + DBG("Updating poll fd array"); + cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { + if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) { + continue; + } + DBG("Active FD %d", iter->wait_fd); + (*pollfd)[i].fd = iter->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = iter; + i++; + } + + /* + * Insert the consumer_poll_pipe at the end of the array and don't + * increment i so nb_fd is the number of real FD. + */ + (*pollfd)[i].fd = ctx->consumer_poll_pipe[0]; + (*pollfd)[i].events = POLLIN; + return i; +} + +/* + * Poll on the should_quit pipe and the command socket return -1 on error and + * should exit, 0 if data is available on the command socket + */ +int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll) +{ + int num_rdy; + + num_rdy = poll(consumer_sockpoll, 2, -1); + if (num_rdy == -1) { + perror("Poll error"); + goto exit; + } + if (consumer_sockpoll[0].revents == POLLIN) { + DBG("consumer_should_quit wake up"); + goto exit; + } + return 0; + +exit: + return -1; +} + +/* + * Set the error socket. + */ +void lttng_consumer_set_error_sock( + struct lttng_consumer_local_data *ctx, int sock) +{ + ctx->consumer_error_socket = sock; +} + +/* + * Set the command socket path. + */ + +void lttng_consumer_set_command_sock_path( + struct lttng_consumer_local_data *ctx, char *sock) +{ + ctx->consumer_command_sock_path = sock; +} + +/* + * Send return code to the session daemon. + * If the socket is not defined, we return 0, it is not a fatal error + */ +int lttng_consumer_send_error( + struct lttng_consumer_local_data *ctx, int cmd) +{ + if (ctx->consumer_error_socket > 0) { + return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd, + sizeof(enum lttcomm_sessiond_command)); + } + + return 0; +} + +/* + * Close all the tracefiles and stream fds, should be called when all instances + * are destroyed. + */ +void lttng_consumer_cleanup(void) +{ + struct lttng_consumer_stream *iter, *tmp; + struct lttng_consumer_channel *citer, *ctmp; + + /* + * close all outfd. Called when there are no more threads + * running (after joining on the threads), no need to protect + * list iteration with mutex. + */ + cds_list_for_each_entry_safe(iter, tmp, + &consumer_data.stream_list.head, list) { + consumer_del_stream(iter); + } + cds_list_for_each_entry_safe(citer, ctmp, + &consumer_data.channel_list.head, list) { + consumer_del_channel(citer); + } +} + +/* + * Called from signal handler. + */ +void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) +{ + int ret; + consumer_quit = 1; + ret = write(ctx->consumer_should_quit[1], "4", 1); + if (ret < 0) { + perror("write consumer quit"); + } +} + +void lttng_consumer_sync_trace_file( + struct lttng_consumer_stream *stream, off_t orig_offset) +{ + int outfd = stream->out_fd; + + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset < stream->chan->max_sb_size) { + return; + } + sync_file_range(outfd, orig_offset - stream->chan->max_sb_size, + stream->chan->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size, + stream->chan->max_sb_size, POSIX_FADV_DONTNEED); +} + +/* + * Initialise the necessary environnement : + * - create a new context + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) + * - create the thread pipe (for splice) + * + * Takes a function pointer as argument, this function is called when data is + * available on a buffer. This function is responsible to do the + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the + * buffer configuration and then kernctl_put_next_subbuf at the end. + * + * Returns a pointer to the new context or NULL on error. + */ +struct lttng_consumer_local_data *lttng_consumer_create( + enum lttng_consumer_type type, + int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(int stream_key, uint32_t state)) +{ + int ret, i; + struct lttng_consumer_local_data *ctx; + + assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN || + consumer_data.type == type); + consumer_data.type = type; + + ctx = malloc(sizeof(struct lttng_consumer_local_data)); + if (ctx == NULL) { + perror("allocating context"); + goto error; + } + + ctx->consumer_error_socket = -1; + /* assign the callbacks */ + ctx->on_buffer_ready = buffer_ready; + ctx->on_recv_channel = recv_channel; + ctx->on_recv_stream = recv_stream; + ctx->on_update_stream = update_stream; + + ret = pipe(ctx->consumer_poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto error_poll_pipe; + } + + ret = pipe(ctx->consumer_should_quit); + if (ret < 0) { + perror("Error creating recv pipe"); + goto error_quit_pipe; + } + + ret = pipe(ctx->consumer_thread_pipe); + if (ret < 0) { + perror("Error creating thread pipe"); + goto error_thread_pipe; + } + + return ctx; + + +error_thread_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->consumer_should_quit[i]); + assert(!err); + } +error_quit_pipe: + for (i = 0; i < 2; i++) { + int err; + + err = close(ctx->consumer_poll_pipe[i]); + assert(!err); + } +error_poll_pipe: + free(ctx); +error: + return NULL; +} + +/* + * Close all fds associated with the instance and free the context. + */ +void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) +{ + close(ctx->consumer_error_socket); + close(ctx->consumer_thread_pipe[0]); + close(ctx->consumer_thread_pipe[1]); + close(ctx->consumer_poll_pipe[0]); + close(ctx->consumer_poll_pipe[1]); + close(ctx->consumer_should_quit[0]); + close(ctx->consumer_should_quit[1]); + unlink(ctx->consumer_command_sock_path); + free(ctx); +} + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written + */ +int lttng_consumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len); + default: + ERR("Unknown consumer_data type"); + assert(0); + } +} + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +int lttng_consumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len); + case LTTNG_CONSUMER_UST: + return -ENOSYS; + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } + +} + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_take_snapshot(ctx, stream); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_take_snapshot(ctx, stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } + +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + +int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + +/* + * This thread polls the fds in the ltt_fd_list to consume the data and write + * it to tracefile if necessary. + */ +void *lttng_consumer_thread_poll_fds(void *data) +{ + int num_rdy, num_hup, high_prio, ret, i; + struct pollfd *pollfd = NULL; + /* local view of the streams */ + struct lttng_consumer_stream **local_stream = NULL; + /* local view of consumer_data.fds_count */ + int nb_fd = 0; + char tmp; + int tmp2; + struct lttng_consumer_local_data *ctx = data; + + local_stream = malloc(sizeof(struct lttng_consumer_stream)); + + while (1) { + high_prio = 0; + num_hup = 0; + + /* + * the ltt_fd_list has been updated, we need to update our + * local array as well + */ + pthread_mutex_lock(&consumer_data.lock); + if (consumer_data.need_update) { + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_stream != NULL) { + free(local_stream); + local_stream = NULL; + } + + /* allocate for all fds + 1 for the consumer_poll_pipe */ + pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + if (pollfd == NULL) { + perror("pollfd malloc"); + pthread_mutex_unlock(&consumer_data.lock); + goto end; + } + + /* allocate for all fds + 1 for the consumer_poll_pipe */ + local_stream = malloc((consumer_data.stream_count + 1) * + sizeof(struct lttng_consumer_stream)); + if (local_stream == NULL) { + perror("local_stream malloc"); + pthread_mutex_unlock(&consumer_data.lock); + goto end; + } + ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + if (ret < 0) { + ERR("Error in allocating pollfd or local_outfds"); + lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); + pthread_mutex_unlock(&consumer_data.lock); + goto end; + } + nb_fd = ret; + consumer_data.need_update = 0; + } + pthread_mutex_unlock(&consumer_data.lock); + + /* poll on the array of fds */ + DBG("polling on %d fd", nb_fd + 1); + num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout); + DBG("poll num_rdy : %d", num_rdy); + if (num_rdy == -1) { + perror("Poll error"); + lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); + goto end; + } else if (num_rdy == 0) { + DBG("Polling thread timed out"); + goto end; + } + + /* No FDs and consumer_quit, kconsumer_cleanup the thread */ + if (nb_fd == 0 && consumer_quit == 1) { + goto end; + } + + /* + * If the consumer_poll_pipe triggered poll go + * directly to the beginning of the loop to update the + * array. We want to prioritize array update over + * low-priority reads. + */ + if (pollfd[nb_fd].revents == POLLIN) { + DBG("consumer_poll_pipe wake up"); + tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); + if (tmp2 < 0) { + perror("read kconsumer poll"); + } + continue; + } + + /* Take care of high priority channels first. */ + for (i = 0; i < nb_fd; i++) { + switch(pollfd[i].revents) { + case POLLERR: + ERR("Error returned in polling fd %d.", pollfd[i].fd); + consumer_del_stream(local_stream[i]); + num_hup++; + break; + case POLLHUP: + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); + consumer_del_stream(local_stream[i]); + num_hup++; + break; + case POLLNVAL: + ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); + consumer_del_stream(local_stream[i]); + num_hup++; + break; + case POLLPRI: + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = ctx->on_buffer_ready(local_stream[i]); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + break; + } + } + + /* If every buffer FD has hung up, we end the read loop here */ + if (nb_fd > 0 && num_hup == nb_fd) { + DBG("every buffer FD has hung up\n"); + if (consumer_quit == 1) { + goto end; + } + continue; + } + + /* Take care of low priority channels. */ + if (high_prio == 0) { + for (i = 0; i < nb_fd; i++) { + if (pollfd[i].revents == POLLIN) { + DBG("Normal read on fd %d", pollfd[i].fd); + ret = ctx->on_buffer_ready(local_stream[i]); + /* it's ok to have an unavailable subbuffer */ + if (ret == EAGAIN) { + ret = 0; + } + } + } + } + } +end: + DBG("polling thread exiting"); + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_stream != NULL) { + free(local_stream); + local_stream = NULL; + } + return NULL; +} + +/* + * This thread listens on the consumerd socket and receives the file + * descriptors from the session daemon. + */ +void *lttng_consumer_thread_receive_fds(void *data) +{ + int sock, client_socket, ret; + /* + * structure to poll for incoming data on communication socket avoids + * making blocking sockets. + */ + struct pollfd consumer_sockpoll[2]; + struct lttng_consumer_local_data *ctx = data; + + DBG("Creating command socket %s", ctx->consumer_command_sock_path); + unlink(ctx->consumer_command_sock_path); + client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); + if (client_socket < 0) { + ERR("Cannot create command socket"); + goto end; + } + + ret = lttcomm_listen_unix_sock(client_socket); + if (ret < 0) { + goto end; + } + + DBG("Sending ready command to ltt-sessiond"); + ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY); + /* return < 0 on error, but == 0 is not fatal */ + if (ret < 0) { + ERR("Error sending ready command to ltt-sessiond"); + goto end; + } + + ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* prepare the FDs to poll : to client socket and the should_quit pipe */ + consumer_sockpoll[0].fd = ctx->consumer_should_quit[0]; + consumer_sockpoll[0].events = POLLIN | POLLPRI; + consumer_sockpoll[1].fd = client_socket; + consumer_sockpoll[1].events = POLLIN | POLLPRI; + + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + goto end; + } + DBG("Connection on client_socket"); + + /* Blocking call, waiting for transmission */ + sock = lttcomm_accept_unix_sock(client_socket); + if (sock <= 0) { + WARN("On accept"); + goto end; + } + ret = fcntl(sock, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* update the polling structure to poll on the established socket */ + consumer_sockpoll[1].fd = sock; + consumer_sockpoll[1].events = POLLIN | POLLPRI; + + while (1) { + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + goto end; + } + DBG("Incoming command on sock"); + ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll); + if (ret == -ENOENT) { + DBG("Received STOP command"); + goto end; + } + if (ret < 0) { + ERR("Communication interrupted on command socket"); + goto end; + } + if (consumer_quit) { + DBG("consumer_thread_receive_fds received quit from signal"); + goto end; + } + DBG("received fds on sock"); + } +end: + DBG("consumer_thread_receive_fds exiting"); + + /* + * when all fds have hung up, the polling thread + * can exit cleanly + */ + consumer_quit = 1; + + /* + * 2s of grace period, if no polling events occur during + * this period, the polling thread will exit even if there + * are still open FDs (should not happen, but safety mechanism). + */ + consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; + + /* wake up the polling thread */ + ret = write(ctx->consumer_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("poll pipe write"); + } + return NULL; +} diff --git a/liblttng-kconsumer/Makefile.am b/liblttng-kconsumer/Makefile.am new file mode 100644 index 000000000..15021cd05 --- /dev/null +++ b/liblttng-kconsumer/Makefile.am @@ -0,0 +1,8 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include + +noinst_LTLIBRARIES = liblttng-kconsumer.la + +liblttng_kconsumer_la_SOURCES = lttng-kconsumer.c + +liblttng_kconsumer_la_LIBADD = \ + $(top_builddir)/libkernelctl/libkernelctl.la diff --git a/liblttng-kconsumer/lttng-kconsumer.c b/liblttng-kconsumer/lttng-kconsumer.c new file mode 100644 index 000000000..c4ebb013e --- /dev/null +++ b/liblttng-kconsumer/lttng-kconsumer.c @@ -0,0 +1,305 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +extern struct lttng_consumer_global_data consumer_data; +extern int consumer_poll_timeout; +extern volatile int consumer_quit; + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written + */ +int lttng_kconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + unsigned long mmap_offset; + long ret = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + int outfd = stream->out_fd; + + /* get the offset inside the fd to mmap */ + ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); + if (ret != 0) { + ret = -errno; + perror("kernctl_get_mmap_read_offset"); + goto end; + } + + while (len > 0) { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + if (ret >= len) { + len = 0; + } else if (ret < 0) { + ret = -errno; + perror("Error in file write"); + goto end; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + + lttng_consumer_sync_trace_file(stream, orig_offset); + + goto end; + +end: + return ret; +} + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +int lttng_kconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + long ret = 0; + loff_t offset = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + int outfd = stream->out_fd; + + while (len > 0) { + DBG("splice chan to pipe offset %lu (fd : %d)", + (unsigned long)offset, fd); + ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe ret %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in relay splice"); + goto splice_error; + } + + ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in file splice"); + goto splice_error; + } + len -= ret; + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch(ret) { + case EBADF: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + return ret; +} + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret = 0; + int infd = stream->wait_fd; + + ret = kernctl_snapshot(infd); + if (ret != 0) { + ret = errno; + perror("Getting sub-buffer snapshot."); + } + + return ret; +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_produced(infd, pos); + if (ret != 0) { + ret = errno; + perror("kernctl_snapshot_get_produced"); + } + + return ret; +} + +int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll) +{ + ssize_t ret; + struct lttcomm_consumer_msg msg; + + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); + if (ret != sizeof(msg)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + return -ENOENT; + } + + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_CHANNEL: + { + struct lttng_consumer_channel *new_channel; + + DBG("consumer_add_channel %d", msg.u.channel.channel_key); + new_channel = consumer_allocate_channel(msg.u.channel.channel_key, + -1, -1, + msg.u.channel.mmap_len, + msg.u.channel.max_sb_size); + if (new_channel == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + if (ctx->on_recv_channel != NULL) { + ret = ctx->on_recv_channel(new_channel); + if (ret == 0) { + consumer_add_channel(new_channel); + } else if (ret < 0) { + goto end_nosignal; + } + } else { + consumer_add_channel(new_channel); + } + goto end_nosignal; + } + case LTTNG_CONSUMER_ADD_STREAM: + { + struct lttng_consumer_stream *new_stream; + int fds[1]; + size_t nb_fd = 1; + + /* block */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); + if (ret != sizeof(fds)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + if (nb_fd < 2) + fds[1] = fds[0]; /* duplicate same fd if recv only one */ + + DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, + fds[0], fds[1]); + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + msg.u.stream.stream_key, + fds[0], fds[1], + msg.u.stream.state, + msg.u.stream.mmap_len, + msg.u.stream.output, + msg.u.stream.path_name); + if (new_stream == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end; + } + if (ctx->on_recv_stream != NULL) { + ret = ctx->on_recv_stream(new_stream); + if (ret == 0) { + consumer_add_stream(new_stream); + } else if (ret < 0) { + goto end; + } + } else { + consumer_add_stream(new_stream); + } + break; + } + case LTTNG_CONSUMER_UPDATE_STREAM: + { + if (ctx->on_update_stream != NULL) { + ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); + if (ret == 0) { + consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); + } else if (ret < 0) { + goto end; + } + } else { + consumer_change_stream_state(msg.u.stream.stream_key, + msg.u.stream.state); + } + break; + } + default: + break; + } +end: + /* signal the poll thread */ + ret = write(ctx->consumer_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("write consumer poll"); + } +end_nosignal: + return 0; +} diff --git a/liblttng-sessiond-comm/lttng-sessiond-comm.c b/liblttng-sessiond-comm/lttng-sessiond-comm.c index be82cc1ec..4bb6ed578 100644 --- a/liblttng-sessiond-comm/lttng-sessiond-comm.c +++ b/liblttng-sessiond-comm/lttng-sessiond-comm.c @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -75,19 +76,19 @@ static const char *lttcomm_readable_code[] = { [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_SESS_FAIL) ] = "UST create session failed", [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_CHAN_NOT_FOUND) ] = "UST channel not found", [ LTTCOMM_ERR_INDEX(LTTCOMM_UST_CHAN_FAIL) ] = "UST create channel failed", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_COMMAND_SOCK_READY) ] = "Kconsumerd command socket ready", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_SUCCESS_RECV_FD) ] = "Kconsumerd success on receiving fds", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_ERROR_RECV_FD) ] = "Kconsumerd error on receiving fds", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_ERROR) ] = "Kconsumerd error in polling thread", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_NVAL) ] = "Kconsumerd polling on closed fd", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_POLL_HUP) ] = "Kconsumerd all fd hung up", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_SUCCESS) ] = "Kconsumerd exiting normally", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_EXIT_FAILURE) ] = "Kconsumerd exiting on error", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_OUTFD_ERROR) ] = "Kconsumerd error opening the tracefile", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_EBADF) ] = "Kconsumerd splice EBADF", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_EINVAL) ] = "Kconsumerd splice EINVAL", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_ENOMEM) ] = "Kconsumerd splice ENOMEM", - [ LTTCOMM_ERR_INDEX(KCONSUMERD_SPLICE_ESPIPE) ] = "Kconsumerd splice ESPIPE", + [ LTTCOMM_ERR_INDEX(CONSUMERD_COMMAND_SOCK_READY) ] = "consumerd command socket ready", + [ LTTCOMM_ERR_INDEX(CONSUMERD_SUCCESS_RECV_FD) ] = "consumerd success on receiving fds", + [ LTTCOMM_ERR_INDEX(CONSUMERD_ERROR_RECV_FD) ] = "consumerd error on receiving fds", + [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_ERROR) ] = "consumerd error in polling thread", + [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_NVAL) ] = "consumerd polling on closed fd", + [ LTTCOMM_ERR_INDEX(CONSUMERD_POLL_HUP) ] = "consumerd all fd hung up", + [ LTTCOMM_ERR_INDEX(CONSUMERD_EXIT_SUCCESS) ] = "consumerd exiting normally", + [ LTTCOMM_ERR_INDEX(CONSUMERD_EXIT_FAILURE) ] = "consumerd exiting on error", + [ LTTCOMM_ERR_INDEX(CONSUMERD_OUTFD_ERROR) ] = "consumerd error opening the tracefile", + [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_EBADF) ] = "consumerd splice EBADF", + [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_EINVAL) ] = "consumerd splice EINVAL", + [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_ENOMEM) ] = "consumerd splice ENOMEM", + [ LTTCOMM_ERR_INDEX(CONSUMERD_SPLICE_ESPIPE) ] = "consumerd splice ESPIPE", [ LTTCOMM_ERR_INDEX(LTTCOMM_NO_EVENT) ] = "Event not found", }; @@ -285,10 +286,11 @@ int lttcomm_close_unix_sock(int sock) } /* - * Send multiple fds on a unix socket. + * Send a message accompanied by fd(s) over a unix socket. + * + * Returns the size of data sent, or negative error value. */ -ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds, - size_t nb_fd, size_t len) +ssize_t lttcomm_send_fds_unix_sock(int sock, int *fds, size_t nb_fd) { struct msghdr msg = { 0 }; struct cmsghdr *cmptr; @@ -296,11 +298,10 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds, ssize_t ret = -1; unsigned int sizeof_fds = nb_fd * sizeof(int); char tmp[CMSG_SPACE(sizeof_fds)]; + char dummy = 0; - /* - * Note: the consumerd receiver only supports receiving one FD per message. - */ - assert(nb_fd == 1); + if (nb_fd > LTTCOMM_MAX_SEND_FDS) + return -EINVAL; msg.msg_control = (caddr_t)tmp; msg.msg_controllen = CMSG_LEN(sizeof_fds); @@ -313,8 +314,8 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds, /* Sum of the length of all control messages in the buffer: */ msg.msg_controllen = cmptr->cmsg_len; - iov[0].iov_base = buf; - iov[0].iov_len = len; + iov[0].iov_base = &dummy; + iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; @@ -322,31 +323,30 @@ ssize_t lttcomm_send_fds_unix_sock(int sock, void *buf, int *fds, if (ret < 0) { perror("sendmsg"); } - return ret; } /* - * Receives a single fd from socket. + * Recv a message accompanied by fd(s) from a unix socket. * - * Returns the size of received data + * Returns the size of received data, or negative error value. + * + * Expect at most "nb_fd" file descriptors. Returns the number of fd + * actually received in nb_fd. */ -ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds, - size_t nb_fd, size_t len) +ssize_t lttcomm_recv_fds_unix_sock(int sock, int *fds, size_t nb_fd) { struct iovec iov[1]; - int data_fd, i, ret = 0; + ssize_t ret = 0; struct cmsghdr *cmsg; - char recv_fd[CMSG_SPACE(sizeof(int))]; + size_t sizeof_fds = nb_fd * sizeof(int); + char recv_fd[CMSG_SPACE(sizeof_fds)]; struct msghdr msg = { 0 }; - union { - unsigned char vc[4]; - int vi; - } tmp; + char dummy; /* Prepare to receive the structures */ - iov[0].iov_base = &data_fd; - iov[0].iov_len = sizeof(data_fd); + iov[0].iov_base = &dummy; + iov[0].iov_len = 1; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = recv_fd; @@ -357,33 +357,35 @@ ssize_t lttcomm_recv_fds_unix_sock(int sock, void *buf, int *fds, perror("recvmsg fds"); goto end; } - - if (ret != sizeof(data_fd)) { - fprintf(stderr, "Error: Received %d bytes, expected %ld", - ret, sizeof(data_fd)); + if (ret != 1) { + fprintf(stderr, "Error: Received %zd bytes, expected %d\n", + ret, 1); + goto end; + } + if (msg.msg_flags & MSG_CTRUNC) { + fprintf(stderr, "Error: Control message truncated.\n"); + ret = -1; goto end; } - cmsg = CMSG_FIRSTHDR(&msg); if (!cmsg) { - fprintf(stderr, "Error: Invalid control message header"); + fprintf(stderr, "Error: Invalid control message header\n"); ret = -1; goto end; } - if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) { - fprintf(stderr, "Didn't received any fd"); + fprintf(stderr, "Didn't received any fd\n"); ret = -1; goto end; } - - /* this is our fd */ - for (i = 0; i < sizeof(int); i++) { - tmp.vc[i] = CMSG_DATA(cmsg)[i]; + if (cmsg->cmsg_len != CMSG_LEN(sizeof_fds)) { + fprintf(stderr, "Error: Received %zu bytes of ancillary data, expected %zu\n", + cmsg->cmsg_len, CMSG_LEN(sizeof_fds)); + ret = -1; + goto end; } - - ret = tmp.vi; - + memcpy(fds, CMSG_DATA(cmsg), sizeof_fds); + ret = sizeof_fds; end: return ret; } diff --git a/liblttng-ustconsumer/Makefile.am b/liblttng-ustconsumer/Makefile.am new file mode 100644 index 000000000..c181a47cf --- /dev/null +++ b/liblttng-ustconsumer/Makefile.am @@ -0,0 +1,10 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include + +if LTTNG_TOOLS_HAVE_UST +noinst_LTLIBRARIES = liblttng-ustconsumer.la + +liblttng_ustconsumer_la_SOURCES = lttng-ustconsumer.c + +liblttng_ustconsumer_la_LIBADD = \ + -lustctl +endif diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c new file mode 100644 index 000000000..1e0bf55ec --- /dev/null +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +extern struct lttng_consumer_global_data consumer_data; +extern int consumer_poll_timeout; +extern volatile int consumer_quit; + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written + */ +int lttng_ustconsumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + unsigned long mmap_offset; + long ret = 0; + off_t orig_offset = stream->out_fd_offset; + int outfd = stream->out_fd; + + /* get the offset inside the fd to mmap */ + ret = ustctl_get_mmap_read_offset(stream->chan->handle, + stream->buf, &mmap_offset); + if (ret != 0) { + ret = -errno; + perror("ustctl_get_mmap_read_offset"); + goto end; + } + while (len > 0) { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + if (ret >= len) { + len = 0; + } else if (ret < 0) { + ret = -errno; + perror("Error in file write"); + goto end; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + + lttng_consumer_sync_trace_file(stream, orig_offset); + + goto end; + +end: + return ret; +} + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +int lttng_ustconsumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len) +{ + return -ENOSYS; +} + +/* + * Take a snapshot for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + ret = ustctl_snapshot(stream->chan->handle, stream->buf); + if (ret != 0) { + ret = errno; + perror("Getting sub-buffer snapshot."); + } + + return ret; +} + +/* + * Get the produced position + * + * Returns 0 on success, < 0 on error + */ +int lttng_ustconsumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + + ret = ustctl_snapshot_get_produced(stream->chan->handle, + stream->buf, pos); + if (ret != 0) { + ret = errno; + perror("kernctl_snapshot_get_produced"); + } + + return ret; +} + +int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll) +{ + ssize_t ret; + struct lttcomm_consumer_msg msg; + + ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); + if (ret != sizeof(msg)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + return -ENOENT; + } + + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_CHANNEL: + { + struct lttng_consumer_channel *new_channel; + int fds[1]; + size_t nb_fd = 1; + + /* block */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); + if (ret != sizeof(fds)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + + DBG("consumer_add_channel %d", msg.u.channel.channel_key); + + new_channel = consumer_allocate_channel(msg.u.channel.channel_key, + fds[0], -1, + msg.u.channel.mmap_len, + msg.u.channel.max_sb_size); + if (new_channel == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + if (ctx->on_recv_channel != NULL) { + ret = ctx->on_recv_channel(new_channel); + if (ret == 0) { + consumer_add_channel(new_channel); + } else if (ret < 0) { + goto end_nosignal; + } + } else { + consumer_add_channel(new_channel); + } + goto end_nosignal; + } + case LTTNG_CONSUMER_ADD_STREAM: + { + struct lttng_consumer_stream *new_stream; + int fds[2]; + size_t nb_fd = 2; + + /* block */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); + if (ret != sizeof(fds)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + return ret; + } + + DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, + fds[0], fds[1]); + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, + msg.u.stream.stream_key, + fds[0], fds[1], + msg.u.stream.state, + msg.u.stream.mmap_len, + msg.u.stream.output, + msg.u.stream.path_name); + if (new_stream == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end; + } + if (ctx->on_recv_stream != NULL) { + ret = ctx->on_recv_stream(new_stream); + if (ret == 0) { + consumer_add_stream(new_stream); + } else if (ret < 0) { + goto end; + } + } else { + consumer_add_stream(new_stream); + } + break; + } + case LTTNG_CONSUMER_UPDATE_STREAM: + { + if (ctx->on_update_stream != NULL) { + ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); + if (ret == 0) { + consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); + } else if (ret < 0) { + goto end; + } + } else { + consumer_change_stream_state(msg.u.stream.stream_key, + msg.u.stream.state); + } + break; + } + default: + break; + } +end: + /* signal the poll thread */ + ret = write(ctx->consumer_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("write consumer poll"); + } +end_nosignal: + return 0; +} + +int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) +{ + struct object_data obj; + + obj.handle = -1; + obj.shm_fd = chan->shm_fd; + obj.wait_fd = chan->wait_fd; + obj.memory_map_size = chan->mmap_len; + chan->handle = ustctl_map_channel(&obj); + if (!chan->handle) { + return -ENOMEM; + } + return 0; +} + +void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) +{ + ustctl_unmap_channel(chan->handle); +} + +int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) +{ + struct object_data obj; + int ret; + + obj.handle = -1; + obj.shm_fd = stream->shm_fd; + obj.wait_fd = stream->wait_fd; + obj.memory_map_size = stream->mmap_len; + ret = ustctl_add_stream(stream->chan->handle, &obj); + if (ret) + return ret; + stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); + if (!stream->buf) + return -EBUSY; + stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); + if (!stream->mmap_base) { + return -EINVAL; + } + return 0; +} + +void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) +{ + ustctl_close_stream_read(stream->chan->handle, stream->buf); +} diff --git a/liblttngkconsumerd/Makefile.am b/liblttngkconsumerd/Makefile.am deleted file mode 100644 index e8c5741b3..000000000 --- a/liblttngkconsumerd/Makefile.am +++ /dev/null @@ -1,9 +0,0 @@ -AM_CPPFLAGS = -I$(top_srcdir)/include - -lib_LTLIBRARIES = liblttngkconsumerd.la - -liblttngkconsumerd_la_SOURCES = lttngkconsumerd.c - -liblttngkconsumerd_la_LIBADD = \ - $(top_builddir)/libkernelctl/libkernelctl.la \ - $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c deleted file mode 100644 index 1893e0ae6..000000000 --- a/liblttngkconsumerd/lttngkconsumerd.c +++ /dev/null @@ -1,1017 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -static struct lttng_kconsumerd_global_data { - /* - * kconsumerd_data.lock protects kconsumerd_data.fd_list, - * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It ensures - * the count matches the number of items in the fd_list. It ensures the - * list updates *always* trigger an fd_array update (therefore need to make - * list update vs kconsumerd_data.need_update flag update atomic, and also - * flag read, fd array and flag clear atomic). - */ - pthread_mutex_t lock; - /* - * Number of element for the list below. Protected by kconsumerd_data.lock. - */ - unsigned int fds_count; - /* - * List of FDs. Protected by kconsumerd_data.lock. - */ - struct lttng_kconsumerd_fd_list fd_list; - /* - * Flag specifying if the local array of FDs needs update in the poll - * function. Protected by kconsumerd_data.lock. - */ - unsigned int need_update; -} kconsumerd_data = { - .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), - .fds_count = 0, - .need_update = 1, -}; - -/* timeout parameter, to control the polling thread grace period. */ -static int kconsumerd_poll_timeout = -1; - -/* - * Flag to inform the polling thread to quit when all fd hung up. Updated by - * the kconsumerd_thread_receive_fds when it notices that all fds has hung up. - * Also updated by the signal handler (kconsumerd_should_exit()). Read by the - * polling threads. - */ -static volatile int kconsumerd_quit = 0; - -/* - * Find a session fd in the global list. The kconsumerd_data.lock must be - * locked during this call. - * - * Return 1 if found else 0. - */ -static int kconsumerd_find_session_fd(int fd) -{ - struct lttng_kconsumerd_fd *iter; - - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == fd) { - DBG("Duplicate session fd %d", fd); - return 1; - } - } - - return 0; -} - -/* - * Remove a fd from the global list protected by a mutex. - */ -static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) -{ - int ret; - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_del(&lcf->list); - if (kconsumerd_data.fds_count > 0) { - kconsumerd_data.fds_count--; - if (lcf != NULL) { - if (lcf->mmap_base != NULL) { - ret = munmap(lcf->mmap_base, lcf->mmap_len); - if (ret != 0) { - perror("munmap"); - } - } - if (lcf->out_fd != 0) { - close(lcf->out_fd); - } - close(lcf->consumerd_fd); - free(lcf); - lcf = NULL; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * Create a struct lttcomm_kconsumerd_msg from the - * information received on the receiving socket - */ -struct lttng_kconsumerd_fd *kconsumerd_allocate_fd( - struct lttcomm_kconsumerd_msg *buf, - int consumerd_fd) -{ - struct lttng_kconsumerd_fd *tmp_fd; - - tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); - if (tmp_fd == NULL) { - perror("malloc struct lttng_kconsumerd_fd"); - goto end; - } - - tmp_fd->sessiond_fd = buf->fd; - tmp_fd->consumerd_fd = consumerd_fd; - tmp_fd->state = buf->state; - tmp_fd->max_sb_size = buf->max_sb_size; - tmp_fd->out_fd = 0; - tmp_fd->out_fd_offset = 0; - tmp_fd->mmap_len = 0; - tmp_fd->mmap_base = NULL; - tmp_fd->output = buf->output; - strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); - tmp_fd->path_name[PATH_MAX - 1] = '\0'; - DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)", - tmp_fd->path_name, tmp_fd->sessiond_fd, - tmp_fd->consumerd_fd, tmp_fd->out_fd); - -end: - return tmp_fd; -} - -/* - * Add a fd to the global list protected by a mutex. - */ -static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd) -{ - int ret; - - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd); - if (ret == 1) { - goto end; - } - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); - kconsumerd_data.fds_count++; - kconsumerd_data.need_update = 1; - -end: - pthread_mutex_unlock(&kconsumerd_data.lock); - return ret; -} - -/* - * Update a fd according to what we just received. - */ -static void kconsumerd_change_fd_state(int sessiond_fd, - enum lttng_kconsumerd_fd_state state) -{ - struct lttng_kconsumerd_fd *iter; - - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == sessiond_fd) { - iter->state = state; - break; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * Allocate the pollfd structure and the local view of the out fds to avoid - * doing a lookup in the linked list and concurrency issues when writing is - * needed. Called with kconsumerd_data.lock held. - * - * Returns the number of fds in the structures. - */ -static int kconsumerd_update_poll_array( - struct lttng_kconsumerd_local_data *ctx, struct pollfd **pollfd, - struct lttng_kconsumerd_fd **local_kconsumerd_fd) -{ - struct lttng_kconsumerd_fd *iter; - int i = 0; - - DBG("Updating poll fd array"); - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->state == ACTIVE_FD) { - DBG("Active FD %d", iter->consumerd_fd); - (*pollfd)[i].fd = iter->consumerd_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_kconsumerd_fd[i] = iter; - i++; - } - } - - /* - * Insert the kconsumerd_poll_pipe at the end of the array and don't - * increment i so nb_fd is the number of real FD. - */ - (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0]; - (*pollfd)[i].events = POLLIN; - return i; -} - -/* - * Receives an array of file descriptors and the associated structures - * describing each fd (path name). - * - * Returns the size of received data - */ -static int kconsumerd_consumerd_recv_fd( - struct lttng_kconsumerd_local_data *ctx, int sfd, - struct pollfd *kconsumerd_sockpoll, int size, - enum lttng_kconsumerd_command cmd_type) -{ - struct iovec iov[1]; - int ret = 0, i, j, tmp2; - struct cmsghdr *cmsg; - int nb_fd; - char recv_fd[CMSG_SPACE(sizeof(int))]; - struct lttcomm_kconsumerd_msg lkm; - struct lttng_kconsumerd_fd *new_fd; - union { - unsigned char vc[4]; - int vi; - } tmp; - - /* the number of fds we are about to receive */ - nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); - - /* - * nb_fd is the number of fds we receive. One fd per recvmsg. - */ - for (i = 0; i < nb_fd; i++) { - struct msghdr msg = { 0 }; - - /* Prepare to receive the structures */ - iov[0].iov_base = &lkm; - iov[0].iov_len = sizeof(lkm); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - msg.msg_control = recv_fd; - msg.msg_controllen = sizeof(recv_fd); - - DBG("Waiting to receive fd"); - if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - - if ((ret = recvmsg(sfd, &msg, 0)) < 0) { - perror("recvmsg"); - continue; - } - - if (ret != (size / nb_fd)) { - ERR("Received only %d, expected %d", ret, size); - lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - cmsg = CMSG_FIRSTHDR(&msg); - if (!cmsg) { - ERR("Invalid control message header"); - ret = -1; - lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - /* if we received fds */ - if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { - switch (cmd_type) { - case ADD_STREAM: - for (j = 0; j < sizeof(int); j++) - tmp.vc[j] = CMSG_DATA(cmsg)[j]; - DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, tmp.vi); - new_fd = kconsumerd_allocate_fd(&lkm, tmp.vi); - if (new_fd == NULL) { - lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); - goto end; - } - - if (ctx->on_recv_fd != NULL) { - ret = ctx->on_recv_fd(new_fd); - if (ret == 0) { - kconsumerd_add_fd(new_fd); - } else if (ret < 0) { - goto end; - } - } else { - kconsumerd_add_fd(new_fd); - } - break; - case UPDATE_STREAM: - if (ctx->on_update_fd != NULL) { - ret = ctx->on_update_fd(lkm.fd, lkm.state); - if (ret == 0) { - kconsumerd_change_fd_state(lkm.fd, lkm.state); - } else if (ret < 0) { - goto end; - } - } else { - kconsumerd_change_fd_state(lkm.fd, lkm.state); - } - break; - default: - break; - } - /* signal the poll thread */ - tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1); - if (tmp2 < 0) { - perror("write kconsumerd poll"); - } - } else { - ERR("Didn't received any fd"); - lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - ret = -1; - goto end; - } - } - -end: - return ret; -} - -/* - * Set the error socket. - */ -void lttng_kconsumerd_set_error_sock( - struct lttng_kconsumerd_local_data *ctx, int sock) -{ - ctx->kconsumerd_error_socket = sock; -} - -/* - * Set the command socket path. - */ - -void lttng_kconsumerd_set_command_sock_path( - struct lttng_kconsumerd_local_data *ctx, char *sock) -{ - ctx->kconsumerd_command_sock_path = sock; -} - -static void lttng_kconsumerd_sync_trace_file( - struct lttng_kconsumerd_fd *kconsumerd_fd, off_t orig_offset) -{ - int outfd = kconsumerd_fd->out_fd; - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } -} - - -/* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written - */ -int lttng_kconsumerd_on_read_subbuffer_mmap( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - unsigned long mmap_offset; - long ret = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - /* get the offset inside the fd to mmap */ - ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_read_offset"); - goto end; - } - - while (len > 0) { - ret = write(outfd, kconsumerd_fd->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - ret = errno; - perror("Error in file write"); - goto end; - } - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - - lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset); - - goto end; - -end: - return ret; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. - */ -int lttng_kconsumerd_on_read_subbuffer_splice( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - long ret = 0; - loff_t offset = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in relay splice"); - goto splice_error; - } - - ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in file splice"); - goto splice_error; - } - len -= ret; - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset); - - goto end; - -splice_error: - /* send the appropriate error description to sessiond */ - switch(ret) { - case EBADF: - lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF); - break; - case EINVAL: - lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL); - break; - case ENOMEM: - lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM); - break; - case ESPIPE: - lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE); - break; - } - -end: - return ret; -} - -/* - * Take a snapshot for a specific fd - * - * Returns 0 on success, < 0 on error - */ -int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd) -{ - int ret = 0; - int infd = kconsumerd_fd->consumerd_fd; - - ret = kernctl_snapshot(infd); - if (ret != 0) { - ret = errno; - perror("Getting sub-buffer snapshot."); - } - - return ret; -} - -/* - * Get the produced position - * - * Returns 0 on success, < 0 on error - */ -int lttng_kconsumerd_get_produced_snapshot( - struct lttng_kconsumerd_local_data *ctx, - struct lttng_kconsumerd_fd *kconsumerd_fd, - unsigned long *pos) -{ - int ret; - int infd = kconsumerd_fd->consumerd_fd; - - ret = kernctl_snapshot_get_produced(infd, pos); - if (ret != 0) { - ret = errno; - perror("kernctl_snapshot_get_produced"); - } - - return ret; -} - -/* - * Poll on the should_quit pipe and the command socket return -1 on error and - * should exit, 0 if data is available on the command socket - */ -int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) -{ - int num_rdy; - - num_rdy = poll(kconsumerd_sockpoll, 2, -1); - if (num_rdy == -1) { - perror("Poll error"); - goto exit; - } - if (kconsumerd_sockpoll[0].revents == POLLIN) { - DBG("kconsumerd_should_quit wake up"); - goto exit; - } - return 0; - -exit: - return -1; -} - -/* - * This thread polls the fds in the ltt_fd_list to consume the data and write - * it to tracefile if necessary. - */ -void *lttng_kconsumerd_thread_poll_fds(void *data) -{ - int num_rdy, num_hup, high_prio, ret, i; - struct pollfd *pollfd = NULL; - /* local view of the fds */ - struct lttng_kconsumerd_fd **local_kconsumerd_fd = NULL; - /* local view of kconsumerd_data.fds_count */ - int nb_fd = 0; - char tmp; - int tmp2; - struct lttng_kconsumerd_local_data *ctx = data; - - - local_kconsumerd_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); - - while (1) { - high_prio = 0; - num_hup = 0; - - /* - * the ltt_fd_list has been updated, we need to update our - * local array as well - */ - pthread_mutex_lock(&kconsumerd_data.lock); - if (kconsumerd_data.need_update) { - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); - if (pollfd == NULL) { - perror("pollfd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * - sizeof(struct lttng_kconsumerd_fd)); - if (local_kconsumerd_fd == NULL) { - perror("local_kconsumerd_fd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd); - if (ret < 0) { - ERR("Error in allocating pollfd or local_outfds"); - lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - nb_fd = ret; - kconsumerd_data.need_update = 0; - } - pthread_mutex_unlock(&kconsumerd_data.lock); - - /* poll on the array of fds */ - DBG("polling on %d fd", nb_fd + 1); - num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout); - DBG("poll num_rdy : %d", num_rdy); - if (num_rdy == -1) { - perror("Poll error"); - lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); - goto end; - } else if (num_rdy == 0) { - DBG("Polling thread timed out"); - goto end; - } - - /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */ - if (nb_fd == 0 && kconsumerd_quit == 1) { - goto end; - } - - /* - * If the kconsumerd_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. - */ - if (pollfd[nb_fd].revents == POLLIN) { - DBG("kconsumerd_poll_pipe wake up"); - tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read kconsumerd poll"); - } - continue; - } - - /* Take care of high priority channels first. */ - for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: - ERR("Error returned in polling fd %d.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLNVAL: - ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } - break; - } - } - - /* If every buffer FD has hung up, we end the read loop here */ - if (nb_fd > 0 && num_hup == nb_fd) { - DBG("every buffer FD has hung up\n"); - if (kconsumerd_quit == 1) { - goto end; - } - continue; - } - - /* Take care of low priority channels. */ - if (high_prio == 0) { - for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { - DBG("Normal read on fd %d", pollfd[i].fd); - ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable subbuffer */ - if (ret == EAGAIN) { - ret = 0; - } - } - } - } - } -end: - DBG("polling thread exiting"); - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - return NULL; -} - -/* - * Initialise the necessary environnement : - * - create a new context - * - create the poll_pipe - * - create the should_quit pipe (for signal handler) - * - create the thread pipe (for splice) - * - * Takes a function pointer as argument, this function is called when data is - * available on a buffer. This function is responsible to do the - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the - * buffer configuration and then kernctl_put_next_subbuf at the end. - * - * Returns a pointer to the new context or NULL on error. - */ -struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( - int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd), - int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd), - int (*update_fd)(int sessiond_fd, uint32_t state)) -{ - int ret, i; - struct lttng_kconsumerd_local_data *ctx; - - ctx = malloc(sizeof(struct lttng_kconsumerd_local_data)); - if (ctx == NULL) { - perror("allocating context"); - goto error; - } - - ctx->kconsumerd_error_socket = -1; - /* assign the callbacks */ - ctx->on_buffer_ready = buffer_ready; - ctx->on_recv_fd = recv_fd; - ctx->on_update_fd = update_fd; - - ret = pipe(ctx->kconsumerd_poll_pipe); - if (ret < 0) { - perror("Error creating poll pipe"); - goto error_poll_pipe; - } - - ret = pipe(ctx->kconsumerd_should_quit); - if (ret < 0) { - perror("Error creating recv pipe"); - goto error_quit_pipe; - } - - ret = pipe(ctx->kconsumerd_thread_pipe); - if (ret < 0) { - perror("Error creating thread pipe"); - goto error_thread_pipe; - } - - return ctx; - - -error_thread_pipe: - for (i = 0; i < 2; i++) { - int err; - - err = close(ctx->kconsumerd_should_quit[i]); - assert(!err); - } -error_quit_pipe: - for (i = 0; i < 2; i++) { - int err; - - err = close(ctx->kconsumerd_poll_pipe[i]); - assert(!err); - } -error_poll_pipe: - free(ctx); -error: - return NULL; -} - -/* - * Close all fds associated with the instance and free the context. - */ -void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx) -{ - close(ctx->kconsumerd_error_socket); - close(ctx->kconsumerd_thread_pipe[0]); - close(ctx->kconsumerd_thread_pipe[1]); - close(ctx->kconsumerd_poll_pipe[0]); - close(ctx->kconsumerd_poll_pipe[1]); - close(ctx->kconsumerd_should_quit[0]); - close(ctx->kconsumerd_should_quit[1]); - unlink(ctx->kconsumerd_command_sock_path); - free(ctx); - ctx = NULL; -} - -/* - * This thread listens on the consumerd socket and receives the file - * descriptors from the session daemon. - */ -void *lttng_kconsumerd_thread_receive_fds(void *data) -{ - int sock, client_socket, ret; - struct lttcomm_kconsumerd_header tmp; - /* - * structure to poll for incoming data on communication socket avoids - * making blocking sockets. - */ - struct pollfd kconsumerd_sockpoll[2]; - struct lttng_kconsumerd_local_data *ctx = data; - - - DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path); - unlink(ctx->kconsumerd_command_sock_path); - client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path); - if (client_socket < 0) { - ERR("Cannot create command socket"); - goto end; - } - - ret = lttcomm_listen_unix_sock(client_socket); - if (ret < 0) { - goto end; - } - - DBG("Sending ready command to ltt-sessiond"); - ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY); - /* return < 0 on error, but == 0 is not fatal */ - if (ret < 0) { - ERR("Error sending ready command to ltt-sessiond"); - goto end; - } - - ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* prepare the FDs to poll : to client socket and the should_quit pipe */ - kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0]; - kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; - kconsumerd_sockpoll[1].fd = client_socket; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Connection on client_socket"); - - /* Blocking call, waiting for transmission */ - sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { - WARN("On accept"); - goto end; - } - ret = fcntl(sock, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* update the polling structure to poll on the established socket */ - kconsumerd_sockpoll[1].fd = sock; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - while (1) { - if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Incoming fds on sock"); - - /* We first get the number of fd we are about to receive */ - ret = lttcomm_recv_unix_sock(sock, &tmp, - sizeof(struct lttcomm_kconsumerd_header)); - if (ret <= 0) { - ERR("Communication interrupted on command socket"); - goto end; - } - if (tmp.cmd_type == STOP) { - DBG("Received STOP command"); - goto end; - } - if (kconsumerd_quit) { - DBG("kconsumerd_thread_receive_fds received quit from signal"); - goto end; - } - - /* we received a command to add or update fds */ - ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll, - tmp.payload_size, tmp.cmd_type); - if (ret < 0) { - ERR("Receiving the FD, exiting"); - goto end; - } - DBG("received fds on sock"); - } - -end: - DBG("kconsumerd_thread_receive_fds exiting"); - - /* - * when all fds have hung up, the polling thread - * can exit cleanly - */ - kconsumerd_quit = 1; - - /* - * 2s of grace period, if no polling events occur during - * this period, the polling thread will exit even if there - * are still open FDs (should not happen, but safety mechanism). - */ - kconsumerd_poll_timeout = LTTNG_KCONSUMERD_POLL_GRACE_PERIOD; - - /* wake up the polling thread */ - ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } - return NULL; -} - -/* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. - */ -void lttng_kconsumerd_cleanup(void) -{ - struct lttng_kconsumerd_fd *iter, *tmp; - - /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. - */ - cds_list_for_each_entry_safe(iter, tmp, - &kconsumerd_data.fd_list.head, list) { - kconsumerd_del_fd(iter); - } -} - -/* - * Called from signal handler. - */ -void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data *ctx) -{ - int ret; - kconsumerd_quit = 1; - ret = write(ctx->kconsumerd_should_quit[1], "4", 1); - if (ret < 0) { - perror("write kconsumerd quit"); - } -} - -/* - * Send return code to the session daemon. - * If the socket is not defined, we return 0, it is not a fatal error - */ -int lttng_kconsumerd_send_error( - struct lttng_kconsumerd_local_data *ctx, int cmd) -{ - if (ctx->kconsumerd_error_socket > 0) { - return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd, - sizeof(enum lttcomm_sessiond_command)); - } - - return 0; -} diff --git a/libustctl/Makefile.am b/libustctl/Makefile.am deleted file mode 100644 index c20e9665c..000000000 --- a/libustctl/Makefile.am +++ /dev/null @@ -1,9 +0,0 @@ -AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/libustcomm -AM_CFLAGS = -fno-strict-aliasing - -noinst_LTLIBRARIES = libustctl.la - -libustctl_la_SOURCES = - -libustctl_la_LIBADD = $(top_builddir)/libustcomm/libustcomm.la - diff --git a/ltt-kconsumerd/Makefile.am b/ltt-kconsumerd/Makefile.am deleted file mode 100644 index 068c55544..000000000 --- a/ltt-kconsumerd/Makefile.am +++ /dev/null @@ -1,10 +0,0 @@ -AM_CPPFLAGS = -I$(top_srcdir)/include - -bin_PROGRAMS = ltt-kconsumerd - -ltt_kconsumerd_SOURCES = ltt-kconsumerd.c - -ltt_kconsumerd_LDADD = \ - $(top_builddir)/libkernelctl/libkernelctl.la \ - $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ - $(top_builddir)/liblttngkconsumerd/liblttngkconsumerd.la diff --git a/ltt-sessiond/Makefile.am b/ltt-sessiond/Makefile.am index cdcdfc60e..0d284efb1 100644 --- a/ltt-sessiond/Makefile.am +++ b/ltt-sessiond/Makefile.am @@ -14,11 +14,8 @@ endif ltt_sessiond_SOURCES = utils.c utils.h \ compat/poll.h $(COMPAT) \ trace-kernel.c trace-kernel.h \ - trace-ust.c trace-ust.h \ - ust-app.c ust-app.h \ - ust-comm.c ust-comm.h \ - ust-ctl.c ust-ctl.h \ kernel-ctl.c kernel-ctl.h \ + ust-ctl.h ust-app.h trace-ust.h \ context.c context.h \ channel.c channel.h \ event.c event.h \ @@ -29,9 +26,16 @@ ltt_sessiond_SOURCES = utils.c utils.h \ ../hashtable/rculfhash.c \ ../hashtable/rculfhash.h +if LTTNG_TOOLS_HAVE_UST +ltt_sessiond_SOURCES += \ + trace-ust.c \ + ust-app.c \ + ust-comm.c ust-comm.h \ + ust-ctl.c +endif + # link on liblttngctl for check if sessiond is already alive. ltt_sessiond_LDADD = -lrt \ $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ $(top_builddir)/libkernelctl/libkernelctl.la \ - $(top_builddir)/libustctl/libustctl.la \ $(top_builddir)/liblttngctl/liblttngctl.la diff --git a/ltt-sessiond/compat/compat-epoll.c b/ltt-sessiond/compat/compat-epoll.c index 431c5eb0d..e909b603c 100644 --- a/ltt-sessiond/compat/compat-epoll.c +++ b/ltt-sessiond/compat/compat-epoll.c @@ -21,6 +21,7 @@ #include #include #include +#include #include diff --git a/ltt-sessiond/kernel-ctl.c b/ltt-sessiond/kernel-ctl.c index 9c270edb3..203c01050 100644 --- a/ltt-sessiond/kernel-ctl.c +++ b/ltt-sessiond/kernel-ctl.c @@ -122,7 +122,7 @@ int kernel_create_session(struct ltt_session *session, int tracer_fd) perror("fcntl session fd"); } - lks->kconsumer_fds_sent = 0; + lks->consumer_fds_sent = 0; session->kernel_session = lks; DBG("Kernel session created (fd: %d)", lks->fd); diff --git a/ltt-sessiond/lttng-ust-abi.h b/ltt-sessiond/lttng-ust-abi.h new file mode 100644 index 000000000..303493eeb --- /dev/null +++ b/ltt-sessiond/lttng-ust-abi.h @@ -0,0 +1,137 @@ +#ifndef _LTTNG_UST_ABI_H +#define _LTTNG_UST_ABI_H + +/* + * lttng-ust-abi.h + * + * Copyright 2010-2011 (c) - Mathieu Desnoyers + * + * LTTng-UST ABI header + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include + +#define LTTNG_UST_SYM_NAME_LEN 128 + +#define LTTNG_UST_COMM_VERSION_MAJOR 0 +#define LTTNG_UST_COMM_VERSION_MINOR 1 + +enum lttng_ust_instrumentation { + LTTNG_UST_TRACEPOINT = 0, + LTTNG_UST_PROBE = 1, + LTTNG_UST_FUNCTION = 2, +}; + +enum lttng_ust_output { + LTTNG_UST_MMAP = 0, +}; + +struct lttng_ust_tracer_version { + uint32_t version; + uint32_t patchlevel; + uint32_t sublevel; +}; + +struct lttng_ust_channel { + int overwrite; /* 1: overwrite, 0: discard */ + uint64_t subbuf_size; /* in bytes */ + uint64_t num_subbuf; + unsigned int switch_timer_interval; /* usecs */ + unsigned int read_timer_interval; /* usecs */ + enum lttng_ust_output output; /* output mode */ + /* The following fields are used internally within UST. */ + int shm_fd; + int wait_fd; + uint64_t memory_map_size; +}; + +/* + * This structure is only used internally within UST. It is not per-se + * part of the communication between sessiond and UST. + */ +struct lttng_ust_stream { + int shm_fd; + int wait_fd; + uint64_t memory_map_size; +}; + +struct lttng_ust_event { + char name[LTTNG_UST_SYM_NAME_LEN]; /* event name */ + enum lttng_ust_instrumentation instrumentation; + /* Per instrumentation type configuration */ + union { + } u; +}; + +enum lttng_ust_context_type { + LTTNG_UST_CONTEXT_VTID = 0, + LTTNG_UST_CONTEXT_VPID = 1, + LTTNG_UST_CONTEXT_PTHREAD_ID = 2, + LTTNG_UST_CONTEXT_PROCNAME = 3, +}; + +struct lttng_ust_context { + enum lttng_ust_context_type ctx; + union { + } u; +}; + +#define _UST_CMD(minor) (minor) +#define _UST_CMDR(minor, type) (minor) +#define _UST_CMDW(minor, type) (minor) + +/* Handled by object descriptor */ +#define LTTNG_UST_RELEASE _UST_CMD(0x1) + +/* Handled by object cmd */ + +/* LTTng-UST commands */ +#define LTTNG_UST_SESSION _UST_CMD(0x40) +#define LTTNG_UST_TRACER_VERSION \ + _UST_CMDR(0x41, struct lttng_ust_tracer_version) +#define LTTNG_UST_TRACEPOINT_LIST _UST_CMD(0x42) +#define LTTNG_UST_WAIT_QUIESCENT _UST_CMD(0x43) +#define LTTNG_UST_REGISTER_DONE _UST_CMD(0x44) + +/* Session FD commands */ +#define LTTNG_UST_METADATA \ + _UST_CMDW(0x50, struct lttng_ust_channel) +#define LTTNG_UST_CHANNEL \ + _UST_CMDW(0x51, struct lttng_ust_channel) +#define LTTNG_UST_SESSION_START _UST_CMD(0x52) +#define LTTNG_UST_SESSION_STOP _UST_CMD(0x53) + +/* Channel FD commands */ +#define LTTNG_UST_STREAM _UST_CMD(0x60) +#define LTTNG_UST_EVENT \ + _UST_CMDW(0x61, struct lttng_ust_event) + +/* Event and Channel FD commands */ +#define LTTNG_UST_CONTEXT \ + _UST_CMDW(0x70, struct lttng_ust_context) + +/* Event, Channel and Session commands */ +#define LTTNG_UST_ENABLE _UST_CMD(0x80) +#define LTTNG_UST_DISABLE _UST_CMD(0x81) + +#define LTTNG_UST_ROOT_HANDLE 0 + +struct obj; + +struct objd_ops { + long (*cmd)(int objd, unsigned int cmd, unsigned long arg); + int (*release)(int objd); +}; + +/* Create root handle. Always ID 0. */ +int lttng_abi_create_root_handle(void); + +const struct objd_ops *objd_ops(int id); +int objd_unref(int id); + +void lttng_ust_abi_exit(void); +void ltt_events_exit(void); + +#endif /* _LTTNG_UST_ABI_H */ diff --git a/ltt-sessiond/main.c b/ltt-sessiond/main.c index f887fa124..5fec7ae4e 100644 --- a/ltt-sessiond/main.c +++ b/ltt-sessiond/main.c @@ -36,10 +36,11 @@ #include #include #include +#include -#include +#include #include -#include +#include #include #include "channel.h" @@ -55,6 +56,24 @@ #include "utils.h" #include "ust-ctl.h" +struct consumer_data { + enum lttng_consumer_type type; + + pthread_t thread; /* Worker thread interacting with the consumer */ + sem_t sem; + + /* Mutex to control consumerd pid assignation */ + pthread_mutex_t pid_mutex; + pid_t pid; + + int err_sock; + int cmd_sock; + + /* consumer error and command Unix socket path */ + char err_unix_sock_path[PATH_MAX]; + char cmd_unix_sock_path[PATH_MAX]; +}; + /* Const values */ const char default_home_dir[] = DEFAULT_HOME_DIR; const char default_tracing_group[] = LTTNG_DEFAULT_TRACING_GROUP; @@ -63,7 +82,7 @@ const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE; /* Variables */ int opt_verbose; /* Not static for lttngerr.h */ -int opt_verbose_kconsumerd; /* Not static for lttngerr.h */ +int opt_verbose_consumer; /* Not static for lttngerr.h */ int opt_quiet; /* Not static for lttngerr.h */ const char *progname; @@ -72,24 +91,27 @@ static int opt_sig_parent; static int opt_daemon; static int is_root; /* Set to 1 if the daemon is running as root */ static pid_t ppid; /* Parent PID for --sig-parent option */ -static pid_t kconsumerd_pid; + +/* Consumer daemon specific control data */ +static struct consumer_data kconsumer_data = { + .type = LTTNG_CONSUMER_KERNEL, +}; +static struct consumer_data ustconsumer_data = { + .type = LTTNG_CONSUMER_UST, +}; + static int dispatch_thread_exit; /* Global application Unix socket path */ static char apps_unix_sock_path[PATH_MAX]; /* Global client Unix socket path */ static char client_unix_sock_path[PATH_MAX]; -/* kconsumerd error and command Unix socket path */ -static char kconsumerd_err_unix_sock_path[PATH_MAX]; -static char kconsumerd_cmd_unix_sock_path[PATH_MAX]; /* global wait shm path for UST */ static char wait_shm_path[PATH_MAX]; /* Sockets and FDs */ static int client_sock; static int apps_sock; -static int kconsumerd_err_sock; -static int kconsumerd_cmd_sock; static int kernel_tracer_fd; static int kernel_poll_pipe[2]; @@ -106,17 +128,12 @@ static int thread_quit_pipe[2]; static int apps_cmd_pipe[2]; /* Pthread, Mutexes and Semaphores */ -static pthread_t kconsumerd_thread; static pthread_t apps_thread; static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; -static sem_t kconsumerd_sem; - -/* Mutex to control kconsumerd pid assignation */ -static pthread_mutex_t kconsumerd_pid_mutex; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -269,7 +286,7 @@ static void teardown_kernel_session(struct ltt_session *session) * If a custom kernel consumer was registered, close the socket before * tearing down the complete kernel session structure */ - if (session->kernel_session->consumer_fd != kconsumerd_cmd_sock) { + if (session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) { lttcomm_close_unix_sock(session->kernel_session->consumer_fd); } @@ -346,7 +363,7 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumerd_pid_mutex); + pthread_mutex_destroy(&kconsumer_data.pid_mutex); DBG("Closing kernel fd"); close(kernel_tracer_fd); @@ -396,54 +413,57 @@ static void clean_command_ctx(struct command_ctx **cmd_ctx) /* * Send all stream fds of kernel channel to the consumer. */ -static int send_kconsumerd_channel_fds(int sock, - struct ltt_kernel_channel *channel) +static int send_consumer_channel_streams(struct consumer_data *consumer_data, + int sock, struct ltt_kernel_channel *channel) { int ret; size_t nb_fd; struct ltt_kernel_stream *stream; - struct lttcomm_kconsumerd_header lkh; - struct lttcomm_kconsumerd_msg lkm; + struct lttcomm_consumer_msg lkm; - DBG("Sending fds of channel %s to kernel consumer", + DBG("Sending streams of channel %s to kernel consumer", channel->channel->name); - nb_fd = channel->stream_count; - /* Setup header */ - lkh.payload_size = nb_fd * sizeof(struct lttcomm_kconsumerd_msg); - lkh.cmd_type = ADD_STREAM; - - DBG("Sending kconsumerd header"); - - ret = lttcomm_send_unix_sock(sock, &lkh, - sizeof(struct lttcomm_kconsumerd_header)); + /* Send channel */ + lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; + lkm.u.channel.channel_key = channel->fd; + lkm.u.channel.max_sb_size = channel->channel->attr.subbuf_size; + lkm.u.channel.mmap_len = 0; /* for kernel */ + DBG("Sending channel %d to consumer", lkm.u.stream.stream_key); + ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); if (ret < 0) { - perror("send kconsumerd header"); + perror("send consumer channel"); goto error; } + /* Send streams */ cds_list_for_each_entry(stream, &channel->stream_list.head, list) { - if (stream->fd != 0) { - lkm.fd = stream->fd; - lkm.state = stream->state; - lkm.max_sb_size = channel->channel->attr.subbuf_size; - lkm.output = channel->channel->attr.output; - strncpy(lkm.path_name, stream->pathname, PATH_MAX); - lkm.path_name[PATH_MAX - 1] = '\0'; - - DBG("Sending fd %d to kconsumerd", lkm.fd); - - ret = lttcomm_send_fds_unix_sock(sock, &lkm, - &lkm.fd, 1, sizeof(lkm)); - if (ret < 0) { - perror("send kconsumerd fd"); - goto error; - } + if (!stream->fd) { + continue; + } + lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM; + lkm.u.stream.channel_key = channel->fd; + lkm.u.stream.stream_key = stream->fd; + lkm.u.stream.state = stream->state; + lkm.u.stream.output = channel->channel->attr.output; + lkm.u.stream.mmap_len = 0; /* for kernel */ + strncpy(lkm.u.stream.path_name, stream->pathname, PATH_MAX - 1); + lkm.u.stream.path_name[PATH_MAX - 1] = '\0'; + DBG("Sending stream %d to consumer", lkm.u.stream.stream_key); + ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); + if (ret < 0) { + perror("send consumer stream"); + goto error; + } + ret = lttcomm_send_fds_unix_sock(sock, &stream->fd, 1); + if (ret < 0) { + perror("send consumer stream ancillary data"); + goto error; } } - DBG("Kconsumerd channel fds sent"); + DBG("consumer channel streams sent"); return 0; @@ -454,58 +474,64 @@ error: /* * Send all stream fds of the kernel session to the consumer. */ -static int send_kconsumerd_fds(struct ltt_kernel_session *session) +static int send_consumer_session_streams(struct consumer_data *consumer_data, + struct ltt_kernel_session *session) { int ret; struct ltt_kernel_channel *chan; - struct lttcomm_kconsumerd_header lkh; - struct lttcomm_kconsumerd_msg lkm; - - /* Setup header */ - lkh.payload_size = sizeof(struct lttcomm_kconsumerd_msg); - lkh.cmd_type = ADD_STREAM; - - DBG("Sending kconsumerd header for metadata"); - - ret = lttcomm_send_unix_sock(session->consumer_fd, &lkh, - sizeof(struct lttcomm_kconsumerd_header)); - if (ret < 0) { - perror("send kconsumerd header"); - goto error; - } + struct lttcomm_consumer_msg lkm; + int sock = session->consumer_fd; DBG("Sending metadata stream fd"); /* Extra protection. It's NOT suppose to be set to 0 at this point */ if (session->consumer_fd == 0) { - session->consumer_fd = kconsumerd_cmd_sock; + session->consumer_fd = consumer_data->cmd_sock; } if (session->metadata_stream_fd != 0) { - /* Send metadata stream fd first */ - lkm.fd = session->metadata_stream_fd; - lkm.state = ACTIVE_FD; - lkm.max_sb_size = session->metadata->conf->attr.subbuf_size; - lkm.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; - strncpy(lkm.path_name, session->metadata->pathname, PATH_MAX); - lkm.path_name[PATH_MAX - 1] = '\0'; - - ret = lttcomm_send_fds_unix_sock(session->consumer_fd, &lkm, - &lkm.fd, 1, sizeof(lkm)); + /* Send metadata channel fd */ + lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; + lkm.u.channel.channel_key = session->metadata->fd; + lkm.u.channel.max_sb_size = session->metadata->conf->attr.subbuf_size; + lkm.u.channel.mmap_len = 0; /* for kernel */ + DBG("Sending metadata channel %d to consumer", lkm.u.stream.stream_key); + ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); + if (ret < 0) { + perror("send consumer channel"); + goto error; + } + + /* Send metadata stream fd */ + lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM; + lkm.u.stream.channel_key = session->metadata->fd; + lkm.u.stream.stream_key = session->metadata_stream_fd; + lkm.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM; + lkm.u.stream.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; + lkm.u.stream.mmap_len = 0; /* for kernel */ + strncpy(lkm.u.stream.path_name, session->metadata->pathname, PATH_MAX - 1); + lkm.u.stream.path_name[PATH_MAX - 1] = '\0'; + DBG("Sending metadata stream %d to consumer", lkm.u.stream.stream_key); + ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); + if (ret < 0) { + perror("send consumer stream"); + goto error; + } + ret = lttcomm_send_fds_unix_sock(sock, &session->metadata_stream_fd, 1); if (ret < 0) { - perror("send kconsumerd fd"); + perror("send consumer stream"); goto error; } } cds_list_for_each_entry(chan, &session->channel_list.head, list) { - ret = send_kconsumerd_channel_fds(session->consumer_fd, chan); + ret = send_consumer_channel_streams(consumer_data, sock, chan); if (ret < 0) { goto error; } } - DBG("Kconsumerd fds (metadata and channel streams) sent"); + DBG("consumer fds (metadata and channel streams) sent"); return 0; @@ -618,7 +644,7 @@ error: * * Useful for CPU hotplug feature. */ -static int update_kernel_stream(int fd) +static int update_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; @@ -636,7 +662,7 @@ static int update_kernel_stream(int fd) /* This is not suppose to be 0 but this is an extra security check */ if (session->kernel_session->consumer_fd == 0) { - session->kernel_session->consumer_fd = kconsumerd_cmd_sock; + session->kernel_session->consumer_fd = consumer_data->cmd_sock; } cds_list_for_each_entry(channel, @@ -653,8 +679,8 @@ static int update_kernel_stream(int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->kconsumer_fds_sent == 1) { - ret = send_kconsumerd_channel_fds( + if (session->kernel_session->consumer_fds_sent == 1) { + ret = send_consumer_channel_streams(consumer_data, session->kernel_session->consumer_fd, channel); if (ret < 0) { goto error; @@ -754,7 +780,7 @@ static void *thread_manage_kernel(void *data) * kernel session and updating the kernel consumer */ if (revents & LPOLLIN) { - ret = update_kernel_stream(pollfd); + ret = update_stream(&kconsumer_data, pollfd); if (ret < 0) { continue; } @@ -779,18 +805,19 @@ error: } /* - * This thread manage the kconsumerd error sent back to the session daemon. + * This thread manage the consumer error sent back to the session daemon. */ -static void *thread_manage_kconsumerd(void *data) +static void *thread_manage_consumer(void *data) { int sock = 0, i, ret, pollfd; uint32_t revents, nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; + struct consumer_data *consumer_data = data; - DBG("[thread] Manage kconsumerd started"); + DBG("[thread] Manage consumer started"); - ret = lttcomm_listen_unix_sock(kconsumerd_err_sock); + ret = lttcomm_listen_unix_sock(consumer_data->err_sock); if (ret < 0) { goto error; } @@ -804,7 +831,7 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - ret = lttng_poll_add(&events, kconsumerd_err_sock, LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } @@ -829,20 +856,20 @@ static void *thread_manage_kconsumerd(void *data) } /* Event on the registration socket */ - if (pollfd == kconsumerd_err_sock) { + if (pollfd == consumer_data->err_sock) { if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Kconsumerd err socket poll error"); + ERR("consumer err socket poll error"); goto error; } } } - sock = lttcomm_accept_unix_sock(kconsumerd_err_sock); + sock = lttcomm_accept_unix_sock(consumer_data->err_sock); if (sock < 0) { goto error; } - DBG2("Receiving code from kconsumerd_err_sock"); + DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ ret = lttcomm_recv_unix_sock(sock, &code, @@ -851,25 +878,25 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - if (code == KCONSUMERD_COMMAND_SOCK_READY) { - kconsumerd_cmd_sock = - lttcomm_connect_unix_sock(kconsumerd_cmd_unix_sock_path); - if (kconsumerd_cmd_sock < 0) { - sem_post(&kconsumerd_sem); - perror("kconsumerd connect"); + if (code == CONSUMERD_COMMAND_SOCK_READY) { + consumer_data->cmd_sock = + lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0) { + sem_post(&consumer_data->sem); + perror("consumer connect"); goto error; } /* Signal condition to tell that the kconsumerd is ready */ - sem_post(&kconsumerd_sem); - DBG("Kconsumerd command socket ready"); + sem_post(&consumer_data->sem); + DBG("consumer command socket ready"); } else { - ERR("Kconsumerd error when waiting for SOCK_READY : %s", + ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } /* Remove the kconsumerd error sock since we've established a connexion */ - ret = lttng_poll_del(&events, kconsumerd_err_sock); + ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; } @@ -902,7 +929,7 @@ static void *thread_manage_kconsumerd(void *data) /* Event on the kconsumerd socket */ if (pollfd == sock) { if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Kconsumerd err socket second poll error"); + ERR("consumer err socket second poll error"); goto error; } } @@ -912,21 +939,21 @@ static void *thread_manage_kconsumerd(void *data) ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { - ERR("Kconsumerd closed the command socket"); + ERR("consumer closed the command socket"); goto error; } - ERR("Kconsumerd return code : %s", lttcomm_get_readable_code(-code)); + ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); error: - DBG("Kconsumerd thread dying"); - close(kconsumerd_err_sock); - close(kconsumerd_cmd_sock); + DBG("consumer thread dying"); + close(consumer_data->err_sock); + close(consumer_data->cmd_sock); close(sock); - unlink(kconsumerd_err_unix_sock_path); - unlink(kconsumerd_cmd_unix_sock_path); - kconsumerd_pid = 0; + unlink(consumer_data->err_unix_sock_path); + unlink(consumer_data->cmd_unix_sock_path); + consumer_data->pid = 0; lttng_poll_clean(&events); @@ -1259,10 +1286,10 @@ error: } /* - * Start the thread_manage_kconsumerd. This must be done after a kconsumerd + * Start the thread_manage_consumer. This must be done after a lttng-consumerd * exec or it will fails. */ -static int spawn_kconsumerd_thread(void) +static int spawn_consumer_thread(struct consumer_data *consumer_data) { int ret; struct timespec timeout; @@ -1271,16 +1298,16 @@ static int spawn_kconsumerd_thread(void) timeout.tv_nsec = 0; /* Setup semaphore */ - ret = sem_init(&kconsumerd_sem, 0, 0); + ret = sem_init(&consumer_data->sem, 0, 0); if (ret < 0) { - PERROR("sem_init kconsumerd_sem"); + PERROR("sem_init consumer semaphore"); goto error; } - ret = pthread_create(&kconsumerd_thread, NULL, - thread_manage_kconsumerd, (void *) NULL); + ret = pthread_create(&consumer_data->thread, NULL, + thread_manage_consumer, consumer_data); if (ret != 0) { - PERROR("pthread_create kconsumerd"); + PERROR("pthread_create consumer"); ret = -1; goto error; } @@ -1288,13 +1315,13 @@ static int spawn_kconsumerd_thread(void) /* Get time for sem_timedwait absolute timeout */ ret = clock_gettime(CLOCK_REALTIME, &timeout); if (ret < 0) { - PERROR("clock_gettime spawn kconsumerd"); + PERROR("clock_gettime spawn consumer"); /* Infinite wait for the kconsumerd thread to be ready */ - ret = sem_wait(&kconsumerd_sem); + ret = sem_wait(&consumer_data->sem); } else { /* Normal timeout if the gettime was successful */ timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT; - ret = sem_timedwait(&kconsumerd_sem, &timeout); + ret = sem_timedwait(&consumer_data->sem, &timeout); } if (ret < 0) { @@ -1303,24 +1330,24 @@ static int spawn_kconsumerd_thread(void) * Call has timed out so we kill the kconsumerd_thread and return * an error. */ - ERR("The kconsumerd thread was never ready. Killing it"); - ret = pthread_cancel(kconsumerd_thread); + ERR("The consumer thread was never ready. Killing it"); + ret = pthread_cancel(consumer_data->thread); if (ret < 0) { - PERROR("pthread_cancel kconsumerd_thread"); + PERROR("pthread_cancel consumer thread"); } } else { - PERROR("semaphore wait failed kconsumerd thread"); + PERROR("semaphore wait failed consumer thread"); } goto error; } - pthread_mutex_lock(&kconsumerd_pid_mutex); - if (kconsumerd_pid == 0) { + pthread_mutex_lock(&consumer_data->pid_mutex); + if (consumer_data->pid == 0) { ERR("Kconsumerd did not start"); - pthread_mutex_unlock(&kconsumerd_pid_mutex); + pthread_mutex_unlock(&consumer_data->pid_mutex); goto error; } - pthread_mutex_unlock(&kconsumerd_pid_mutex); + pthread_mutex_unlock(&consumer_data->pid_mutex); return 0; @@ -1329,96 +1356,103 @@ error: } /* - * Join kernel consumer thread + * Join consumer thread */ -static int join_kconsumerd_thread(void) +static int join_consumer_thread(struct consumer_data *consumer_data) { void *status; int ret; - if (kconsumerd_pid != 0) { - ret = kill(kconsumerd_pid, SIGTERM); + if (consumer_data->pid != 0) { + ret = kill(consumer_data->pid, SIGTERM); if (ret) { - ERR("Error killing kconsumerd"); + ERR("Error killing consumer daemon"); return ret; } - return pthread_join(kconsumerd_thread, &status); + return pthread_join(consumer_data->thread, &status); } else { return 0; } } /* - * Fork and exec a kernel consumer daemon (kconsumerd). + * Fork and exec a consumer daemon (consumerd). * * Return pid if successful else -1. */ -static pid_t spawn_kconsumerd(void) +static pid_t spawn_consumerd(struct consumer_data *consumer_data) { int ret; pid_t pid; const char *verbosity; - DBG("Spawning kconsumerd"); + DBG("Spawning consumerd"); pid = fork(); if (pid == 0) { /* - * Exec kconsumerd. + * Exec consumerd. */ - if (opt_verbose > 1 || opt_verbose_kconsumerd) { + if (opt_verbose > 1 || opt_verbose_consumer) { verbosity = "--verbose"; } else { verbosity = "--quiet"; } - execl(INSTALL_BIN_PATH "/ltt-kconsumerd", - "ltt-kconsumerd", verbosity, NULL); + switch (consumer_data->type) { + case LTTNG_CONSUMER_KERNEL: + execl(INSTALL_BIN_PATH "/lttng-consumerd", + "lttng-consumerd", verbosity, "-k", NULL); + break; + case LTTNG_CONSUMER_UST: + execl(INSTALL_BIN_PATH "/lttng-consumerd", + "lttng-consumerd", verbosity, "-u", NULL); + break; + default: + perror("unknown consumer type"); + exit(EXIT_FAILURE); + } if (errno != 0) { perror("kernel start consumer exec"); } exit(EXIT_FAILURE); } else if (pid > 0) { ret = pid; - goto error; } else { - perror("kernel start consumer fork"); + perror("start consumer fork"); ret = -errno; - goto error; } - -error: return ret; } /* - * Spawn the kconsumerd daemon and session daemon thread. + * Spawn the consumerd daemon and session daemon thread. */ -static int start_kconsumerd(void) +static int start_consumerd(struct consumer_data *consumer_data) { int ret; - pthread_mutex_lock(&kconsumerd_pid_mutex); - if (kconsumerd_pid != 0) { - pthread_mutex_unlock(&kconsumerd_pid_mutex); + pthread_mutex_lock(&consumer_data->pid_mutex); + if (consumer_data->pid != 0) { + pthread_mutex_unlock(&consumer_data->pid_mutex); goto end; } - ret = spawn_kconsumerd(); + ret = spawn_consumerd(consumer_data); if (ret < 0) { - ERR("Spawning kconsumerd failed"); - pthread_mutex_unlock(&kconsumerd_pid_mutex); + ERR("Spawning consumerd failed"); + pthread_mutex_unlock(&consumer_data->pid_mutex); goto error; } - /* Setting up the global kconsumerd_pid */ - kconsumerd_pid = ret; - DBG2("Kconsumerd pid %d", kconsumerd_pid); - pthread_mutex_unlock(&kconsumerd_pid_mutex); + /* Setting up the consumer_data pid */ + consumer_data->pid = ret; + DBG2("consumer pid %d", consumer_data->pid); + pthread_mutex_unlock(&consumer_data->pid_mutex); - DBG2("Spawning kconsumerd thread"); - ret = spawn_kconsumerd_thread(); + DBG2("Spawning consumer control thread"); + ret = spawn_consumer_thread(consumer_data); if (ret < 0) { - ERR("Fatal error spawning kconsumerd thread"); + ERR("Fatal error spawning consumer control thread"); goto error; } @@ -1580,23 +1614,23 @@ static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; - if (session->kconsumer_fds_sent == 0) { + if (session->consumer_fds_sent == 0) { /* * Assign default kernel consumer socket if no consumer assigned to the * kernel session. At this point, it's NOT suppose to be 0 but this is * an extra security check. */ if (session->consumer_fd == 0) { - session->consumer_fd = kconsumerd_cmd_sock; + session->consumer_fd = kconsumer_data.cmd_sock; } - ret = send_kconsumerd_fds(session); + ret = send_consumer_session_streams(&kconsumer_data, session); if (ret < 0) { ret = LTTCOMM_KERN_CONSUMER_FAIL; goto error; } - session->kconsumer_fds_sent = 1; + session->consumer_fds_sent = 1; } error: @@ -1677,8 +1711,8 @@ static int create_kernel_session(struct ltt_session *session) } /* Set kernel consumer socket fd */ - if (kconsumerd_cmd_sock) { - session->kernel_session->consumer_fd = kconsumerd_cmd_sock; + if (kconsumer_data.cmd_sock) { + session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; } ret = mkdir_recursive(session->kernel_session->trace_path, @@ -2614,17 +2648,17 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } /* Start the kernel consumer daemon */ - pthread_mutex_lock(&kconsumerd_pid_mutex); - if (kconsumerd_pid == 0 && + pthread_mutex_lock(&kconsumer_data.pid_mutex); + if (kconsumer_data.pid == 0 && cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { - pthread_mutex_unlock(&kconsumerd_pid_mutex); - ret = start_kconsumerd(); + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + ret = start_consumerd(&kconsumer_data); if (ret < 0) { ret = LTTCOMM_KERN_CONSUMER_FAIL; goto error; } } - pthread_mutex_unlock(&kconsumerd_pid_mutex); + pthread_mutex_unlock(&kconsumer_data.pid_mutex); } break; case LTTNG_DOMAIN_UST_PID: @@ -3058,13 +3092,15 @@ static void usage(void) fprintf(stderr, " -a, --apps-sock PATH Specify path for apps unix socket\n"); fprintf(stderr, " --kconsumerd-err-sock PATH Specify path for the kernel consumer error socket\n"); fprintf(stderr, " --kconsumerd-cmd-sock PATH Specify path for the kernel consumer command socket\n"); + fprintf(stderr, " --ustconsumerd-err-sock PATH Specify path for the UST consumer error socket\n"); + fprintf(stderr, " --ustconsumerd-cmd-sock PATH Specify path for the UST consumer command socket\n"); fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); fprintf(stderr, " -V, --version Show version number.\n"); fprintf(stderr, " -S, --sig-parent Send SIGCHLD to parent pid to notify readiness.\n"); fprintf(stderr, " -q, --quiet No output at all.\n"); fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); - fprintf(stderr, " --verbose-kconsumerd Verbose mode for kconsumerd. Activate DBG() macro.\n"); + fprintf(stderr, " --verbose-consumer Verbose mode for consumer. Activate DBG() macro.\n"); } /* @@ -3077,8 +3113,10 @@ static int parse_args(int argc, char **argv) static struct option long_options[] = { { "client-sock", 1, 0, 'c' }, { "apps-sock", 1, 0, 'a' }, - { "kconsumerd-cmd-sock", 1, 0, 0 }, - { "kconsumerd-err-sock", 1, 0, 0 }, + { "kconsumerd-cmd-sock", 1, 0, 'C' }, + { "kconsumerd-err-sock", 1, 0, 'E' }, + { "ustconsumerd-cmd-sock", 1, 0, 'D' }, + { "ustconsumerd-err-sock", 1, 0, 'F' }, { "daemonize", 0, 0, 'd' }, { "sig-parent", 0, 0, 'S' }, { "help", 0, 0, 'h' }, @@ -3086,13 +3124,13 @@ static int parse_args(int argc, char **argv) { "version", 0, 0, 'V' }, { "quiet", 0, 0, 'q' }, { "verbose", 0, 0, 'v' }, - { "verbose-kconsumerd", 0, 0, 'Z' }, + { "verbose-consumer", 0, 0, 'Z' }, { NULL, 0, 0, 0 } }; while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhqvVS" "a:c:g:s:E:C:Z", + c = getopt_long(argc, argv, "dhqvVS" "a:c:g:s:C:E:D:F:Z", long_options, &option_index); if (c == -1) { break; @@ -3127,10 +3165,16 @@ static int parse_args(int argc, char **argv) opt_sig_parent = 1; break; case 'E': - snprintf(kconsumerd_err_unix_sock_path, PATH_MAX, "%s", optarg); + snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX, "%s", optarg); break; case 'C': - snprintf(kconsumerd_cmd_unix_sock_path, PATH_MAX, "%s", optarg); + snprintf(kconsumer_data.cmd_unix_sock_path, PATH_MAX, "%s", optarg); + break; + case 'F': + snprintf(ustconsumer_data.err_unix_sock_path, PATH_MAX, "%s", optarg); + break; + case 'D': + snprintf(ustconsumer_data.cmd_unix_sock_path, PATH_MAX, "%s", optarg); break; case 'q': opt_quiet = 1; @@ -3140,7 +3184,7 @@ static int parse_args(int argc, char **argv) opt_verbose += 1; break; case 'Z': - opt_verbose_kconsumerd += 1; + opt_verbose_consumer += 1; break; default: /* Unknown option or other error. @@ -3258,10 +3302,17 @@ static int set_permissions(void) perror("chown"); } - /* kconsumerd error socket path */ - ret = chown(kconsumerd_err_unix_sock_path, 0, gid); + /* kconsumer error socket path */ + ret = chown(kconsumer_data.err_unix_sock_path, 0, gid); if (ret < 0) { - ERR("Unable to set group on %s", kconsumerd_err_unix_sock_path); + ERR("Unable to set group on %s", kconsumer_data.err_unix_sock_path); + perror("chown"); + } + + /* ustconsumer error socket path */ + ret = chown(ustconsumer_data.err_unix_sock_path, 0, gid); + if (ret < 0) { + ERR("Unable to set group on %s", ustconsumer_data.err_unix_sock_path); perror("chown"); } @@ -3312,43 +3363,49 @@ error: * Setup sockets and directory needed by the kconsumerd communication with the * session daemon. */ -static int set_kconsumerd_sockets(void) +static int set_consumer_sockets(struct consumer_data *consumer_data) { int ret; + const char *path = consumer_data->type == LTTNG_CONSUMER_KERNEL ? + KCONSUMERD_PATH : USTCONSUMERD_PATH; - if (strlen(kconsumerd_err_unix_sock_path) == 0) { - snprintf(kconsumerd_err_unix_sock_path, PATH_MAX, - KCONSUMERD_ERR_SOCK_PATH); + if (strlen(consumer_data->err_unix_sock_path) == 0) { + snprintf(consumer_data->err_unix_sock_path, PATH_MAX, + consumer_data->type == LTTNG_CONSUMER_KERNEL ? + KCONSUMERD_ERR_SOCK_PATH : + USTCONSUMERD_ERR_SOCK_PATH); } - if (strlen(kconsumerd_cmd_unix_sock_path) == 0) { - snprintf(kconsumerd_cmd_unix_sock_path, PATH_MAX, - KCONSUMERD_CMD_SOCK_PATH); + if (strlen(consumer_data->cmd_unix_sock_path) == 0) { + snprintf(consumer_data->cmd_unix_sock_path, PATH_MAX, + consumer_data->type == LTTNG_CONSUMER_KERNEL ? + KCONSUMERD_CMD_SOCK_PATH : + USTCONSUMERD_CMD_SOCK_PATH); } - ret = mkdir(KCONSUMERD_PATH, S_IRWXU | S_IRWXG); + ret = mkdir(path, S_IRWXU | S_IRWXG); if (ret < 0) { if (errno != EEXIST) { - ERR("Failed to create " KCONSUMERD_PATH); + ERR("Failed to create %s", path); goto error; } ret = 0; } /* Create the kconsumerd error unix socket */ - kconsumerd_err_sock = - lttcomm_create_unix_sock(kconsumerd_err_unix_sock_path); - if (kconsumerd_err_sock < 0) { - ERR("Create unix sock failed: %s", kconsumerd_err_unix_sock_path); + consumer_data->err_sock = + lttcomm_create_unix_sock(consumer_data->err_unix_sock_path); + if (consumer_data->err_sock < 0) { + ERR("Create unix sock failed: %s", consumer_data->err_unix_sock_path); ret = -1; goto error; } /* File permission MUST be 660 */ - ret = chmod(kconsumerd_err_unix_sock_path, + ret = chmod(consumer_data->err_unix_sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); if (ret < 0) { - ERR("Set file permissions failed: %s", kconsumerd_err_unix_sock_path); + ERR("Set file permissions failed: %s", consumer_data->err_unix_sock_path); perror("chmod"); goto error; } @@ -3543,11 +3600,14 @@ int main(int argc, char **argv) * kernel tracer. */ if (is_root) { - ret = set_kconsumerd_sockets(); + ret = set_consumer_sockets(&kconsumer_data); + if (ret < 0) { + goto exit; + } + ret = set_consumer_sockets(&ustconsumer_data); if (ret < 0) { goto exit; } - /* Setup kernel tracer */ init_kernel_tracer(); @@ -3670,9 +3730,9 @@ exit_dispatch: goto error; /* join error, exit without cleanup */ } - ret = join_kconsumerd_thread(); + ret = join_consumer_thread(&kconsumer_data); if (ret != 0) { - perror("join_kconsumerd"); + perror("join_consumer"); goto error; /* join error, exit without cleanup */ } diff --git a/ltt-sessiond/trace-kernel.h b/ltt-sessiond/trace-kernel.h index 6bf5752fd..2bf2f03c9 100644 --- a/ltt-sessiond/trace-kernel.h +++ b/ltt-sessiond/trace-kernel.h @@ -82,7 +82,7 @@ struct ltt_kernel_stream { struct ltt_kernel_session { int fd; int metadata_stream_fd; - int kconsumer_fds_sent; + int consumer_fds_sent; int consumer_fd; unsigned int channel_count; unsigned int stream_count_global; diff --git a/ltt-sessiond/trace-ust.c b/ltt-sessiond/trace-ust.c index fe007141e..cf1642a6e 100644 --- a/ltt-sessiond/trace-ust.c +++ b/ltt-sessiond/trace-ust.c @@ -23,7 +23,6 @@ #include #include -#include #include "trace-ust.h" @@ -123,7 +122,7 @@ struct ltt_ust_session *trace_ust_create_session(char *path, pid_t pid, /* Init data structure */ lus->handle = -1; lus->enabled = 1; - lus->uconsumer_fds_sent = 0; + lus->consumer_fds_sent = 0; lus->metadata = NULL; lus->channels.count = 0; CDS_INIT_LIST_HEAD(&lus->channels.head); diff --git a/ltt-sessiond/trace-ust.h b/ltt-sessiond/trace-ust.h index 9a2366269..abbf9a9d1 100644 --- a/ltt-sessiond/trace-ust.h +++ b/ltt-sessiond/trace-ust.h @@ -19,11 +19,22 @@ #ifndef _LTT_TRACE_UST_H #define _LTT_TRACE_UST_H +#include #include #include - #include -#include + +/* + * FIXME: temporary workaround: we use a lttng-tools local version of + * lttng-ust-abi.h if UST is not found. Eventually, we should use our + * own internal structures within lttng-tools instead of relying on the + * UST ABI. + */ +#ifdef CONFIG_CONFIG_LTTNG_TOOLS_HAVE_UST +#include +#else +#include "lttng-ust-abi.h" +#endif /* * UST session list. @@ -77,7 +88,7 @@ struct ltt_ust_metadata { struct ltt_ust_session { int handle; int enabled; - int uconsumer_fds_sent; + int consumer_fds_sent; char path[PATH_MAX]; struct lttng_domain domain; struct ltt_ust_metadata *metadata; @@ -85,6 +96,8 @@ struct ltt_ust_session { struct cds_list_head list; }; +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + /* * Lookup functions. NULL is returned if not found. */ @@ -114,4 +127,67 @@ void trace_ust_destroy_metadata(struct ltt_ust_metadata *metadata); void trace_ust_destroy_channel(struct ltt_ust_channel *channel); void trace_ust_destroy_event(struct ltt_ust_event *event); +#else + +static inline +struct ltt_ust_event *trace_ust_get_event_by_name( + char *name, struct ltt_ust_channel *channel) +{ + return NULL; +} +static inline +struct ltt_ust_channel *trace_ust_get_channel_by_name( + char *name, struct ltt_ust_session *session) +{ + return NULL; +} +static inline +struct ltt_ust_session *trace_ust_get_session_by_pid( + struct ltt_ust_session_list *session_list, pid_t pid) +{ + return NULL; +} + +static inline +struct ltt_ust_session *trace_ust_create_session(char *path, pid_t pid, + struct lttng_domain *domain) +{ + return NULL; +} +static inline +struct ltt_ust_channel *trace_ust_create_channel(struct lttng_channel *attr, + char *path) +{ + return NULL; +} +static inline +struct ltt_ust_event *trace_ust_create_event(struct lttng_event *ev) +{ + return NULL; +} +static inline +struct ltt_ust_metadata *trace_ust_create_metadata(char *path) +{ + return NULL; +} + +static inline +void trace_ust_destroy_session(struct ltt_ust_session *session) +{ +} +static inline +void trace_ust_destroy_metadata(struct ltt_ust_metadata *metadata) +{ +} +static inline +void trace_ust_destroy_channel(struct ltt_ust_channel *channel) +{ +} +static inline +void trace_ust_destroy_event(struct ltt_ust_event *event) +{ +} + +#endif + #endif /* _LTT_TRACE_UST_H */ diff --git a/ltt-sessiond/ust-app.h b/ltt-sessiond/ust-app.h index f2d3d261c..cc08a4c33 100644 --- a/ltt-sessiond/ust-app.h +++ b/ltt-sessiond/ust-app.h @@ -40,7 +40,7 @@ struct ust_register_msg { /* * Traceable application list. */ -struct ust_app_list{ +struct ust_app_list { /* * This lock protects any read/write access to the list and count (which is * basically the list size). All public functions in traceable-app.c @@ -76,6 +76,8 @@ struct ust_app { struct cds_list_head list; }; +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + int ust_app_register(struct ust_register_msg *msg, int sock); void ust_app_unregister(int sock); unsigned int ust_app_list_count(void); @@ -86,4 +88,48 @@ void ust_app_clean_list(void); struct ust_app_list *ust_app_get_list(void); struct ust_app *ust_app_get_by_pid(pid_t pid); +#else + +static inline +int ust_app_register(struct ust_register_msg *msg, int sock) +{ + return -ENOSYS; +} +static inline +void ust_app_unregister(int sock) +{ +} +static inline +unsigned int ust_app_list_count(void) +{ + return 0; +} + +static inline +void ust_app_lock_list(void) +{ +} +static inline +void ust_app_unlock_list(void) +{ +} +static inline +void ust_app_clean_list(void) +{ +} +static inline +struct ust_app_list *ust_app_get_list(void) +{ + return NULL; +} +static inline +struct ust_app *ust_app_get_by_pid(pid_t pid) +{ + return NULL; +} + + + +#endif + #endif /* _TRACEABLE_APP_H */ diff --git a/ltt-sessiond/ust-comm.c b/ltt-sessiond/ust-comm.c index 455d40c0a..724260511 100644 --- a/ltt-sessiond/ust-comm.c +++ b/ltt-sessiond/ust-comm.c @@ -16,10 +16,9 @@ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ +#include #include - #include - #include "ust-comm.h" /* diff --git a/ltt-sessiond/ust-ctl.c b/ltt-sessiond/ust-ctl.c index 2753b6f5a..2f43b1e60 100644 --- a/ltt-sessiond/ust-ctl.c +++ b/ltt-sessiond/ust-ctl.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#include #include #include #include diff --git a/ltt-sessiond/ust-ctl.h b/ltt-sessiond/ust-ctl.h index c4d754eca..a592abf22 100644 --- a/ltt-sessiond/ust-ctl.h +++ b/ltt-sessiond/ust-ctl.h @@ -23,6 +23,8 @@ #include "trace-ust.h" +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + int ustctl_register_done(int sock); int ustctl_create_channel(int sock, struct ltt_ust_session *session, struct lttng_channel *channel); @@ -33,4 +35,42 @@ int ustctl_disable_channel(int sock, struct ltt_ust_session *session, int ustctl_enable_channel(int sock, struct ltt_ust_session *session, struct ltt_ust_channel *chan); +#else + +static inline +int ustctl_register_done(int sock) +{ + return -ENOSYS; +} +static inline +int ustctl_create_channel(int sock, struct ltt_ust_session *session, + struct lttng_channel *channel) +{ + return -ENOSYS; +} +static inline +int ustctl_create_session(int sock, struct ltt_ust_session *session) +{ + return -ENOSYS; +} +static inline +int ustctl_destroy_session(int sock, struct ltt_ust_session *session) +{ + return -ENOSYS; +} +static inline +int ustctl_disable_channel(int sock, struct ltt_ust_session *session, + struct ltt_ust_channel *chan) +{ + return -ENOSYS; +} +static inline +int ustctl_enable_channel(int sock, struct ltt_ust_session *session, + struct ltt_ust_channel *chan) +{ + return -ENOSYS; +} + +#endif + #endif /* _LTT_UST_CTL_H */ diff --git a/lttng-consumerd/Makefile.am b/lttng-consumerd/Makefile.am new file mode 100644 index 000000000..2c1d56419 --- /dev/null +++ b/lttng-consumerd/Makefile.am @@ -0,0 +1,14 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include + +bin_PROGRAMS = lttng-consumerd + +lttng_consumerd_SOURCES = lttng-consumerd.c + +lttng_consumerd_LDADD = \ + $(top_builddir)/libkernelctl/libkernelctl.la \ + $(top_builddir)/liblttng-consumer/liblttng-consumer.la \ + $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la + +if LTTNG_TOOLS_HAVE_UST +lttng_consumerd_LDADD += -lustctl +endif diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/lttng-consumerd/lttng-consumerd.c similarity index 77% rename from ltt-kconsumerd/ltt-kconsumerd.c rename to lttng-consumerd/lttng-consumerd.c index a81be55e7..cac71cace 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/lttng-consumerd/lttng-consumerd.c @@ -37,13 +37,17 @@ #include #include #include +#include -#include +#include #include #include -#include +#include +#include #include +/* TODO : support UST (all direct kernctl accesses). */ + /* the two threads (receive fd and poll) */ static pthread_t threads[2]; @@ -57,9 +61,10 @@ static int opt_daemon; static const char *progname; static char command_sock_path[PATH_MAX]; /* Global command socket path */ static char error_sock_path[PATH_MAX]; /* Global error path */ +static enum lttng_consumer_type opt_type = LTTNG_CONSUMER_KERNEL; /* the liblttngkconsumerd context */ -static struct lttng_kconsumerd_local_data *ctx; +static struct lttng_consumer_local_data *ctx; /* * Signal handler for the daemon @@ -71,7 +76,7 @@ static void sighandler(int sig) return; } - lttng_kconsumerd_should_exit(ctx); + lttng_consumer_should_exit(ctx); } /* @@ -130,6 +135,16 @@ static void usage(void) "Verbose mode. Activate DBG() macro.\n"); fprintf(stderr, " -V, --version " "Show version number.\n"); + fprintf(stderr, " -k, --kernel " + "Consumer kernel buffers (default).\n"); + fprintf(stderr, " -u, --ust " + "Consumer UST buffers.%s\n", +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + "" +#else + " (support not compiled in)" +#endif + ); } /* @@ -147,12 +162,16 @@ static void parse_args(int argc, char **argv) { "quiet", 0, 0, 'q' }, { "verbose", 0, 0, 'v' }, { "version", 0, 0, 'V' }, + { "kernel", 0, 0, 'k' }, +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + { "ust", 0, 0, 'u' }, +#endif { NULL, 0, 0, 0 } }; while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index); + c = getopt_long(argc, argv, "dhqvVku" "c:e:", long_options, &option_index); if (c == -1) { break; } @@ -185,6 +204,14 @@ static void parse_args(int argc, char **argv) case 'V': fprintf(stdout, "%s\n", VERSION); exit(EXIT_SUCCESS); + case 'k': + opt_type = LTTNG_CONSUMER_KERNEL; + break; +#ifdef CONFIG_LTTNG_TOOLS_HAVE_UST + case 'u': + opt_type = LTTNG_CONSUMER_UST; + break; +#endif default: usage(); exit(EXIT_FAILURE); @@ -195,14 +222,14 @@ static void parse_args(int argc, char **argv) /* * Consume data on a file descriptor and write it on a trace file. */ -static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) +static int read_subbuffer(struct lttng_consumer_stream *stream) { unsigned long len; int err; long ret = 0; - int infd = kconsumerd_fd->consumerd_fd; + int infd = stream->wait_fd; - DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); + DBG("In read_subbuffer (infd : %d)", infd); /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -218,7 +245,7 @@ static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) goto end; } - switch (kconsumerd_fd->output) { + switch (stream->output) { case LTTNG_EVENT_SPLICE: /* read the whole subbuffer */ err = kernctl_get_padded_subbuf_size(infd, &len); @@ -229,7 +256,7 @@ static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) } /* splice the subbuffer to the tracefile */ - ret = lttng_kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); if (ret < 0) { /* * display the error but continue processing to try @@ -247,7 +274,7 @@ static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) goto end; } /* write the subbuffer to the tracefile */ - ret = lttng_kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); if (ret < 0) { /* * display the error but continue processing to try @@ -277,37 +304,37 @@ end: return ret; } -static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd) +static int on_recv_stream(struct lttng_consumer_stream *stream) { int ret; /* Opening the tracefile in write mode */ - if (kconsumerd_fd->path_name != NULL) { - ret = open(kconsumerd_fd->path_name, + if (stream->path_name != NULL) { + ret = open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); if (ret < 0) { - ERR("Opening %s", kconsumerd_fd->path_name); + ERR("Opening %s", stream->path_name); perror("open"); goto error; } - kconsumerd_fd->out_fd = ret; + stream->out_fd = ret; } - if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) { + if (stream->output == LTTNG_EVENT_MMAP) { /* get the len of the mmap region */ unsigned long mmap_len; - ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &mmap_len); + ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { ret = errno; perror("kernctl_get_mmap_len"); goto error_close_fd; } - kconsumerd_fd->mmap_len = (size_t) mmap_len; + stream->mmap_len = (size_t) mmap_len; - kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len, - PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0); - if (kconsumerd_fd->mmap_base == MAP_FAILED) { + stream->mmap_base = mmap(NULL, stream->mmap_len, + PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); + if (stream->mmap_base == MAP_FAILED) { perror("Error mmaping"); ret = -1; goto error_close_fd; @@ -321,7 +348,7 @@ error_close_fd: { int err; - err = close(kconsumerd_fd->out_fd); + err = close(stream->out_fd); assert(!err); } error: @@ -352,18 +379,22 @@ int main(int argc, char **argv) if (strlen(command_sock_path) == 0) { snprintf(command_sock_path, PATH_MAX, - KCONSUMERD_CMD_SOCK_PATH); + opt_type == LTTNG_CONSUMER_KERNEL ? + KCONSUMERD_CMD_SOCK_PATH : + USTCONSUMERD_CMD_SOCK_PATH); } /* create the consumer instance with and assign the callbacks */ - ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL); + ctx = lttng_consumer_create(opt_type, read_subbuffer, NULL, on_recv_stream, NULL); if (ctx == NULL) { goto error; } - lttng_kconsumerd_set_command_sock_path(ctx, command_sock_path); + lttng_consumer_set_command_sock_path(ctx, command_sock_path); if (strlen(error_sock_path) == 0) { snprintf(error_sock_path, PATH_MAX, - KCONSUMERD_ERR_SOCK_PATH); + opt_type == LTTNG_CONSUMER_KERNEL ? + KCONSUMERD_ERR_SOCK_PATH : + USTCONSUMERD_ERR_SOCK_PATH); } if (set_signal_handler() < 0) { @@ -377,10 +408,10 @@ int main(int argc, char **argv) if (ret < 0) { WARN("Cannot connect to error socket, is ltt-sessiond started ?"); } - lttng_kconsumerd_set_error_sock(ctx, ret); + lttng_consumer_set_error_sock(ctx, ret); /* Create the thread to manage the receive of fd */ - ret = pthread_create(&threads[0], NULL, lttng_kconsumerd_thread_receive_fds, + ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -388,7 +419,7 @@ int main(int argc, char **argv) } /* Create thread to manage the polling/writing of traces */ - ret = pthread_create(&threads[1], NULL, lttng_kconsumerd_thread_poll_fds, + ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -403,16 +434,16 @@ int main(int argc, char **argv) } } ret = EXIT_SUCCESS; - lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); + lttng_consumer_send_error(ctx, CONSUMERD_EXIT_SUCCESS); goto end; error: ret = EXIT_FAILURE; - lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); + lttng_consumer_send_error(ctx, CONSUMERD_EXIT_FAILURE); end: - lttng_kconsumerd_destroy(ctx); - lttng_kconsumerd_cleanup(); + lttng_consumer_destroy(ctx); + lttng_consumer_cleanup(); return ret; } diff --git a/lttng/commands/version.c b/lttng/commands/version.c index 7b8852308..4d8d75b12 100644 --- a/lttng/commands/version.c +++ b/lttng/commands/version.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "../cmd.h" diff --git a/lttng/lttng.c b/lttng/lttng.c index 4d81d2a1d..b9c6b5a8e 100644 --- a/lttng/lttng.c +++ b/lttng/lttng.c @@ -23,6 +23,7 @@ #include #include #include +#include #include diff --git a/tests/test_kernel_data_trace.c b/tests/test_kernel_data_trace.c index e46d90186..548714e94 100644 --- a/tests/test_kernel_data_trace.c +++ b/tests/test_kernel_data_trace.c @@ -69,7 +69,7 @@ static void create_one_kernel_session(void) printf("Validating kernel session: "); assert(kern->fd == 0); assert(kern->metadata_stream_fd == 0); - assert(kern->kconsumer_fds_sent == 0); + assert(kern->consumer_fds_sent == 0); assert(kern->channel_count == 0); assert(kern->stream_count_global == 0); assert(kern->metadata == NULL); -- 2.34.1