From 5eb91c9837a7b379a74e99358f0f9fb10011ef74 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Fri, 9 Sep 2011 14:43:13 -0400 Subject: [PATCH] Add poll/epoll compat layer for session daemon This is a big modification of the session daemon code base. Every thread is monitoring file descriptors using a poll set. This commit adds a compat wrapper for the use of poll(2) or epoll(7) determined at compile time on if the epoll API is available. Since epoll(7) is Linux specific, the poll(2) fallback is necessary for portability. By default, epoll(7) will be used having the --enable-epoll default to yes. To use the poll(2) API, simply run with --disable-epoll when running configure. With this implementation, some fixes are also introduced. * Two missing notification to the consumer when the default channel is created. * Memory and socket file descriptor leak when connect()/close() is done by the daemon alive check of the lttngctl API. * Root check (UID=0) on cleanup() before removing LTTNG_RUNDIR. * Comments here and there. At this commit, tests were made on the CPU hot plug feature, multiple UST registration at the same time and git tree tests. All passed for poll(2) and epoll(7) implementation. Signed-off-by: David Goulet --- include/lttng-share.h | 27 + include/lttng/lttng-kconsumerd.h | 1 + ltt-sessiond/Makefile.am | 7 +- ltt-sessiond/compat/compat-epoll.c | 218 +++++++ ltt-sessiond/compat/compat-poll.c | 195 ++++++ ltt-sessiond/compat/poll.h | 270 +++++++++ ltt-sessiond/main.c | 938 +++++++++++++++-------------- 7 files changed, 1213 insertions(+), 443 deletions(-) create mode 100644 ltt-sessiond/compat/compat-epoll.c create mode 100644 ltt-sessiond/compat/compat-poll.c create mode 100644 ltt-sessiond/compat/poll.h diff --git a/include/lttng-share.h b/include/lttng-share.h index 38598bcea..4e079d241 100644 --- a/include/lttng-share.h +++ b/include/lttng-share.h @@ -42,4 +42,31 @@ /* See lttng-kernel.h enum lttng_kernel_output for channel output */ #define DEFAULT_KERNEL_CHANNEL_OUTPUT LTTNG_EVENT_SPLICE +/* + * Takes a pointer x and transform it so we can use it to access members + * without a function call. Here an example: + * + * #define GET_SIZE(x) LTTNG_REF(x)->size + * + * struct { int size; } s; + * + * printf("size : %d\n", GET_SIZE(&s)); + * + * For this example we can't use something like this for compatibility purpose + * since this will fail: + * + * #define GET_SIZE(x) x->size; + * + * This is mostly use for the compatibility layer of lttng-tools. See + * poll/epoll for a good real example. Since x can be on the stack or allocated + * memory using malloc(), we must use generic accessors for compat in order to + * *not* use a function to access members. + */ +#define LTTNG_REF(x) ((typeof(*x) *)(x)) + +/* + * Memory allocation zeroed + */ +#define zmalloc(x) calloc(1, x) + #endif /* _LTTNG_SHARE_H */ diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h index 2aa337fa6..98771de3c 100644 --- a/include/lttng/lttng-kconsumerd.h +++ b/include/lttng/lttng-kconsumerd.h @@ -21,6 +21,7 @@ #include #include +#include #include /* diff --git a/ltt-sessiond/Makefile.am b/ltt-sessiond/Makefile.am index e1b42ca09..fad0cd576 100644 --- a/ltt-sessiond/Makefile.am +++ b/ltt-sessiond/Makefile.am @@ -6,12 +6,13 @@ AM_CFLAGS = -fno-strict-aliasing bin_PROGRAMS = ltt-sessiond if COMPAT_EPOLL -COMPAT=compat-epoll.c +COMPAT=compat/compat-epoll.c else -COMPAT=compat-poll.c +COMPAT=compat/compat-poll.c endif ltt_sessiond_SOURCES = utils.c utils.h \ + compat/poll.h $(COMPAT) \ trace-kernel.c trace-kernel.h \ trace-ust.c trace-ust.h \ traceable-app.c traceable-app.h \ @@ -21,8 +22,6 @@ ltt_sessiond_SOURCES = utils.c utils.h \ context.c context.h \ futex.c futex.h \ shm.c shm.h \ - poll.h \ - $(COMPAT) \ session.c session.h \ ltt-sessiond.h main.c diff --git a/ltt-sessiond/compat/compat-epoll.c b/ltt-sessiond/compat/compat-epoll.c new file mode 100644 index 000000000..578dd2e45 --- /dev/null +++ b/ltt-sessiond/compat/compat-epoll.c @@ -0,0 +1,218 @@ +/* + * Copyright (C) 2011 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; only version 2 of the License. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "poll.h" + +unsigned int poll_max_size; + +/* + * Create epoll set and allocate returned events structure. + */ +int compat_epoll_create(struct lttng_poll_event *events, int size, int flags) +{ + int ret; + + if (events == NULL || size <= 0) { + goto error; + } + + /* Don't bust the limit here */ + if (size > poll_max_size) { + size = poll_max_size; + } + + ret = epoll_create1(flags); + if (ret < 0) { + /* At this point, every error is fatal */ + perror("epoll_create1"); + goto error; + } + + events->epfd = ret; + + /* This *must* be freed by using lttng_poll_free() */ + events->events = zmalloc(size * sizeof(struct epoll_event)); + if (events->events == NULL) { + perror("malloc epoll set"); + goto error_close; + } + + events->events_size = size; + events->nb_fd = 0; + + return 0; + +error_close: + close(events->epfd); +error: + return -1; +} + +/* + * Add a fd to the epoll set with requesting events. + */ +int compat_epoll_add(struct lttng_poll_event *events, int fd, uint32_t req_events) +{ + int ret, new_size; + struct epoll_event ev, *ptr; + + if (events == NULL || events->events == NULL || fd < 0) { + ERR("Bad compat epoll add arguments"); + goto error; + } + + ev.events = req_events; + ev.data.fd = fd; + + ret = epoll_ctl(events->epfd, EPOLL_CTL_ADD, fd, &ev); + if (ret < 0) { + switch (errno) { + case EEXIST: + case ENOSPC: + case EPERM: + /* Print perror and goto end not failing. Show must go on. */ + perror("epoll_ctl ADD"); + goto end; + default: + perror("epoll_ctl ADD fatal"); + goto error; + } + } + + events->nb_fd++; + + if (events->nb_fd >= events->events_size) { + new_size = 2 * events->events_size; + ptr = realloc(events->events, new_size * sizeof(struct epoll_event)); + if (ptr == NULL) { + perror("realloc epoll add"); + goto error; + } + events->events = ptr; + events->events_size = new_size; + } + +end: + return 0; + +error: + return -1; +} + +/* + * Remove a fd from the epoll set. + */ +int compat_epoll_del(struct lttng_poll_event *events, int fd) +{ + int ret; + + if (events == NULL || fd < 0) { + goto error; + } + + ret = epoll_ctl(events->epfd, EPOLL_CTL_DEL, fd, NULL); + if (ret < 0) { + switch (errno) { + case ENOENT: + case EPERM: + /* Print perror and goto end not failing. Show must go on. */ + perror("epoll_ctl DEL"); + goto end; + default: + perror("epoll_ctl DEL fatal"); + goto error; + } + perror("epoll_ctl del"); + goto error; + } + + events->nb_fd--; + +end: + return 0; + +error: + return -1; +} + +/* + * Wait on epoll set. This is a blocking call of timeout value. + */ +int compat_epoll_wait(struct lttng_poll_event *events, int timeout) +{ + int ret; + + if (events == NULL || events->events == NULL || + events->events_size < events->nb_fd) { + ERR("Wrong arguments in compat_epoll_wait"); + goto error; + } + + ret = epoll_wait(events->epfd, events->events, events->nb_fd, timeout); + if (ret < 0) { + /* At this point, every error is fatal */ + perror("epoll_wait"); + goto error; + } + + return ret; + +error: + return -1; +} + +/* + * Setup poll set maximum size. + */ +void compat_epoll_set_max_size(void) +{ + int ret, fd; + char buf[64]; + + poll_max_size = LTTNG_POLL_DEFAULT_SIZE; + + fd = open(LTTNG_EPOLL_PROC_PATH, O_RDONLY); + if (fd < 0) { + return; + } + + ret = read(fd, buf, sizeof(buf)); + if (ret < 0) { + perror("read set max size"); + goto error; + } + + poll_max_size = atoi(buf); + if (poll_max_size <= 0) { + /* Extra precaution */ + poll_max_size = LTTNG_POLL_DEFAULT_SIZE; + } + + DBG("epoll set max size is %d", poll_max_size); + +error: + close(fd); +} diff --git a/ltt-sessiond/compat/compat-poll.c b/ltt-sessiond/compat/compat-poll.c new file mode 100644 index 000000000..cc4bb0f97 --- /dev/null +++ b/ltt-sessiond/compat/compat-poll.c @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2011 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; only version 2 of the License. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include +#include +#include + +#include + +#include "poll.h" + +unsigned int poll_max_size; + +/* + * Create pollfd data structure. + */ +int compat_poll_create(struct lttng_poll_event *events, int size) +{ + if (events == NULL || size <= 0) { + ERR("Wrong arguments for poll create"); + goto error; + } + + /* Don't bust the limit here */ + if (size > poll_max_size) { + size = poll_max_size; + } + + /* This *must* be freed by using lttng_poll_free() */ + events->events = zmalloc(size * sizeof(struct pollfd)); + if (events->events == NULL) { + perror("malloc struct pollfd"); + goto error; + } + + events->events_size = size; + events->nb_fd = 0; + + return 0; + +error: + return -1; +} + +/* + * Add fd to pollfd data structure with requested events. + */ +int compat_poll_add(struct lttng_poll_event *events, int fd, + uint32_t req_events) +{ + int new_size; + struct pollfd *ptr; + + if (events == NULL || events->events == NULL || fd < 0) { + ERR("Bad compat poll add arguments"); + goto error; + } + + /* Reallocate pollfd structure by a factor of 2 if needed. */ + if (events->nb_fd >= events->events_size) { + new_size = 2 * events->events_size; + ptr = realloc(events->events, new_size * sizeof(struct pollfd)); + if (ptr == NULL) { + perror("realloc poll add"); + goto error; + } + events->events = ptr; + events->events_size = new_size; + } + + events->events[events->nb_fd].fd = fd; + events->events[events->nb_fd].events = req_events; + events->nb_fd++; + + DBG("fd %d of %d added to pollfd", fd, events->nb_fd); + + return 0; + +error: + return -1; +} + +/* + * Remove a fd from the pollfd structure. + */ +int compat_poll_del(struct lttng_poll_event *events, int fd) +{ + int new_size, i, count = 0; + struct pollfd *old = NULL, *new = NULL; + + if (events == NULL || events->events == NULL || fd < 0) { + ERR("Wrong arguments for poll del"); + goto error; + } + + old = events->events; + new_size = events->events_size - 1; + + /* Safety check on size */ + if (new_size > poll_max_size) { + new_size = poll_max_size; + } + + new = zmalloc(new_size * sizeof(struct pollfd)); + if (new == NULL) { + perror("malloc poll del"); + goto error; + } + + for (i = 0; i < events->events_size; i++) { + /* Don't put back the fd we want to delete */ + if (old[i].fd != fd) { + new[count].fd = old[i].fd; + new[count].events = old[i].events; + count++; + } + } + + events->events_size = new_size; + events->events = new; + events->nb_fd--; + + free(old); + + return 0; + +error: + return -1; +} + +/* + * Wait on poll() with timeout. Blocking call. + */ +int compat_poll_wait(struct lttng_poll_event *events, int timeout) +{ + int ret; + + if (events == NULL || events->events == NULL || + events->events_size < events->nb_fd) { + ERR("poll wait arguments error"); + goto error; + } + + ret = poll(events->events, events->nb_fd, timeout); + if (ret < 0) { + /* At this point, every error is fatal */ + perror("poll wait"); + goto error; + } + + return ret; + +error: + return -1; +} + +/* + * Setup poll set maximum size. + */ +void compat_poll_set_max_size(void) +{ + int ret; + struct rlimit lim; + + /* Default value */ + poll_max_size = LTTNG_POLL_DEFAULT_SIZE; + + ret = getrlimit(RLIMIT_NOFILE, &lim); + if (ret < 0) { + perror("getrlimit poll RLIMIT_NOFILE"); + return; + } + + poll_max_size = lim.rlim_cur; + if (poll_max_size <= 0) { + /* Extra precaution */ + poll_max_size = LTTNG_POLL_DEFAULT_SIZE; + } + + DBG("poll set max size set to %u", poll_max_size); +} diff --git a/ltt-sessiond/compat/poll.h b/ltt-sessiond/compat/poll.h new file mode 100644 index 000000000..3e804305f --- /dev/null +++ b/ltt-sessiond/compat/poll.h @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2011 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; only version 2 of the License. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTT_POLL_H +#define _LTT_POLL_H + +#include +#include + +#include + +/* + * Value taken from the hard limit allowed by the kernel when using setrlimit + * with RLIMIT_NOFILE on an Intel i7 CPU and Linux 3.0.3. + */ +#define LTTNG_POLL_DEFAULT_SIZE 65535 + +/* + * Maximum number of fd we can monitor. + * + * For epoll(7), /proc/sys/fs/epoll/max_user_watches (since Linux 2.6.28) will + * be used for the maximum size of the poll set. If this interface is not + * available, according to the manpage, the max_user_watches value is 1/25 (4%) + * of the available low memory divided by the registration cost in bytes which + * is 90 bytes on a 32-bit kernel and 160 bytes on a 64-bit kernel. + * + * For poll(2), the max fds must not exceed RLIMIT_NOFILE given by + * getrlimit(2). + */ +extern unsigned int poll_max_size; + +/* + * Used by lttng_poll_clean to free the events structure in a lttng_poll_event. + */ +static inline void __lttng_poll_free(void *events) +{ + free(events); +} + +/* + * epoll(7) implementation. + */ +#ifdef HAVE_EPOLL +#include + +/* See man epoll(7) for this define path */ +#define LTTNG_EPOLL_PROC_PATH "/proc/sys/fs/epoll/max_user_watches" + +enum { + /* Polling variables compatibility for epoll */ + LPOLLIN = EPOLLIN, + LPOLLPRI = EPOLLPRI, + LPOLLOUT = EPOLLOUT, + LPOLLRDNORM = EPOLLRDNORM, + LPOLLRDBAND = EPOLLRDBAND, + LPOLLWRNORM = EPOLLWRNORM, + LPOLLWRBAND = EPOLLWRBAND, + LPOLLMSG = EPOLLMSG, + LPOLLERR = EPOLLERR, + LPOLLHUP = EPOLLHUP, + LPOLLNVAL = EPOLLHUP, + LPOLLRDHUP = EPOLLRDHUP, + /* Close on exec feature of epoll */ + LTTNG_CLOEXEC = EPOLL_CLOEXEC, +}; + +struct compat_epoll_event { + int epfd; + uint32_t nb_fd; /* Current number of fd in events */ + uint32_t events_size; /* Size of events array */ + struct epoll_event *events; +}; +#define lttng_poll_event compat_epoll_event + +/* + * For the following calls, consider 'e' to be a lttng_poll_event pointer and i + * being the index of the events array. + */ +#define LTTNG_POLL_GETFD(e, i) LTTNG_REF(e)->events[i].data.fd +#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].events +#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd +#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size + +/* + * Create the epoll set. No memory allocation is done here. + */ +extern int compat_epoll_create(struct lttng_poll_event *events, + int size, int flags); +#define lttng_poll_create(events, size, flags) \ + compat_epoll_create(events, size, flags); + +/* + * Wait on epoll set with the number of fd registered to the lttng_poll_event + * data structure (events). + */ +extern int compat_epoll_wait(struct lttng_poll_event *events, int timeout); +#define lttng_poll_wait(events, timeout) \ + compat_epoll_wait(events, timeout); + +/* + * Add a fd to the epoll set and resize the epoll_event structure if needed. + */ +extern int compat_epoll_add(struct lttng_poll_event *events, + int fd, uint32_t req_events); +#define lttng_poll_add(events, fd, req_events) \ + compat_epoll_add(events, fd, req_events); + +/* + * Remove a fd from the epoll set. + */ +extern int compat_epoll_del(struct lttng_poll_event *events, int fd); +#define lttng_poll_del(events, fd) \ + compat_epoll_del(events, fd); + +/* + * Set up the poll set limits variable poll_max_size + */ +extern void compat_epoll_set_max_size(void); +#define lttng_poll_set_max_size(void) \ + compat_epoll_set_max_size(void); + +/* + * This function memset with zero the structure since it can be reused at each + * round of a main loop. Being in a loop and using a non static number of fds, + * this function must be called to insure coherent events with associted fds. + */ +static inline void lttng_poll_reset(struct lttng_poll_event *events) +{ + if (events && events->events) { + memset(events->events, 0, + events->nb_fd * sizeof(struct epoll_event)); + } +} + +/* + * Clean the events structure of a lttng_poll_event. It's the caller + * responsability to free the lttng_poll_event memory. + */ +static inline void lttng_poll_clean(struct lttng_poll_event *events) +{ + if (events) { + close(events->epfd); + __lttng_poll_free((void *) events->events); + } +} + +#else /* HAVE_EPOLL */ +/* + * Fallback on poll(2) API + */ + +/* Needed for some poll event values */ +#ifndef __USE_XOPEN +#define __USE_XOPEN +#endif + +/* Needed for some poll event values */ +#ifndef __USE_GNU +#define __USE_GNU +#endif + +#include +#include + +enum { + /* Polling variables compatibility for poll */ + LPOLLIN = POLLIN, + LPOLLPRI = POLLPRI, + LPOLLOUT = POLLOUT, + LPOLLRDNORM = POLLRDNORM, + LPOLLRDBAND = POLLRDBAND, + LPOLLWRNORM = POLLWRNORM, + LPOLLWRBAND = POLLWRBAND, + LPOLLMSG = POLLMSG, + LPOLLERR = POLLERR, + LPOLLHUP = POLLHUP | POLLNVAL, + LPOLLRDHUP = POLLRDHUP, + /* Close on exec feature does not exist for poll(2) */ + LTTNG_CLOEXEC = 0xdead, +}; + +struct compat_poll_event { + uint32_t nb_fd; /* Current number of fd in events */ + uint32_t events_size; /* Size of events array */ + struct pollfd *events; +}; +#define lttng_poll_event compat_poll_event + +/* + * For the following calls, consider 'e' to be a lttng_poll_event pointer and i + * being the index of the events array. + */ +#define LTTNG_POLL_GETFD(e, i) LTTNG_REF(e)->events[i].fd +#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].revents +#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd +#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size + +/* + * Create a pollfd structure of size 'size'. + */ +extern int compat_poll_create(struct lttng_poll_event *events, int size); +#define lttng_poll_create(events, size, flags) \ + compat_poll_create(events, size); + +/* + * Wait on poll(2) event with nb_fd registered to the lttng_poll_event data + * structure. + */ +extern int compat_poll_wait(struct lttng_poll_event *events, int timeout); +#define lttng_poll_wait(events, timeout) \ + compat_poll_wait(events, timeout); + +/* + * Add the fd to the pollfd structure. Resize if needed. + */ +extern int compat_poll_add(struct lttng_poll_event *events, + int fd, uint32_t req_events); +#define lttng_poll_add(events, fd, req_events) \ + compat_poll_add(events, fd, req_events); + +/* + * Remove the fd from the pollfd. Memory allocation is done to recreate a new + * pollfd, data is copied from the old pollfd to the new and, finally, the old + * one is freed(). + */ +extern int compat_poll_del(struct lttng_poll_event *events, int fd); +#define lttng_poll_del(events, fd) \ + compat_poll_del(events, fd); + +/* + * Set up the poll set limits variable poll_max_size + */ +extern void compat_poll_set_max_size(void); +#define lttng_poll_set_max_size(void) \ + compat_poll_set_max_size(void); + +/* + * No need to reset a pollfd structure for poll(2) + */ +static inline void lttng_poll_reset(struct lttng_poll_event *events) +{} + +/* + * Clean the events structure of a lttng_poll_event. It's the caller + * responsability to free the lttng_poll_event memory. + */ +static inline void lttng_poll_clean(struct lttng_poll_event *events) +{ + if (events) { + __lttng_poll_free((void *) events->events); + } +} + +#endif /* HAVE_EPOLL */ + +#endif /* _LTT_POLL_H */ diff --git a/ltt-sessiond/main.c b/ltt-sessiond/main.c index 094fe4adf..d2240d9dd 100644 --- a/ltt-sessiond/main.c +++ b/ltt-sessiond/main.c @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -44,6 +43,7 @@ #include #include +#include "compat/poll.h" #include "context.h" #include "futex.h" #include "kernel-ctl.h" @@ -72,7 +72,6 @@ static int opt_daemon; static int is_root; /* Set to 1 if the daemon is running as root */ static pid_t ppid; /* Parent PID for --sig-parent option */ static pid_t kconsumerd_pid; -static struct pollfd *kernel_pollfd; static int dispatch_thread_exit; static char apps_unix_sock_path[PATH_MAX]; /* Global application Unix socket path */ @@ -110,7 +109,9 @@ static pthread_t kernel_thread; static pthread_t dispatch_thread; static sem_t kconsumerd_sem; -static pthread_mutex_t kconsumerd_pid_mutex; /* Mutex to control kconsumerd pid assignation */ + +/* Mutex to control kconsumerd pid assignation */ +static pthread_mutex_t kconsumerd_pid_mutex; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -133,6 +134,50 @@ static struct ust_cmd_queue ust_cmd_queue; */ static struct ltt_session_list *session_list_ptr; +/* + * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. + */ +static int create_thread_poll_set(struct lttng_poll_event *events, + unsigned int size) +{ + int ret; + + if (events == NULL || size == 0) { + ret = -1; + goto error; + } + + ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); + if (ret < 0) { + goto error; + } + + /* Add quit pipe */ + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + return 0; + +error: + return ret; +} + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static int check_thread_quit_pipe(int fd, uint32_t events) +{ + if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + /* * Remove modules in reverse load order. */ @@ -188,7 +233,7 @@ static gid_t allowed_group(void) } /* - * Init quit pipe. + * Init thread quit pipe. * * Return -1 on error or 0 if all pipes are created. */ @@ -234,10 +279,15 @@ static void teardown_kernel_session(struct ltt_session *session) */ static void stop_threads(void) { + int ret; + /* Stopping all threads */ DBG("Terminating all threads"); - close(thread_quit_pipe[0]); - close(thread_quit_pipe[1]); + ret = write(thread_quit_pipe[1], "!", 1); + if (ret < 0) { + ERR("write error on thread quit pipe"); + } + /* Dispatch thread */ dispatch_thread_exit = 1; futex_nto1_wake(&ust_cmd_queue.futex); @@ -260,16 +310,18 @@ static void cleanup(void) 27, 1, 31, 27, 0, 27, 1, 33, 27, 0); /* */ - DBG("Removing %s directory", LTTNG_RUNDIR); - ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR); - if (ret < 0) { - ERR("asprintf failed. Something is really wrong!"); - } + if (is_root) { + DBG("Removing %s directory", LTTNG_RUNDIR); + ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR); + if (ret < 0) { + ERR("asprintf failed. Something is really wrong!"); + } - /* Remove lttng run directory */ - ret = system(cmd); - if (ret < 0) { - ERR("Unable to clean " LTTNG_RUNDIR); + /* Remove lttng run directory */ + ret = system(cmd); + if (ret < 0) { + ERR("Unable to clean " LTTNG_RUNDIR); + } } DBG("Cleaning up all session"); @@ -297,6 +349,9 @@ static void cleanup(void) DBG("Unloading kernel modules"); modprobe_remove_kernel_modules(); } + + close(thread_quit_pipe[0]); + close(thread_quit_pipe[1]); } /* @@ -505,23 +560,17 @@ error: } /* - * Update the kernel pollfd set of all channel fd available over all tracing + * Update the kernel poll set of all channel fd available over all tracing * session. Add the wakeup pipe at the end of the set. */ -static int update_kernel_pollfd(void) +static int update_kernel_poll(struct lttng_poll_event *events) { - int i = 0; - /* - * The wakup pipe and the quit pipe are needed so the number of fds starts - * at 2 for those pipes. - */ - unsigned int nb_fd = 2; + int ret; struct ltt_session *session; struct ltt_kernel_channel *channel; - DBG("Updating kernel_pollfd"); + DBG("Updating kernel poll set"); - /* Get the number of channel of all kernel session */ lock_session_list(); cds_list_for_each_entry(session, &session_list_ptr->head, list) { lock_session(session); @@ -529,48 +578,21 @@ static int update_kernel_pollfd(void) unlock_session(session); continue; } - nb_fd += session->kernel_session->channel_count; - unlock_session(session); - } - - DBG("Resizing kernel_pollfd to size %d", nb_fd); - kernel_pollfd = realloc(kernel_pollfd, nb_fd * sizeof(struct pollfd)); - if (kernel_pollfd == NULL) { - perror("malloc kernel_pollfd"); - goto error; - } - - cds_list_for_each_entry(session, &session_list_ptr->head, list) { - lock_session(session); - if (session->kernel_session == NULL) { - unlock_session(session); - continue; - } - if (i >= nb_fd) { - ERR("To much channel for kernel_pollfd size"); - unlock_session(session); - break; - } cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) { - kernel_pollfd[i].fd = channel->fd; - kernel_pollfd[i].events = POLLIN | POLLRDNORM; - i++; + /* Add channel fd to the kernel poll set */ + ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM); + if (ret < 0) { + unlock_session(session); + goto error; + } + DBG("Channel fd %d added to kernel set", channel->fd); } unlock_session(session); } unlock_session_list(); - /* Adding wake up pipe */ - kernel_pollfd[nb_fd - 2].fd = kernel_poll_pipe[0]; - kernel_pollfd[nb_fd - 2].events = POLLIN; - - /* Adding the quit pipe */ - kernel_pollfd[nb_fd - 1].fd = thread_quit_pipe[0]; - kernel_pollfd[nb_fd - 1].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; - - return nb_fd; + return 0; error: unlock_session_list(); @@ -604,7 +626,8 @@ static int update_kernel_stream(int fd) session->kernel_session->consumer_fd = kconsumerd_cmd_sock; } - cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, + &session->kernel_session->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -613,12 +636,13 @@ static int update_kernel_stream(int fd) } /* - * Have we already sent fds to the consumer? If yes, it means that - * tracing is started so it is safe to send our updated stream fds. + * Have we already sent fds to the consumer? If yes, it means + * that tracing is started so it is safe to send our updated + * stream fds. */ if (session->kernel_session->kconsumer_fds_sent == 1) { - ret = send_kconsumerd_channel_fds(session->kernel_session->consumer_fd, - channel); + ret = send_kconsumerd_channel_fds( + session->kernel_session->consumer_fd, channel); if (ret < 0) { goto error; } @@ -645,27 +669,42 @@ error: */ static void *thread_manage_kernel(void *data) { - int ret, i, nb_fd = 0; + int ret, i, pollfd, update_poll_flag = 1; + uint32_t revents, nb_fd; char tmp; - int update_poll_flag = 1; + struct lttng_poll_event events; DBG("Thread manage kernel started"); + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } + + ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + while (1) { if (update_poll_flag == 1) { - nb_fd = update_kernel_pollfd(); - if (nb_fd < 0) { + ret = update_kernel_poll(&events); + if (ret < 0) { goto error; } update_poll_flag = 0; } - DBG("Polling on %d fds", nb_fd); + nb_fd = LTTNG_POLL_GETNB(&events); + + DBG("Thread kernel polling on %d fds", nb_fd); + + /* Zeroed the poll events */ + lttng_poll_reset(&events); /* Poll infinite value of time */ - ret = poll(kernel_pollfd, nb_fd, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll kernel thread"); goto error; } else if (ret == 0) { /* Should not happen since timeout is infinite */ @@ -674,52 +713,49 @@ static void *thread_manage_kernel(void *data) continue; } - /* Thread quit pipe has been closed. Killing thread. */ - if (kernel_pollfd[nb_fd - 1].revents == POLLNVAL) { - goto error; - } - - DBG("Kernel poll event triggered"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - /* - * Check if the wake up pipe was triggered. If so, the kernel_pollfd - * must be updated. - */ - switch (kernel_pollfd[nb_fd - 2].revents) { - case POLLIN: - ret = read(kernel_poll_pipe[0], &tmp, 1); - update_poll_flag = 1; - continue; - case POLLERR: - goto error; - default: - break; - } + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } - for (i = 0; i < nb_fd; i++) { - switch (kernel_pollfd[i].revents) { - /* - * New CPU detected by the kernel. Adding kernel stream to kernel - * session and updating the kernel consumer - */ - case POLLIN | POLLRDNORM: - ret = update_kernel_stream(kernel_pollfd[i].fd); - if (ret < 0) { - continue; + /* Check for data on kernel pipe */ + if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) { + ret = read(kernel_poll_pipe[0], &tmp, 1); + update_poll_flag = 1; + continue; + } else { + /* + * New CPU detected by the kernel. Adding kernel stream to + * kernel session and updating the kernel consumer + */ + if (revents & LPOLLIN) { + ret = update_kernel_stream(pollfd); + if (ret < 0) { + continue; + } + break; + /* + * TODO: We might want to handle the LPOLLERR | LPOLLHUP + * and unregister kernel stream at this point. + */ } - break; } } } error: DBG("Kernel thread dying"); - if (kernel_pollfd) { - free(kernel_pollfd); - } - close(kernel_poll_pipe[0]); close(kernel_poll_pipe[1]); + + lttng_poll_clean(&events); + return NULL; } @@ -728,9 +764,10 @@ error: */ static void *thread_manage_kconsumerd(void *data) { - int sock = 0, ret; + int sock = 0, i, ret, pollfd; + uint32_t revents, nb_fd; enum lttcomm_return_code code; - struct pollfd pollfd[2]; + struct lttng_poll_event events; DBG("[thread] Manage kconsumerd started"); @@ -739,26 +776,46 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; + /* + * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. + * Nothing more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = kconsumerd_err_sock; - pollfd[1].events = POLLIN; + ret = lttng_poll_add(&events, kconsumerd_err_sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + + nb_fd = LTTNG_POLL_GETNB(&events); /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll kconsumerd thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } else if (pollfd[1].revents == POLLERR) { - ERR("Kconsumerd err socket poll error"); - goto error; + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the registration socket */ + if (pollfd == kconsumerd_err_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Kconsumerd err socket poll error"); + goto error; + } + } } sock = lttcomm_accept_unix_sock(kconsumerd_err_sock); @@ -788,25 +845,46 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - /* Kconsumerd err socket */ - pollfd[1].fd = sock; - pollfd[1].events = POLLIN; - - /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + /* Remove the kconsumerd error socket since we have established a connexion */ + ret = lttng_poll_del(&events, kconsumerd_err_sock); if (ret < 0) { - perror("poll kconsumerd thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { goto error; - } else if (pollfd[1].revents == POLLERR) { - ERR("Kconsumerd err socket second poll error"); + } + + /* Update number of fd */ + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Inifinite blocking call, waiting for transmission */ + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { goto error; } + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the kconsumerd socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Kconsumerd err socket second poll error"); + goto error; + } + } + } + /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { @@ -818,97 +896,17 @@ static void *thread_manage_kconsumerd(void *data) error: DBG("Kconsumerd thread dying"); - if (kconsumerd_err_sock) { - close(kconsumerd_err_sock); - } - if (kconsumerd_cmd_sock) { - close(kconsumerd_cmd_sock); - } - if (sock) { - close(sock); - } + close(kconsumerd_err_sock); + close(kconsumerd_cmd_sock); + close(sock); unlink(kconsumerd_err_unix_sock_path); unlink(kconsumerd_cmd_unix_sock_path); - kconsumerd_pid = 0; - return NULL; -} -/* - * Reallocate the apps command pollfd structure of nb_fd size. - * - * The first two fds must be there at all time. - */ -static int update_apps_cmd_pollfd(unsigned int nb_fd, unsigned int old_nb_fd, - struct pollfd **pollfd) -{ - int i, count; - struct pollfd *old_pollfd = NULL; + lttng_poll_clean(&events); - /* Can't accept pollfd less than 2 */ - if (nb_fd < 2) { - goto end; - } - - if (*pollfd) { - /* Save pointer */ - old_pollfd = *pollfd; - } - - *pollfd = malloc(nb_fd * sizeof(struct pollfd)); - if (*pollfd == NULL) { - perror("malloc manage apps pollfd"); - goto error; - } - - /* First fd is always the quit pipe */ - (*pollfd)[0].fd = thread_quit_pipe[0]; - (*pollfd)[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; - - /* Apps command pipe */ - (*pollfd)[1].fd = apps_cmd_pipe[0]; - (*pollfd)[1].events = POLLIN; - - /* Start count after the two pipes below */ - count = 2; - for (i = 2; i < old_nb_fd; i++) { - /* Add to new pollfd */ - if (old_pollfd[i].fd != -1) { - (*pollfd)[count].fd = old_pollfd[i].fd; - (*pollfd)[count].events = POLLHUP | POLLNVAL | POLLERR; - count++; - } - - if (count > nb_fd) { - ERR("Updating poll fd wrong size"); - goto error; - } - } - - if (nb_fd < 2) { - /* - * There should *always* be at least two fds in the pollfd. This safety - * check make sure the poll() will actually try on those two pipes at - * best which are the thread_quit_pipe and apps_cmd_pipe. - */ - nb_fd = 2; - MSG("nb_fd < 2 --> Not good! Continuing..."); - } - - /* Destroy old pollfd */ - free(old_pollfd); - - DBG("Apps cmd pollfd realloc of size %d", nb_fd); - -end: - return 0; - -error: - /* Destroy old pollfd */ - free(old_pollfd); - return -1; + return NULL; } /* @@ -916,105 +914,107 @@ error: */ static void *thread_manage_apps(void *data) { - int i, ret, current_nb_fd; - unsigned int nb_fd = 2; - int update_poll_flag = 1; - struct pollfd *pollfd = NULL; + int i, ret, pollfd; + uint32_t revents, nb_fd; struct ust_command ust_cmd; + struct lttng_poll_event events; DBG("[thread] Manage application started"); - ust_cmd.sock = -1; - current_nb_fd = nb_fd; + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - while (1) { - /* See if we have a valid socket to add to pollfd */ - if (ust_cmd.sock != -1) { - nb_fd++; - update_poll_flag = 1; - } + ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } - /* The pollfd struct must be updated */ - if (update_poll_flag) { - ret = update_apps_cmd_pollfd(nb_fd, current_nb_fd, &pollfd); - if (ret < 0) { - /* malloc failed so we quit */ - goto error; - } + while (1) { + /* Zeroed the events structure */ + lttng_poll_reset(&events); - if (ust_cmd.sock != -1) { - /* Update pollfd with the new UST socket */ - DBG("Adding sock %d to apps cmd pollfd", ust_cmd.sock); - pollfd[nb_fd - 1].fd = ust_cmd.sock; - pollfd[nb_fd - 1].events = POLLHUP | POLLNVAL | POLLERR; - ust_cmd.sock = -1; - } - } + nb_fd = LTTNG_POLL_GETNB(&events); DBG("Apps thread polling on %d fds", nb_fd); /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, nb_fd, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll apps thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } else { - /* apps_cmd_pipe pipe events */ - switch (pollfd[1].revents) { - case POLLERR: - ERR("Apps command pipe poll error"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { goto error; - case POLLIN: - /* Empty pipe */ - ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); - if (ret < 0 || ret < sizeof(ust_cmd)) { - perror("read apps cmd pipe"); - goto error; - } + } - /* Register applicaton to the session daemon */ - ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock); - if (ret < 0) { - /* Only critical ENOMEM error can be returned here */ + /* Inspect the apps cmd pipe */ + if (pollfd == apps_cmd_pipe[0]) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps command pipe error"); goto error; - } + } else if (revents & LPOLLIN) { + /* Empty pipe */ + ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); + if (ret < 0 || ret < sizeof(ust_cmd)) { + perror("read apps cmd pipe"); + goto error; + } - ret = ustctl_register_done(ust_cmd.sock); - if (ret < 0) { - /* - * If the registration is not possible, we simply unregister - * the apps and continue - */ - unregister_traceable_app(ust_cmd.sock); + /* Register applicaton to the session daemon */ + ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock); + if (ret < 0) { + /* Only critical ENOMEM error can be returned here */ + goto error; + } + + ret = ustctl_register_done(ust_cmd.sock); + if (ret < 0) { + /* + * If the registration is not possible, we simply + * unregister the apps and continue + */ + unregister_traceable_app(ust_cmd.sock); + } else { + /* + * We just need here to monitor the close of the UST + * socket and poll set monitor those by default. + */ + ret = lttng_poll_add(&events, ust_cmd.sock, 0); + if (ret < 0) { + goto error; + } + + DBG("Apps with sock %d added to poll set", ust_cmd.sock); + } + break; } - break; - } - } + } else { + /* + * At this point, we know that a registered application made the + * event at poll_wait. + */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } - current_nb_fd = nb_fd; - for (i = 2; i < current_nb_fd; i++) { - /* Apps socket is closed/hungup */ - switch (pollfd[i].revents) { - case POLLERR: - case POLLHUP: - case POLLNVAL: - /* Pipe closed */ - unregister_traceable_app(pollfd[i].fd); - /* Indicate to remove this fd from the pollfd */ - pollfd[i].fd = -1; - nb_fd--; - break; + /* Socket closed */ + unregister_traceable_app(pollfd); + break; + } } } - - if (nb_fd != current_nb_fd) { - update_poll_flag = 1; - } } error: @@ -1022,7 +1022,7 @@ error: close(apps_cmd_pipe[0]); close(apps_cmd_pipe[1]); - free(pollfd); + lttng_poll_clean(&events); return NULL; } @@ -1096,8 +1096,9 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = 0, ret; - struct pollfd pollfd[2]; + int sock = 0, i, ret, pollfd; + uint32_t revents, nb_fd; + struct lttng_poll_event events; /* * Get allocated in this thread, enqueued to a global queue, dequeued and * freed in the manage apps thread. @@ -1111,14 +1112,20 @@ static void *thread_registration_apps(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; - pollfd[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; + /* + * Pass 2 as size here for the thread quit pipe and apps socket. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = apps_sock; - pollfd[1].events = POLLIN; + /* Add the application registration socket */ + ret = lttng_poll_add(&events, apps_sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } /* Notify all applications to register */ ret = notify_ust_apps(1); @@ -1131,75 +1138,81 @@ static void *thread_registration_apps(void *data) while (1) { DBG("Accepting application registration"); + nb_fd = LTTNG_POLL_GETNB(&events); + /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll register apps thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - switch (pollfd[1].revents) { - case POLLNVAL: - case POLLHUP: - case POLLRDHUP: - case POLLERR: - ERR("Register apps socket poll error"); - goto error; - case POLLIN: - sock = lttcomm_accept_unix_sock(apps_sock); - if (sock < 0) { + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { goto error; } - /* Create UST registration command for enqueuing */ - ust_cmd = malloc(sizeof(struct ust_command)); - if (ust_cmd == NULL) { - perror("ust command malloc"); - goto error; - } + /* Event on the registration socket */ + if (pollfd == apps_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Register apps socket poll error"); + goto error; + } else if (revents & LPOLLIN) { + sock = lttcomm_accept_unix_sock(apps_sock); + if (sock < 0) { + goto error; + } - /* - * Using message-based transmissions to ensure we don't have to deal - * with partially received messages. - */ - ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, - sizeof(struct ust_register_msg)); - if (ret < 0 || ret < sizeof(struct ust_register_msg)) { - if (ret < 0) { - perror("lttcomm_recv_unix_sock register apps"); - } else { - ERR("Wrong size received on apps register"); - } - free(ust_cmd); - close(sock); - continue; - } + /* Create UST registration command for enqueuing */ + ust_cmd = malloc(sizeof(struct ust_command)); + if (ust_cmd == NULL) { + perror("ust command malloc"); + goto error; + } - ust_cmd->sock = sock; + /* + * Using message-based transmissions to ensure we don't + * have to deal with partially received messages. + */ + ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, + sizeof(struct ust_register_msg)); + if (ret < 0 || ret < sizeof(struct ust_register_msg)) { + if (ret < 0) { + perror("lttcomm_recv_unix_sock register apps"); + } else { + ERR("Wrong size received on apps register"); + } + free(ust_cmd); + close(sock); + continue; + } - DBG("UST registration received with pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - /* - * Lock free enqueue the registration request. - * The red pill has been taken! This apps will be part of the *system* - */ - cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node); + ust_cmd->sock = sock; - /* - * Wake the registration queue futex. - * Implicit memory barrier with the exchange in cds_wfq_enqueue. - */ - futex_nto1_wake(&ust_cmd_queue.futex); - break; + DBG("UST registration received with pid:%d ppid:%d uid:%d" + " gid:%d sock:%d name:%s (version %d.%d)", + ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, + ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, + ust_cmd->sock, ust_cmd->reg_msg.name, + ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); + /* + * Lock free enqueue the registration request. The red pill + * has been taken! This apps will be part of the *system* :) + */ + cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node); + + /* + * Wake the registration queue futex. Implicit memory + * barrier with the exchange in cds_wfq_enqueue. + */ + futex_nto1_wake(&ust_cmd_queue.futex); + } + } } } @@ -1211,9 +1224,10 @@ error: close(apps_sock); close(sock); - unlink(apps_unix_sock_path); + lttng_poll_clean(&events); + return NULL; } @@ -1526,9 +1540,9 @@ error: } /* - * Notify kernel thread to update it's pollfd. + * Notify kernel thread to update it's poll set. */ -static int notify_kernel_pollfd(void) +static int notify_kernel_channels_update(void) { int ret; @@ -1999,11 +2013,13 @@ static int process_client_msg(struct command_ctx *cmd_ctx) switch (cmd_ctx->lsm->domain.type) { case LTTNG_DOMAIN_KERNEL: - kchan = trace_kernel_get_channel_by_name(cmd_ctx->lsm->u.enable.channel_name, + kchan = trace_kernel_get_channel_by_name( + cmd_ctx->lsm->u.enable.channel_name, cmd_ctx->session->kernel_session); if (kchan == NULL) { /* Channel not found, creating it */ - DBG("Creating kernel channel"); + DBG("Creating kernel channel %s", + cmd_ctx->lsm->u.enable.channel_name); ret = kernel_create_channel(cmd_ctx->session->kernel_session, &cmd_ctx->lsm->u.channel.chan, @@ -2014,7 +2030,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } /* Notify kernel thread that there is a new channel */ - ret = notify_kernel_pollfd(); + ret = notify_kernel_channels_update(); if (ret < 0) { ret = LTTCOMM_FATAL; goto error; @@ -2084,6 +2100,12 @@ static int process_client_msg(struct command_ctx *cmd_ctx) ret = LTTCOMM_FATAL; goto error; } + + ret = notify_kernel_channels_update(); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } } kevent = trace_kernel_get_event_by_name(cmd_ctx->lsm->u.enable.event.name, kchan); @@ -2162,6 +2184,12 @@ static int process_client_msg(struct command_ctx *cmd_ctx) ret = LTTCOMM_FATAL; goto error; } + + ret = notify_kernel_channels_update(); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } } /* For each event in the kernel session */ @@ -2398,10 +2426,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } /* - * Must notify the kernel thread here to update it's pollfd in order to - * remove the channel(s)' fd just destroyed. + * Must notify the kernel thread here to update it's poll setin order + * to remove the channel(s)' fd just destroyed. */ - ret = notify_kernel_pollfd(); + ret = notify_kernel_channels_update(); if (ret < 0) { ret = LTTCOMM_FATAL; goto error; @@ -2623,9 +2651,10 @@ setup_error: */ static void *thread_manage_clients(void *data) { - int sock = 0, ret; + int sock = 0, ret, i, pollfd; + uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; - struct pollfd pollfd[2]; + struct lttng_poll_event events; DBG("[thread] Manage client started"); @@ -2634,14 +2663,20 @@ static void *thread_manage_clients(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; - pollfd[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; + /* + * Pass 2 as size here for the thread quit pipe and client_sock. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = client_sock; - pollfd[1].events = POLLIN; + /* Add the application registration socket */ + ret = lttng_poll_add(&events, client_sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } /* * Notify parent pid that we are ready to accept command for client side. @@ -2653,92 +2688,114 @@ static void *thread_manage_clients(void *data) while (1) { DBG("Accepting client command ..."); + nb_fd = LTTNG_POLL_GETNB(&events); + /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll client thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the registration socket */ + if (pollfd == client_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Client socket poll error"); + goto error; + } + } + } + + DBG("Wait for client response"); + + sock = lttcomm_accept_unix_sock(client_sock); + if (sock < 0) { goto error; } - switch (pollfd[1].revents) { - case POLLNVAL: - case POLLHUP: - case POLLERR: - ERR("Client socket poll error"); + /* Allocate context command to process the client request */ + cmd_ctx = malloc(sizeof(struct command_ctx)); + if (cmd_ctx == NULL) { + perror("malloc cmd_ctx"); goto error; - case POLLIN: - sock = lttcomm_accept_unix_sock(client_sock); - if (sock < 0) { - goto error; - } + } - /* Allocate context command to process the client request */ - cmd_ctx = malloc(sizeof(struct command_ctx)); + /* Allocate data buffer for reception */ + cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg)); + if (cmd_ctx->lsm == NULL) { + perror("malloc cmd_ctx->lsm"); + goto error; + } - /* Allocate data buffer for reception */ - cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg)); - cmd_ctx->llm = NULL; - cmd_ctx->session = NULL; + cmd_ctx->llm = NULL; + cmd_ctx->session = NULL; - /* - * Data is received from the lttng client. The struct - * lttcomm_session_msg (lsm) contains the command and data request of - * the client. - */ - DBG("Receiving data from client ..."); - ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm, sizeof(struct lttcomm_session_msg)); - if (ret <= 0) { - continue; - } + /* + * Data is received from the lttng client. The struct + * lttcomm_session_msg (lsm) contains the command and data request of + * the client. + */ + DBG("Receiving data from client ..."); + ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm, + sizeof(struct lttcomm_session_msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + close(sock); + free(cmd_ctx); + continue; + } - // TODO: Validate cmd_ctx including sanity check for security purpose. + // TODO: Validate cmd_ctx including sanity check for + // security purpose. + /* + * This function dispatch the work to the kernel or userspace tracer + * libs and fill the lttcomm_lttng_msg data structure of all the needed + * informations for the client. The command context struct contains + * everything this function may needs. + */ + ret = process_client_msg(cmd_ctx); + if (ret < 0) { /* - * This function dispatch the work to the kernel or userspace tracer - * libs and fill the lttcomm_lttng_msg data structure of all the needed - * informations for the client. The command context struct contains - * everything this function may needs. + * TODO: Inform client somehow of the fatal error. At + * this point, ret < 0 means that a malloc failed + * (ENOMEM). Error detected but still accept command. */ - ret = process_client_msg(cmd_ctx); - if (ret < 0) { - /* TODO: Inform client somehow of the fatal error. At this point, - * ret < 0 means that a malloc failed (ENOMEM). */ - /* Error detected but still accept command */ - clean_command_ctx(&cmd_ctx); - continue; - } - - DBG("Sending response (size: %d, retcode: %d)", - cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code); - ret = send_unix_sock(sock, cmd_ctx->llm, cmd_ctx->lttng_msg_size); - if (ret < 0) { - ERR("Failed to send data back to client"); - } - clean_command_ctx(&cmd_ctx); + continue; + } - /* End of transmission */ - close(sock); - break; + DBG("Sending response (size: %d, retcode: %d)", + cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code); + ret = send_unix_sock(sock, cmd_ctx->llm, + cmd_ctx->lttng_msg_size); + if (ret < 0) { + ERR("Failed to send data back to client"); } - } -error: - DBG("Client thread dying"); - if (client_sock) { - close(client_sock); - } - if (sock) { + clean_command_ctx(&cmd_ctx); + + /* End of transmission */ close(sock); } +error: + DBG("Client thread dying"); unlink(client_unix_sock_path); + close(client_sock); + close(sock); + lttng_poll_clean(&events); clean_command_ctx(&cmd_ctx); return NULL; } @@ -3283,6 +3340,9 @@ int main(int argc, char **argv) */ session_list_ptr = get_session_list(); + /* Set up max poll set size */ + lttng_poll_set_max_size(); + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, NULL, thread_manage_clients, (void *) NULL); -- 2.34.1