From: Mathieu Desnoyers Date: Tue, 9 Aug 2011 00:46:35 +0000 (-0400) Subject: Remove "lib" prefix from .c/.h file names X-Git-Tag: v2.0-pre10~1 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=50ecdf72034d220d3b0300d0caa13e6946be555b Remove "lib" prefix from .c/.h file names This is uncommon. "lib" prefix is only needed for the library name. Let's keep the lib prefix for source tree directory names for clarity though. Signed-off-by: Mathieu Desnoyers --- diff --git a/libkernelctl/Makefile.am b/libkernelctl/Makefile.am index dac301d39..97a2c9711 100644 --- a/libkernelctl/Makefile.am +++ b/libkernelctl/Makefile.am @@ -2,4 +2,4 @@ AM_CPPFLAGS = -I$(top_srcdir)/include noinst_LTLIBRARIES = libkernelctl.la -libkernelctl_la_SOURCES = libkernelctl.c libkernelctl.h kernel-ioctl.h +libkernelctl_la_SOURCES = kernelctl.c kernelctl.h kernel-ioctl.h diff --git a/libkernelctl/kernelctl.c b/libkernelctl/kernelctl.c new file mode 100644 index 000000000..afb0e012b --- /dev/null +++ b/libkernelctl/kernelctl.c @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include + +#include "kernel-ioctl.h" +#include "kernelctl.h" + +int kernctl_create_session(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_SESSION); +} + +/* open the metadata global channel */ +int kernctl_open_metadata(int fd, struct lttng_channel_attr *chops) +{ + return ioctl(fd, LTTNG_KERNEL_METADATA, chops); +} + +int kernctl_create_channel(int fd, struct lttng_channel_attr *chops) +{ + return ioctl(fd, LTTNG_KERNEL_CHANNEL, chops); +} + +int kernctl_create_stream(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_STREAM); +} + +int kernctl_create_event(int fd, struct lttng_kernel_event *ev) +{ + return ioctl(fd, LTTNG_KERNEL_EVENT, ev); +} + +int kernctl_add_context(int fd, struct lttng_kernel_context *ctx) +{ + return ioctl(fd, LTTNG_KERNEL_CONTEXT, ctx); +} + + +/* Enable event, channel and session ioctl */ +int kernctl_enable(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_ENABLE); +} + +/* Disable event, channel and session ioctl */ +int kernctl_disable(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_DISABLE); +} + +int kernctl_start_session(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_SESSION_START); +} + +int kernctl_stop_session(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_SESSION_STOP); +} + + +int kernctl_tracepoint_list(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_TRACEPOINT_LIST); +} + +int kernctl_tracer_version(int fd, struct lttng_kernel_tracer_version *v) +{ + return ioctl(fd, LTTNG_KERNEL_TRACER_VERSION, v); +} + +int kernctl_wait_quiescent(int fd) +{ + return ioctl(fd, LTTNG_KERNEL_WAIT_QUIESCENT); +} + +int kernctl_calibrate(int fd, struct lttng_kernel_calibrate *calibrate) +{ + return ioctl(fd, LTTNG_KERNEL_CALIBRATE, calibrate); +} + + +int kernctl_buffer_flush(int fd) +{ + return ioctl(fd, RING_BUFFER_FLUSH); +} + + +/* Buffer operations */ + +/* For mmap mode, readable without "get" operation */ + +/* returns the length to mmap. */ +int kernctl_get_mmap_len(int fd, unsigned long *len) +{ + return ioctl(fd, RING_BUFFER_GET_MMAP_LEN, len); +} + +/* returns the maximum size for sub-buffers. */ +int kernctl_get_max_subbuf_size(int fd, unsigned long *len) +{ + return ioctl(fd, RING_BUFFER_GET_MAX_SUBBUF_SIZE, len); +} + +/* + * For mmap mode, operate on the current packet (between get/put or + * get_next/put_next). + */ + +/* returns the offset of the subbuffer belonging to the mmap reader. */ +int kernctl_get_mmap_read_offset(int fd, unsigned long *off) +{ + return ioctl(fd, RING_BUFFER_GET_MMAP_READ_OFFSET, off); +} + +/* returns the size of the current sub-buffer, without padding (for mmap). */ +int kernctl_get_subbuf_size(int fd, unsigned long *len) +{ + return ioctl(fd, RING_BUFFER_GET_SUBBUF_SIZE, len); +} + +/* returns the size of the current sub-buffer, without padding (for mmap). */ +int kernctl_get_padded_subbuf_size(int fd, unsigned long *len) +{ + return ioctl(fd, RING_BUFFER_GET_PADDED_SUBBUF_SIZE, len); +} + +/* Get exclusive read access to the next sub-buffer that can be read. */ +int kernctl_get_next_subbuf(int fd) +{ + return ioctl(fd, RING_BUFFER_GET_NEXT_SUBBUF); +} + + +/* Release exclusive sub-buffer access, move consumer forward. */ +int kernctl_put_next_subbuf(int fd) +{ + return ioctl(fd, RING_BUFFER_PUT_NEXT_SUBBUF); +} + +/* snapshot */ + +/* Get a snapshot of the current ring buffer producer and consumer positions */ +int kernctl_snapshot(int fd) +{ + return ioctl(fd, RING_BUFFER_SNAPSHOT); +} + +/* Get the consumer position (iteration start) */ +int kernctl_snapshot_get_consumed(int fd, unsigned long *pos) +{ + return ioctl(fd, RING_BUFFER_SNAPSHOT_GET_CONSUMED, pos); +} + +/* Get the producer position (iteration end) */ +int kernctl_snapshot_get_produced(int fd, unsigned long *pos) +{ + return ioctl(fd, RING_BUFFER_SNAPSHOT_GET_PRODUCED, pos); +} + +/* Get exclusive read access to the specified sub-buffer position */ +int kernctl_get_subbuf(int fd, unsigned long *len) +{ + return ioctl(fd, RING_BUFFER_GET_SUBBUF, len); +} + +/* Release exclusive sub-buffer access */ +int kernctl_put_subbuf(int fd) +{ + return ioctl(fd, RING_BUFFER_PUT_SUBBUF); +} diff --git a/libkernelctl/kernelctl.h b/libkernelctl/kernelctl.h new file mode 100644 index 000000000..783f1258d --- /dev/null +++ b/libkernelctl/kernelctl.h @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTT_LIBKERNELCTL_H +#define _LTT_LIBKERNELCTL_H + +#include + +#include "lttng-kernel.h" + +int kernctl_create_session(int fd); +int kernctl_open_metadata(int fd, struct lttng_channel_attr *chops); +int kernctl_create_channel(int fd, struct lttng_channel_attr *chops); +int kernctl_create_stream(int fd); +int kernctl_create_event(int fd, struct lttng_kernel_event *ev); +int kernctl_add_context(int fd, struct lttng_kernel_context *ctx); + +int kernctl_enable(int fd); +int kernctl_disable(int fd); +int kernctl_start_session(int fd); +int kernctl_stop_session(int fd); + +int kernctl_tracepoint_list(int fd); +int kernctl_tracer_version(int fd, struct lttng_kernel_tracer_version *v); +int kernctl_wait_quiescent(int fd); +int kernctl_calibrate(int fd, struct lttng_kernel_calibrate *calibrate); + + +/* Buffer operations */ + +/* For mmap mode, readable without "get" operation */ +int kernctl_get_mmap_len(int fd, unsigned long *len); +int kernctl_get_max_subbuf_size(int fd, unsigned long *len); + +/* + * For mmap mode, operate on the current packet (between get/put or + * get_next/put_next). + */ +int kernctl_get_mmap_read_offset(int fd, unsigned long *len); +int kernctl_get_subbuf_size(int fd, unsigned long *len); +int kernctl_get_padded_subbuf_size(int fd, unsigned long *len); + +int kernctl_get_next_subbuf(int fd); +int kernctl_put_next_subbuf(int fd); + +/* snapshot */ +int kernctl_snapshot(int fd); +int kernctl_snapshot_get_consumed(int fd, unsigned long *pos); +int kernctl_snapshot_get_produced(int fd, unsigned long *pos); +int kernctl_get_subbuf(int fd, unsigned long *pos); +int kernctl_put_subbuf(int fd); + +int kernctl_buffer_flush(int fd); + +#endif /* _LTT_LIBKERNELCTL_H */ diff --git a/libkernelctl/libkernelctl.c b/libkernelctl/libkernelctl.c deleted file mode 100644 index 47d749778..000000000 --- a/libkernelctl/libkernelctl.c +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#include - -#include "kernel-ioctl.h" -#include "libkernelctl.h" - -int kernctl_create_session(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_SESSION); -} - -/* open the metadata global channel */ -int kernctl_open_metadata(int fd, struct lttng_channel_attr *chops) -{ - return ioctl(fd, LTTNG_KERNEL_METADATA, chops); -} - -int kernctl_create_channel(int fd, struct lttng_channel_attr *chops) -{ - return ioctl(fd, LTTNG_KERNEL_CHANNEL, chops); -} - -int kernctl_create_stream(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_STREAM); -} - -int kernctl_create_event(int fd, struct lttng_kernel_event *ev) -{ - return ioctl(fd, LTTNG_KERNEL_EVENT, ev); -} - -int kernctl_add_context(int fd, struct lttng_kernel_context *ctx) -{ - return ioctl(fd, LTTNG_KERNEL_CONTEXT, ctx); -} - - -/* Enable event, channel and session ioctl */ -int kernctl_enable(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_ENABLE); -} - -/* Disable event, channel and session ioctl */ -int kernctl_disable(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_DISABLE); -} - -int kernctl_start_session(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_SESSION_START); -} - -int kernctl_stop_session(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_SESSION_STOP); -} - - -int kernctl_tracepoint_list(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_TRACEPOINT_LIST); -} - -int kernctl_tracer_version(int fd, struct lttng_kernel_tracer_version *v) -{ - return ioctl(fd, LTTNG_KERNEL_TRACER_VERSION, v); -} - -int kernctl_wait_quiescent(int fd) -{ - return ioctl(fd, LTTNG_KERNEL_WAIT_QUIESCENT); -} - -int kernctl_calibrate(int fd, struct lttng_kernel_calibrate *calibrate) -{ - return ioctl(fd, LTTNG_KERNEL_CALIBRATE, calibrate); -} - - -int kernctl_buffer_flush(int fd) -{ - return ioctl(fd, RING_BUFFER_FLUSH); -} - - -/* Buffer operations */ - -/* For mmap mode, readable without "get" operation */ - -/* returns the length to mmap. */ -int kernctl_get_mmap_len(int fd, unsigned long *len) -{ - return ioctl(fd, RING_BUFFER_GET_MMAP_LEN, len); -} - -/* returns the maximum size for sub-buffers. */ -int kernctl_get_max_subbuf_size(int fd, unsigned long *len) -{ - return ioctl(fd, RING_BUFFER_GET_MAX_SUBBUF_SIZE, len); -} - -/* - * For mmap mode, operate on the current packet (between get/put or - * get_next/put_next). - */ - -/* returns the offset of the subbuffer belonging to the mmap reader. */ -int kernctl_get_mmap_read_offset(int fd, unsigned long *off) -{ - return ioctl(fd, RING_BUFFER_GET_MMAP_READ_OFFSET, off); -} - -/* returns the size of the current sub-buffer, without padding (for mmap). */ -int kernctl_get_subbuf_size(int fd, unsigned long *len) -{ - return ioctl(fd, RING_BUFFER_GET_SUBBUF_SIZE, len); -} - -/* returns the size of the current sub-buffer, without padding (for mmap). */ -int kernctl_get_padded_subbuf_size(int fd, unsigned long *len) -{ - return ioctl(fd, RING_BUFFER_GET_PADDED_SUBBUF_SIZE, len); -} - -/* Get exclusive read access to the next sub-buffer that can be read. */ -int kernctl_get_next_subbuf(int fd) -{ - return ioctl(fd, RING_BUFFER_GET_NEXT_SUBBUF); -} - - -/* Release exclusive sub-buffer access, move consumer forward. */ -int kernctl_put_next_subbuf(int fd) -{ - return ioctl(fd, RING_BUFFER_PUT_NEXT_SUBBUF); -} - -/* snapshot */ - -/* Get a snapshot of the current ring buffer producer and consumer positions */ -int kernctl_snapshot(int fd) -{ - return ioctl(fd, RING_BUFFER_SNAPSHOT); -} - -/* Get the consumer position (iteration start) */ -int kernctl_snapshot_get_consumed(int fd, unsigned long *pos) -{ - return ioctl(fd, RING_BUFFER_SNAPSHOT_GET_CONSUMED, pos); -} - -/* Get the producer position (iteration end) */ -int kernctl_snapshot_get_produced(int fd, unsigned long *pos) -{ - return ioctl(fd, RING_BUFFER_SNAPSHOT_GET_PRODUCED, pos); -} - -/* Get exclusive read access to the specified sub-buffer position */ -int kernctl_get_subbuf(int fd, unsigned long *len) -{ - return ioctl(fd, RING_BUFFER_GET_SUBBUF, len); -} - -/* Release exclusive sub-buffer access */ -int kernctl_put_subbuf(int fd) -{ - return ioctl(fd, RING_BUFFER_PUT_SUBBUF); -} diff --git a/libkernelctl/libkernelctl.h b/libkernelctl/libkernelctl.h deleted file mode 100644 index 783f1258d..000000000 --- a/libkernelctl/libkernelctl.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#ifndef _LTT_LIBKERNELCTL_H -#define _LTT_LIBKERNELCTL_H - -#include - -#include "lttng-kernel.h" - -int kernctl_create_session(int fd); -int kernctl_open_metadata(int fd, struct lttng_channel_attr *chops); -int kernctl_create_channel(int fd, struct lttng_channel_attr *chops); -int kernctl_create_stream(int fd); -int kernctl_create_event(int fd, struct lttng_kernel_event *ev); -int kernctl_add_context(int fd, struct lttng_kernel_context *ctx); - -int kernctl_enable(int fd); -int kernctl_disable(int fd); -int kernctl_start_session(int fd); -int kernctl_stop_session(int fd); - -int kernctl_tracepoint_list(int fd); -int kernctl_tracer_version(int fd, struct lttng_kernel_tracer_version *v); -int kernctl_wait_quiescent(int fd); -int kernctl_calibrate(int fd, struct lttng_kernel_calibrate *calibrate); - - -/* Buffer operations */ - -/* For mmap mode, readable without "get" operation */ -int kernctl_get_mmap_len(int fd, unsigned long *len); -int kernctl_get_max_subbuf_size(int fd, unsigned long *len); - -/* - * For mmap mode, operate on the current packet (between get/put or - * get_next/put_next). - */ -int kernctl_get_mmap_read_offset(int fd, unsigned long *len); -int kernctl_get_subbuf_size(int fd, unsigned long *len); -int kernctl_get_padded_subbuf_size(int fd, unsigned long *len); - -int kernctl_get_next_subbuf(int fd); -int kernctl_put_next_subbuf(int fd); - -/* snapshot */ -int kernctl_snapshot(int fd); -int kernctl_snapshot_get_consumed(int fd, unsigned long *pos); -int kernctl_snapshot_get_produced(int fd, unsigned long *pos); -int kernctl_get_subbuf(int fd, unsigned long *pos); -int kernctl_put_subbuf(int fd); - -int kernctl_buffer_flush(int fd); - -#endif /* _LTT_LIBKERNELCTL_H */ diff --git a/liblttkconsumerd/Makefile.am b/liblttkconsumerd/Makefile.am index 4f08b4c67..a685221c8 100644 --- a/liblttkconsumerd/Makefile.am +++ b/liblttkconsumerd/Makefile.am @@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/liblttkconsumerd -I$(top_s lib_LTLIBRARIES = liblttkconsumerd.la -liblttkconsumerd_la_SOURCES = liblttkconsumerd.c liblttkconsumerd.h +liblttkconsumerd_la_SOURCES = lttkconsumerd.c lttkconsumerd.h liblttkconsumerd_la_LIBADD = \ $(top_builddir)/libkernelctl/libkernelctl.la \ diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c deleted file mode 100644 index 9ad380c8d..000000000 --- a/liblttkconsumerd/liblttkconsumerd.c +++ /dev/null @@ -1,1051 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "libkernelctl.h" -#include "liblttkconsumerd.h" -#include "lttngerr.h" - -static -struct kconsumerd_global_data { - /* - * kconsumerd_data.lock protects kconsumerd_data.fd_list, - * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It - * ensures the count matches the number of items in the fd_list. - * It ensures the list updates *always* trigger an fd_array - * update (therefore need to make list update vs - * kconsumerd_data.need_update flag update atomic, and also flag - * read, fd array and flag clear atomic). - */ - pthread_mutex_t lock; - /* - * Number of element for the list below. Protected by - * kconsumerd_data.lock. - */ - unsigned int fds_count; - /* - * List of FDs. Protected by kconsumerd_data.lock. - */ - struct kconsumerd_fd_list fd_list; - /* - * Flag specifying if the local array of FDs needs update in the - * poll function. Protected by kconsumerd_data.lock. - */ - unsigned int need_update; -} kconsumerd_data = { - .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), -}; - -/* communication with splice */ -static int kconsumerd_thread_pipe[2]; - -/* pipe to wake the poll thread when necessary */ -static int kconsumerd_poll_pipe[2]; - -/* to let the signal handler wake up the fd receiver thread */ -static int kconsumerd_should_quit[2]; - -/* timeout parameter, to control the polling thread grace period */ -static int kconsumerd_poll_timeout = -1; - -/* socket to communicate errors with sessiond */ -static int kconsumerd_error_socket; - -/* socket to exchange commands with sessiond */ -static char *kconsumerd_command_sock_path; - -/* - * flag to inform the polling thread to quit when all fd hung up. - * Updated by the kconsumerd_thread_receive_fds when it notices that all - * fds has hung up. Also updated by the signal handler - * (kconsumerd_should_exit()). Read by the polling threads. - */ -static volatile int kconsumerd_quit = 0; - -/* - * kconsumerd_set_error_socket - * - * Set the error socket - */ -void kconsumerd_set_error_socket(int sock) -{ - kconsumerd_error_socket = sock; -} - -/* - * kconsumerd_set_command_socket_path - * - * Set the command socket path - */ -void kconsumerd_set_command_socket_path(char *sock) -{ - kconsumerd_command_sock_path = sock; -} - -/* - * kconsumerd_find_session_fd - * - * Find a session fd in the global list. - * The kconsumerd_data.lock must be locked during this call - * - * Return 1 if found else 0 - */ -static int kconsumerd_find_session_fd(int fd) -{ - struct kconsumerd_fd *iter; - - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == fd) { - DBG("Duplicate session fd %d", fd); - return 1; - } - } - - return 0; -} - -/* - * kconsumerd_del_fd - * - * Remove a fd from the global list protected by a mutex - */ -static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) -{ - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_del(&lcf->list); - if (kconsumerd_data.fds_count > 0) { - kconsumerd_data.fds_count--; - if (lcf != NULL) { - close(lcf->out_fd); - close(lcf->consumerd_fd); - free(lcf); - lcf = NULL; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * kconsumerd_add_fd - * - * Add a fd to the global list protected by a mutex - */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) -{ - int ret; - struct kconsumerd_fd *tmp_fd; - - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { - goto end; - } - - tmp_fd = malloc(sizeof(struct kconsumerd_fd)); - tmp_fd->sessiond_fd = buf->fd; - tmp_fd->consumerd_fd = consumerd_fd; - tmp_fd->state = buf->state; - tmp_fd->max_sb_size = buf->max_sb_size; - strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); - tmp_fd->path_name[PATH_MAX - 1] = '\0'; - - /* Opening the tracefile in write mode */ - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - tmp_fd->out_fd_offset = 0; - - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); - kconsumerd_data.fds_count++; - kconsumerd_data.need_update = 1; -end: - pthread_mutex_unlock(&kconsumerd_data.lock); - return ret; -} - -/* - * kconsumerd_change_fd_state - * - * Update a fd according to what we just received - */ -static void kconsumerd_change_fd_state(int sessiond_fd, - enum kconsumerd_fd_state state) -{ - struct kconsumerd_fd *iter; - - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == sessiond_fd) { - iter->state = state; - break; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * kconsumerd_update_poll_array - * - * Allocate the pollfd structure and the local view of the out fds - * to avoid doing a lookup in the linked list and concurrency issues - * when writing is needed. - * Returns the number of fds in the structures - * Called with kconsumerd_data.lock held. - */ -static int kconsumerd_update_poll_array(struct pollfd **pollfd, - struct kconsumerd_fd **local_kconsumerd_fd) -{ - struct kconsumerd_fd *iter; - int i = 0; - - DBG("Updating poll fd array"); - - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - DBG("Inside for each"); - if (iter->state == ACTIVE_FD) { - DBG("Active FD %d", iter->consumerd_fd); - (*pollfd)[i].fd = iter->consumerd_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_kconsumerd_fd[i] = iter; - i++; - } - } - - /* - * insert the kconsumerd_poll_pipe at the end of the array and don't - * increment i so nb_fd is the number of real FD - */ - (*pollfd)[i].fd = kconsumerd_poll_pipe[0]; - (*pollfd)[i].events = POLLIN; - return i; -} - - -/* - * kconsumerd_on_read_subbuffer_mmap - * - * mmap the ring buffer, read it and write the data to the tracefile. - * Returns the number of bytes written - */ -static int kconsumerd_on_read_subbuffer_mmap( - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - unsigned long mmap_len, mmap_offset, padded_len, padding_len; - char *mmap_base; - char *padding = NULL; - long ret = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - /* get the padded subbuffer size to know the padding required */ - ret = kernctl_get_padded_subbuf_size(fd, &padded_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_padded_subbuf_size"); - goto end; - } - padding_len = padded_len - len; - padding = malloc(padding_len * sizeof(char)); - memset(padding, '\0', padding_len); - - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(fd, &mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } - - /* get the offset inside the fd to mmap */ - ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_read_offset"); - goto end; - } - - mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset); - if (mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } - - while (len > 0) { - ret = write(outfd, mmap_base, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - ret = errno; - perror("Error in file write"); - goto end; - } - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - - /* once all the data is written, write the padding to disk */ - ret = write(outfd, padding, padding_len); - if (ret < 0) { - ret = errno; - perror("Error writing padding to file"); - goto end; - } - - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } - goto end; - -end: - if (padding != NULL) { - free(padding); - } - return ret; -} - -/* - * kconsumerd_on_read_subbuffer - * - * Splice the data from the ring buffer to the tracefile. - * Returns the number of bytes spliced - */ -static int kconsumerd_on_read_subbuffer( - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - long ret = 0; - loff_t offset = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in relay splice"); - goto splice_error; - } - - ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in file splice"); - goto splice_error; - } - if (ret >= len) { - len = 0; - } - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } - goto end; - -splice_error: - /* send the appropriate error description to sessiond */ - switch(ret) { - case EBADF: - kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF); - break; - case EINVAL: - kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL); - break; - case ENOMEM: - kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM); - break; - case ESPIPE: - kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE); - break; - } - -end: - return ret; -} - -/* - * kconsumerd_read_subbuffer - * - * Consume data on a file descriptor and write it on a trace file - */ -static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) -{ - unsigned long len; - int err; - long ret = 0; - int infd = kconsumerd_fd->consumerd_fd; - - DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - ret = errno; - perror("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - goto end; - } - - switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - - /* splice the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile"); - } - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; - } - - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } - -end: - return ret; -} - -/* - * kconsumerd_poll_socket - * - * Poll on the should_quit pipe and the command socket - * return -1 on error and should exit, 0 if data is - * available on the command socket - */ -int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) -{ - int num_rdy; - - num_rdy = poll(kconsumerd_sockpoll, 2, -1); - if (num_rdy == -1) { - perror("Poll error"); - goto exit; - } - if (kconsumerd_sockpoll[0].revents == POLLIN) { - DBG("kconsumerd_should_quit wake up"); - goto exit; - } - return 0; - -exit: - return -1; -} - -/* - * kconsumerd_consumerd_recv_fd - * - * Receives an array of file descriptors and the associated - * structures describing each fd (path name). - * Returns the size of received data - */ -static int kconsumerd_consumerd_recv_fd(int sfd, - struct pollfd *kconsumerd_sockpoll, int size, - enum kconsumerd_command cmd_type) -{ - struct iovec iov[1]; - int ret = 0, i, tmp2; - struct cmsghdr *cmsg; - int nb_fd; - char recv_fd[CMSG_SPACE(sizeof(int))]; - struct lttcomm_kconsumerd_msg lkm; - - /* the number of fds we are about to receive */ - nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); - - /* - * nb_fd is the number of fds we receive. One fd per recvmsg. - */ - for (i = 0; i < nb_fd; i++) { - struct msghdr msg = { 0 }; - - /* Prepare to receive the structures */ - iov[0].iov_base = &lkm; - iov[0].iov_len = sizeof(lkm); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - msg.msg_control = recv_fd; - msg.msg_controllen = sizeof(recv_fd); - - DBG("Waiting to receive fd"); - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - - if ((ret = recvmsg(sfd, &msg, 0)) < 0) { - perror("recvmsg"); - continue; - } - - if (ret != (size / nb_fd)) { - ERR("Received only %d, expected %d", ret, size); - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - cmsg = CMSG_FIRSTHDR(&msg); - if (!cmsg) { - ERR("Invalid control message header"); - ret = -1; - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - /* if we received fds */ - if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { - switch (cmd_type) { - case ADD_STREAM: - DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { - kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR); - goto end; - } - break; - case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); - break; - default: - break; - } - /* signal the poll thread */ - tmp2 = write(kconsumerd_poll_pipe[1], "4", 1); - if (tmp2 < 0) { - perror("write kconsumerd poll"); - } - } else { - ERR("Didn't received any fd"); - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); - ret = -1; - goto end; - } - } - -end: - return ret; -} - -/* - * kconsumerd_thread_poll_fds - * - * This thread polls the fds in the ltt_fd_list to consume the data - * and write it to tracefile if necessary. - */ -void *kconsumerd_thread_poll_fds(void *data) -{ - int num_rdy, num_hup, high_prio, ret, i; - struct pollfd *pollfd = NULL; - /* local view of the fds */ - struct kconsumerd_fd **local_kconsumerd_fd = NULL; - /* local view of kconsumerd_data.fds_count */ - int nb_fd = 0; - char tmp; - int tmp2; - - ret = pipe(kconsumerd_thread_pipe); - if (ret < 0) { - perror("Error creating pipe"); - goto end; - } - - local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd)); - - while (1) { - high_prio = 0; - num_hup = 0; - - /* - * the ltt_fd_list has been updated, we need to update our - * local array as well - */ - pthread_mutex_lock(&kconsumerd_data.lock); - if (kconsumerd_data.need_update) { - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); - if (pollfd == NULL) { - perror("pollfd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * - sizeof(struct kconsumerd_fd)); - if (local_kconsumerd_fd == NULL) { - perror("local_kconsumerd_fd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd); - if (ret < 0) { - ERR("Error in allocating pollfd or local_outfds"); - kconsumerd_send_error(KCONSUMERD_POLL_ERROR); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - nb_fd = ret; - kconsumerd_data.need_update = 0; - } - pthread_mutex_unlock(&kconsumerd_data.lock); - - /* poll on the array of fds */ - DBG("polling on %d fd", nb_fd + 1); - num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout); - DBG("poll num_rdy : %d", num_rdy); - if (num_rdy == -1) { - perror("Poll error"); - kconsumerd_send_error(KCONSUMERD_POLL_ERROR); - goto end; - } else if (num_rdy == 0) { - DBG("Polling thread timed out"); - goto end; - } - - /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */ - if (nb_fd == 0 && kconsumerd_quit == 1) { - goto end; - } - - /* - * If the kconsumerd_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. - */ - if (pollfd[nb_fd].revents == POLLIN) { - DBG("kconsumerd_poll_pipe wake up"); - tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read kconsumerd poll"); - } - continue; - } - - /* Take care of high priority channels first. */ - for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: - ERR("Error returned in polling fd %d.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLNVAL: - ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } - break; - } - } - - /* If every buffer FD has hung up, we end the read loop here */ - if (nb_fd > 0 && num_hup == nb_fd) { - DBG("every buffer FD has hung up\n"); - if (kconsumerd_quit == 1) { - goto end; - } - continue; - } - - /* Take care of low priority channels. */ - if (high_prio == 0) { - for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { - DBG("Normal read on fd %d", pollfd[i].fd); - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable subbuffer */ - if (ret == EAGAIN) { - ret = 0; - } - } - } - } - } -end: - DBG("polling thread exiting"); - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - return NULL; -} - -/* - * kconsumerd_init(void) - * - * initialise the necessary environnement : - * - inform the polling thread to update the polling array - * - create the poll_pipe - * - create the should_quit pipe (for signal handler) - */ -int kconsumerd_init(void) -{ - int ret; - - /* need to update the polling array at init time */ - kconsumerd_data.need_update = 1; - - ret = pipe(kconsumerd_poll_pipe); - if (ret < 0) { - perror("Error creating poll pipe"); - goto end; - } - - ret = pipe(kconsumerd_should_quit); - if (ret < 0) { - perror("Error creating recv pipe"); - goto end; - } - -end: - return ret; -} - -/* - * kconsumerd_thread_receive_fds - * - * This thread listens on the consumerd socket and - * receives the file descriptors from ltt-sessiond - */ -void *kconsumerd_thread_receive_fds(void *data) -{ - int sock, client_socket, ret; - struct lttcomm_kconsumerd_header tmp; - /* - * structure to poll for incoming data on communication socket - * avoids making blocking sockets - */ - struct pollfd kconsumerd_sockpoll[2]; - - - DBG("Creating command socket %s", kconsumerd_command_sock_path); - unlink(kconsumerd_command_sock_path); - client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path); - if (client_socket < 0) { - ERR("Cannot create command socket"); - goto end; - } - - ret = lttcomm_listen_unix_sock(client_socket); - if (ret < 0) { - goto end; - } - - DBG("Sending ready command to ltt-sessiond"); - ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY); - if (ret < 0) { - ERR("Error sending ready command to ltt-sessiond"); - goto end; - } - - ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* prepare the FDs to poll : to client socket and the should_quit pipe */ - kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0]; - kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; - kconsumerd_sockpoll[1].fd = client_socket; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Connection on client_socket"); - - /* Blocking call, waiting for transmission */ - sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { - WARN("On accept"); - goto end; - } - ret = fcntl(sock, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* update the polling structure to poll on the established socket */ - kconsumerd_sockpoll[1].fd = sock; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - while (1) { - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Incoming fds on sock"); - - /* We first get the number of fd we are about to receive */ - ret = lttcomm_recv_unix_sock(sock, &tmp, - sizeof(struct lttcomm_kconsumerd_header)); - if (ret <= 0) { - ERR("Communication interrupted on command socket"); - goto end; - } - if (tmp.cmd_type == STOP) { - DBG("Received STOP command"); - goto end; - } - if (kconsumerd_quit) { - DBG("kconsumerd_thread_receive_fds received quit from signal"); - goto end; - } - - /* we received a command to add or update fds */ - ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll, - tmp.payload_size, tmp.cmd_type); - if (ret <= 0) { - ERR("Receiving the FD, exiting"); - goto end; - } - DBG("received fds on sock"); - } - -end: - DBG("kconsumerd_thread_receive_fds exiting"); - - /* - * when all fds have hung up, the polling thread - * can exit cleanly - */ - kconsumerd_quit = 1; - - /* - * 2s of grace period, if no polling events occur during - * this period, the polling thread will exit even if there - * are still open FDs (should not happen, but safety mechanism). - */ - kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD; - - /* wake up the polling thread */ - ret = write(kconsumerd_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } - return NULL; -} - -/* - * kconsumerd_cleanup - * - * Cleanup the daemon's socket on exit - */ -void kconsumerd_cleanup(void) -{ - struct kconsumerd_fd *iter, *tmp; - - /* remove the socket file */ - unlink(kconsumerd_command_sock_path); - - /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. - */ - cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) { - kconsumerd_del_fd(iter); - } -} - -/* - * kconsumerd_should_exit - * - * Called from signal handler. - */ -void kconsumerd_should_exit(void) -{ - int ret; - kconsumerd_quit = 1; - ret = write(kconsumerd_should_quit[1], "4", 1); - if (ret < 0) { - perror("write kconsumerd quit"); - } -} - -/* - * kconsumerd_send_error - * - * send return code to ltt-sessiond - */ -int kconsumerd_send_error(enum lttcomm_return_code cmd) -{ - if (kconsumerd_error_socket > 0) { - return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd, - sizeof(enum lttcomm_sessiond_command)); - } - - return 0; -} diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h deleted file mode 100644 index d085a7950..000000000 --- a/liblttkconsumerd/liblttkconsumerd.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#ifndef _LIBLTTKCONSUMERD_H -#define _LIBLTTKCONSUMERD_H - -#include -#include "lttng-kconsumerd.h" - -/* - * When the receiving thread dies, we need to have a way to make - * the polling thread exit eventually. - * If all FDs hang up (normal case when the ltt-sessiond stops), - * we can exit cleanly, but if there is a problem and for whatever - * reason some FDs remain open, the consumer should still exit eventually. - * - * If the timeout is reached, it means that during this period - * no events occurred on the FDs so we need to force an exit. - * This case should not happen but it is a safety to ensure we won't block - * the consumer indefinitely. - * - * The value of 2 seconds is an arbitrary choice. - */ -#define KCONSUMERD_POLL_GRACE_PERIOD 2000 - -struct kconsumerd_fd_list { - struct cds_list_head head; -}; - -/* - * Internal representation of the FDs, - * sessiond_fd is used to identify uniquely a fd - */ -struct kconsumerd_fd { - struct cds_list_head list; - int sessiond_fd; /* used to identify uniquely a fd with sessiond */ - int consumerd_fd; /* fd to consume */ - int out_fd; /* output file to write the data */ - off_t out_fd_offset; /* write position in the output file descriptor */ - char path_name[PATH_MAX]; /* tracefile name */ - enum kconsumerd_fd_state state; - unsigned long max_sb_size; /* the subbuffer size for this channel */ -}; - -int kconsumerd_init(void); -int kconsumerd_send_error(enum lttcomm_return_code cmd); -int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); -void *kconsumerd_thread_poll_fds(void *data); -void *kconsumerd_thread_receive_fds(void *data); -void kconsumerd_should_exit(void); -void kconsumerd_cleanup(void); -void kconsumerd_set_error_socket(int sock); -void kconsumerd_set_command_socket_path(char *sock); - -#endif /* _LIBLTTKCONSUMERD_H */ diff --git a/liblttkconsumerd/lttkconsumerd.c b/liblttkconsumerd/lttkconsumerd.c new file mode 100644 index 000000000..5c22d5ec5 --- /dev/null +++ b/liblttkconsumerd/lttkconsumerd.c @@ -0,0 +1,1051 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kernelctl.h" +#include "lttkconsumerd.h" +#include "lttngerr.h" + +static +struct kconsumerd_global_data { + /* + * kconsumerd_data.lock protects kconsumerd_data.fd_list, + * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It + * ensures the count matches the number of items in the fd_list. + * It ensures the list updates *always* trigger an fd_array + * update (therefore need to make list update vs + * kconsumerd_data.need_update flag update atomic, and also flag + * read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of element for the list below. Protected by + * kconsumerd_data.lock. + */ + unsigned int fds_count; + /* + * List of FDs. Protected by kconsumerd_data.lock. + */ + struct kconsumerd_fd_list fd_list; + /* + * Flag specifying if the local array of FDs needs update in the + * poll function. Protected by kconsumerd_data.lock. + */ + unsigned int need_update; +} kconsumerd_data = { + .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), +}; + +/* communication with splice */ +static int kconsumerd_thread_pipe[2]; + +/* pipe to wake the poll thread when necessary */ +static int kconsumerd_poll_pipe[2]; + +/* to let the signal handler wake up the fd receiver thread */ +static int kconsumerd_should_quit[2]; + +/* timeout parameter, to control the polling thread grace period */ +static int kconsumerd_poll_timeout = -1; + +/* socket to communicate errors with sessiond */ +static int kconsumerd_error_socket; + +/* socket to exchange commands with sessiond */ +static char *kconsumerd_command_sock_path; + +/* + * flag to inform the polling thread to quit when all fd hung up. + * Updated by the kconsumerd_thread_receive_fds when it notices that all + * fds has hung up. Also updated by the signal handler + * (kconsumerd_should_exit()). Read by the polling threads. + */ +static volatile int kconsumerd_quit = 0; + +/* + * kconsumerd_set_error_socket + * + * Set the error socket + */ +void kconsumerd_set_error_socket(int sock) +{ + kconsumerd_error_socket = sock; +} + +/* + * kconsumerd_set_command_socket_path + * + * Set the command socket path + */ +void kconsumerd_set_command_socket_path(char *sock) +{ + kconsumerd_command_sock_path = sock; +} + +/* + * kconsumerd_find_session_fd + * + * Find a session fd in the global list. + * The kconsumerd_data.lock must be locked during this call + * + * Return 1 if found else 0 + */ +static int kconsumerd_find_session_fd(int fd) +{ + struct kconsumerd_fd *iter; + + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->sessiond_fd == fd) { + DBG("Duplicate session fd %d", fd); + return 1; + } + } + + return 0; +} + +/* + * kconsumerd_del_fd + * + * Remove a fd from the global list protected by a mutex + */ +static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) +{ + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_del(&lcf->list); + if (kconsumerd_data.fds_count > 0) { + kconsumerd_data.fds_count--; + if (lcf != NULL) { + close(lcf->out_fd); + close(lcf->consumerd_fd); + free(lcf); + lcf = NULL; + } + } + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); +} + +/* + * kconsumerd_add_fd + * + * Add a fd to the global list protected by a mutex + */ +static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) +{ + int ret; + struct kconsumerd_fd *tmp_fd; + + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(buf->fd); + if (ret == 1) { + goto end; + } + + tmp_fd = malloc(sizeof(struct kconsumerd_fd)); + tmp_fd->sessiond_fd = buf->fd; + tmp_fd->consumerd_fd = consumerd_fd; + tmp_fd->state = buf->state; + tmp_fd->max_sb_size = buf->max_sb_size; + strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); + tmp_fd->path_name[PATH_MAX - 1] = '\0'; + + /* Opening the tracefile in write mode */ + ret = open(tmp_fd->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", tmp_fd->path_name); + perror("open"); + goto end; + } + tmp_fd->out_fd = ret; + tmp_fd->out_fd_offset = 0; + + DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, + tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); + + cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); + kconsumerd_data.fds_count++; + kconsumerd_data.need_update = 1; +end: + pthread_mutex_unlock(&kconsumerd_data.lock); + return ret; +} + +/* + * kconsumerd_change_fd_state + * + * Update a fd according to what we just received + */ +static void kconsumerd_change_fd_state(int sessiond_fd, + enum kconsumerd_fd_state state) +{ + struct kconsumerd_fd *iter; + + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->sessiond_fd == sessiond_fd) { + iter->state = state; + break; + } + } + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); +} + +/* + * kconsumerd_update_poll_array + * + * Allocate the pollfd structure and the local view of the out fds + * to avoid doing a lookup in the linked list and concurrency issues + * when writing is needed. + * Returns the number of fds in the structures + * Called with kconsumerd_data.lock held. + */ +static int kconsumerd_update_poll_array(struct pollfd **pollfd, + struct kconsumerd_fd **local_kconsumerd_fd) +{ + struct kconsumerd_fd *iter; + int i = 0; + + DBG("Updating poll fd array"); + + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + DBG("Inside for each"); + if (iter->state == ACTIVE_FD) { + DBG("Active FD %d", iter->consumerd_fd); + (*pollfd)[i].fd = iter->consumerd_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_kconsumerd_fd[i] = iter; + i++; + } + } + + /* + * insert the kconsumerd_poll_pipe at the end of the array and don't + * increment i so nb_fd is the number of real FD + */ + (*pollfd)[i].fd = kconsumerd_poll_pipe[0]; + (*pollfd)[i].events = POLLIN; + return i; +} + + +/* + * kconsumerd_on_read_subbuffer_mmap + * + * mmap the ring buffer, read it and write the data to the tracefile. + * Returns the number of bytes written + */ +static int kconsumerd_on_read_subbuffer_mmap( + struct kconsumerd_fd *kconsumerd_fd, unsigned long len) +{ + unsigned long mmap_len, mmap_offset, padded_len, padding_len; + char *mmap_base; + char *padding = NULL; + long ret = 0; + off_t orig_offset = kconsumerd_fd->out_fd_offset; + int fd = kconsumerd_fd->consumerd_fd; + int outfd = kconsumerd_fd->out_fd; + + /* get the padded subbuffer size to know the padding required */ + ret = kernctl_get_padded_subbuf_size(fd, &padded_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_padded_subbuf_size"); + goto end; + } + padding_len = padded_len - len; + padding = malloc(padding_len * sizeof(char)); + memset(padding, '\0', padding_len); + + /* get the len of the mmap region */ + ret = kernctl_get_mmap_len(fd, &mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto end; + } + + /* get the offset inside the fd to mmap */ + ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_read_offset"); + goto end; + } + + mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset); + if (mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto end; + } + + while (len > 0) { + ret = write(outfd, mmap_base, len); + if (ret >= len) { + len = 0; + } else if (ret < 0) { + ret = errno; + perror("Error in file write"); + goto end; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + kconsumerd_fd->out_fd_offset += ret; + } + + /* once all the data is written, write the padding to disk */ + ret = write(outfd, padding, padding_len); + if (ret < 0) { + ret = errno; + perror("Error writing padding to file"); + goto end; + } + + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } + goto end; + +end: + if (padding != NULL) { + free(padding); + } + return ret; +} + +/* + * kconsumerd_on_read_subbuffer + * + * Splice the data from the ring buffer to the tracefile. + * Returns the number of bytes spliced + */ +static int kconsumerd_on_read_subbuffer( + struct kconsumerd_fd *kconsumerd_fd, unsigned long len) +{ + long ret = 0; + loff_t offset = 0; + off_t orig_offset = kconsumerd_fd->out_fd_offset; + int fd = kconsumerd_fd->consumerd_fd; + int outfd = kconsumerd_fd->out_fd; + + while (len > 0) { + DBG("splice chan to pipe offset %lu (fd : %d)", + (unsigned long)offset, fd); + ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe ret %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in relay splice"); + goto splice_error; + } + + ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in file splice"); + goto splice_error; + } + if (ret >= len) { + len = 0; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + kconsumerd_fd->out_fd_offset += ret; + } + + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch(ret) { + case EBADF: + kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + return ret; +} + +/* + * kconsumerd_read_subbuffer + * + * Consume data on a file descriptor and write it on a trace file + */ +static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) +{ + unsigned long len; + int err; + long ret = 0; + int infd = kconsumerd_fd->consumerd_fd; + + DBG("In kconsumerd_read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { + case LTTNG_EVENT_SPLICE: + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + break; + case LTTNG_EVENT_MMAP: + /* read the used subbuffer size */ + err = kernctl_get_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + break; + default: + ERR("Unknown output method"); + ret = -1; + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} + +/* + * kconsumerd_poll_socket + * + * Poll on the should_quit pipe and the command socket + * return -1 on error and should exit, 0 if data is + * available on the command socket + */ +int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) +{ + int num_rdy; + + num_rdy = poll(kconsumerd_sockpoll, 2, -1); + if (num_rdy == -1) { + perror("Poll error"); + goto exit; + } + if (kconsumerd_sockpoll[0].revents == POLLIN) { + DBG("kconsumerd_should_quit wake up"); + goto exit; + } + return 0; + +exit: + return -1; +} + +/* + * kconsumerd_consumerd_recv_fd + * + * Receives an array of file descriptors and the associated + * structures describing each fd (path name). + * Returns the size of received data + */ +static int kconsumerd_consumerd_recv_fd(int sfd, + struct pollfd *kconsumerd_sockpoll, int size, + enum kconsumerd_command cmd_type) +{ + struct iovec iov[1]; + int ret = 0, i, tmp2; + struct cmsghdr *cmsg; + int nb_fd; + char recv_fd[CMSG_SPACE(sizeof(int))]; + struct lttcomm_kconsumerd_msg lkm; + + /* the number of fds we are about to receive */ + nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); + + /* + * nb_fd is the number of fds we receive. One fd per recvmsg. + */ + for (i = 0; i < nb_fd; i++) { + struct msghdr msg = { 0 }; + + /* Prepare to receive the structures */ + iov[0].iov_base = &lkm; + iov[0].iov_len = sizeof(lkm); + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + msg.msg_control = recv_fd; + msg.msg_controllen = sizeof(recv_fd); + + DBG("Waiting to receive fd"); + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + + if ((ret = recvmsg(sfd, &msg, 0)) < 0) { + perror("recvmsg"); + continue; + } + + if (ret != (size / nb_fd)) { + ERR("Received only %d, expected %d", ret, size); + kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) { + ERR("Invalid control message header"); + ret = -1; + kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + /* if we received fds */ + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + switch (cmd_type) { + case ADD_STREAM: + DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); + ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); + if (ret < 0) { + kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR); + goto end; + } + break; + case UPDATE_STREAM: + kconsumerd_change_fd_state(lkm.fd, lkm.state); + break; + default: + break; + } + /* signal the poll thread */ + tmp2 = write(kconsumerd_poll_pipe[1], "4", 1); + if (tmp2 < 0) { + perror("write kconsumerd poll"); + } + } else { + ERR("Didn't received any fd"); + kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); + ret = -1; + goto end; + } + } + +end: + return ret; +} + +/* + * kconsumerd_thread_poll_fds + * + * This thread polls the fds in the ltt_fd_list to consume the data + * and write it to tracefile if necessary. + */ +void *kconsumerd_thread_poll_fds(void *data) +{ + int num_rdy, num_hup, high_prio, ret, i; + struct pollfd *pollfd = NULL; + /* local view of the fds */ + struct kconsumerd_fd **local_kconsumerd_fd = NULL; + /* local view of kconsumerd_data.fds_count */ + int nb_fd = 0; + char tmp; + int tmp2; + + ret = pipe(kconsumerd_thread_pipe); + if (ret < 0) { + perror("Error creating pipe"); + goto end; + } + + local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd)); + + while (1) { + high_prio = 0; + num_hup = 0; + + /* + * the ltt_fd_list has been updated, we need to update our + * local array as well + */ + pthread_mutex_lock(&kconsumerd_data.lock); + if (kconsumerd_data.need_update) { + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ + pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); + if (pollfd == NULL) { + perror("pollfd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ + local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * + sizeof(struct kconsumerd_fd)); + if (local_kconsumerd_fd == NULL) { + perror("local_kconsumerd_fd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd); + if (ret < 0) { + ERR("Error in allocating pollfd or local_outfds"); + kconsumerd_send_error(KCONSUMERD_POLL_ERROR); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + nb_fd = ret; + kconsumerd_data.need_update = 0; + } + pthread_mutex_unlock(&kconsumerd_data.lock); + + /* poll on the array of fds */ + DBG("polling on %d fd", nb_fd + 1); + num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout); + DBG("poll num_rdy : %d", num_rdy); + if (num_rdy == -1) { + perror("Poll error"); + kconsumerd_send_error(KCONSUMERD_POLL_ERROR); + goto end; + } else if (num_rdy == 0) { + DBG("Polling thread timed out"); + goto end; + } + + /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */ + if (nb_fd == 0 && kconsumerd_quit == 1) { + goto end; + } + + /* + * If the kconsumerd_poll_pipe triggered poll go + * directly to the beginning of the loop to update the + * array. We want to prioritize array update over + * low-priority reads. + */ + if (pollfd[nb_fd].revents == POLLIN) { + DBG("kconsumerd_poll_pipe wake up"); + tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1); + if (tmp2 < 0) { + perror("read kconsumerd poll"); + } + continue; + } + + /* Take care of high priority channels first. */ + for (i = 0; i < nb_fd; i++) { + switch(pollfd[i].revents) { + case POLLERR: + ERR("Error returned in polling fd %d.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLHUP: + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLNVAL: + ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLPRI: + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + break; + } + } + + /* If every buffer FD has hung up, we end the read loop here */ + if (nb_fd > 0 && num_hup == nb_fd) { + DBG("every buffer FD has hung up\n"); + if (kconsumerd_quit == 1) { + goto end; + } + continue; + } + + /* Take care of low priority channels. */ + if (high_prio == 0) { + for (i = 0; i < nb_fd; i++) { + if (pollfd[i].revents == POLLIN) { + DBG("Normal read on fd %d", pollfd[i].fd); + ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]); + /* it's ok to have an unavailable subbuffer */ + if (ret == EAGAIN) { + ret = 0; + } + } + } + } + } +end: + DBG("polling thread exiting"); + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + return NULL; +} + +/* + * kconsumerd_init(void) + * + * initialise the necessary environnement : + * - inform the polling thread to update the polling array + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) + */ +int kconsumerd_init(void) +{ + int ret; + + /* need to update the polling array at init time */ + kconsumerd_data.need_update = 1; + + ret = pipe(kconsumerd_poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto end; + } + + ret = pipe(kconsumerd_should_quit); + if (ret < 0) { + perror("Error creating recv pipe"); + goto end; + } + +end: + return ret; +} + +/* + * kconsumerd_thread_receive_fds + * + * This thread listens on the consumerd socket and + * receives the file descriptors from ltt-sessiond + */ +void *kconsumerd_thread_receive_fds(void *data) +{ + int sock, client_socket, ret; + struct lttcomm_kconsumerd_header tmp; + /* + * structure to poll for incoming data on communication socket + * avoids making blocking sockets + */ + struct pollfd kconsumerd_sockpoll[2]; + + + DBG("Creating command socket %s", kconsumerd_command_sock_path); + unlink(kconsumerd_command_sock_path); + client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path); + if (client_socket < 0) { + ERR("Cannot create command socket"); + goto end; + } + + ret = lttcomm_listen_unix_sock(client_socket); + if (ret < 0) { + goto end; + } + + DBG("Sending ready command to ltt-sessiond"); + ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY); + if (ret < 0) { + ERR("Error sending ready command to ltt-sessiond"); + goto end; + } + + ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* prepare the FDs to poll : to client socket and the should_quit pipe */ + kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0]; + kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; + kconsumerd_sockpoll[1].fd = client_socket; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Connection on client_socket"); + + /* Blocking call, waiting for transmission */ + sock = lttcomm_accept_unix_sock(client_socket); + if (sock <= 0) { + WARN("On accept"); + goto end; + } + ret = fcntl(sock, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* update the polling structure to poll on the established socket */ + kconsumerd_sockpoll[1].fd = sock; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + while (1) { + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Incoming fds on sock"); + + /* We first get the number of fd we are about to receive */ + ret = lttcomm_recv_unix_sock(sock, &tmp, + sizeof(struct lttcomm_kconsumerd_header)); + if (ret <= 0) { + ERR("Communication interrupted on command socket"); + goto end; + } + if (tmp.cmd_type == STOP) { + DBG("Received STOP command"); + goto end; + } + if (kconsumerd_quit) { + DBG("kconsumerd_thread_receive_fds received quit from signal"); + goto end; + } + + /* we received a command to add or update fds */ + ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll, + tmp.payload_size, tmp.cmd_type); + if (ret <= 0) { + ERR("Receiving the FD, exiting"); + goto end; + } + DBG("received fds on sock"); + } + +end: + DBG("kconsumerd_thread_receive_fds exiting"); + + /* + * when all fds have hung up, the polling thread + * can exit cleanly + */ + kconsumerd_quit = 1; + + /* + * 2s of grace period, if no polling events occur during + * this period, the polling thread will exit even if there + * are still open FDs (should not happen, but safety mechanism). + */ + kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD; + + /* wake up the polling thread */ + ret = write(kconsumerd_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("poll pipe write"); + } + return NULL; +} + +/* + * kconsumerd_cleanup + * + * Cleanup the daemon's socket on exit + */ +void kconsumerd_cleanup(void) +{ + struct kconsumerd_fd *iter, *tmp; + + /* remove the socket file */ + unlink(kconsumerd_command_sock_path); + + /* + * close all outfd. Called when there are no more threads + * running (after joining on the threads), no need to protect + * list iteration with mutex. + */ + cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) { + kconsumerd_del_fd(iter); + } +} + +/* + * kconsumerd_should_exit + * + * Called from signal handler. + */ +void kconsumerd_should_exit(void) +{ + int ret; + kconsumerd_quit = 1; + ret = write(kconsumerd_should_quit[1], "4", 1); + if (ret < 0) { + perror("write kconsumerd quit"); + } +} + +/* + * kconsumerd_send_error + * + * send return code to ltt-sessiond + */ +int kconsumerd_send_error(enum lttcomm_return_code cmd) +{ + if (kconsumerd_error_socket > 0) { + return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd, + sizeof(enum lttcomm_sessiond_command)); + } + + return 0; +} diff --git a/liblttkconsumerd/lttkconsumerd.h b/liblttkconsumerd/lttkconsumerd.h new file mode 100644 index 000000000..d085a7950 --- /dev/null +++ b/liblttkconsumerd/lttkconsumerd.h @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LIBLTTKCONSUMERD_H +#define _LIBLTTKCONSUMERD_H + +#include +#include "lttng-kconsumerd.h" + +/* + * When the receiving thread dies, we need to have a way to make + * the polling thread exit eventually. + * If all FDs hang up (normal case when the ltt-sessiond stops), + * we can exit cleanly, but if there is a problem and for whatever + * reason some FDs remain open, the consumer should still exit eventually. + * + * If the timeout is reached, it means that during this period + * no events occurred on the FDs so we need to force an exit. + * This case should not happen but it is a safety to ensure we won't block + * the consumer indefinitely. + * + * The value of 2 seconds is an arbitrary choice. + */ +#define KCONSUMERD_POLL_GRACE_PERIOD 2000 + +struct kconsumerd_fd_list { + struct cds_list_head head; +}; + +/* + * Internal representation of the FDs, + * sessiond_fd is used to identify uniquely a fd + */ +struct kconsumerd_fd { + struct cds_list_head list; + int sessiond_fd; /* used to identify uniquely a fd with sessiond */ + int consumerd_fd; /* fd to consume */ + int out_fd; /* output file to write the data */ + off_t out_fd_offset; /* write position in the output file descriptor */ + char path_name[PATH_MAX]; /* tracefile name */ + enum kconsumerd_fd_state state; + unsigned long max_sb_size; /* the subbuffer size for this channel */ +}; + +int kconsumerd_init(void); +int kconsumerd_send_error(enum lttcomm_return_code cmd); +int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); +void *kconsumerd_thread_poll_fds(void *data); +void *kconsumerd_thread_receive_fds(void *data); +void kconsumerd_should_exit(void); +void kconsumerd_cleanup(void); +void kconsumerd_set_error_socket(int sock); +void kconsumerd_set_command_socket_path(char *sock); + +#endif /* _LIBLTTKCONSUMERD_H */ diff --git a/liblttngctl/Makefile.am b/liblttngctl/Makefile.am index 943295653..87b94dc86 100644 --- a/liblttngctl/Makefile.am +++ b/liblttngctl/Makefile.am @@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include lib_LTLIBRARIES = liblttngctl.la -liblttngctl_la_SOURCES = liblttngctl.c +liblttngctl_la_SOURCES = lttngctl.c liblttngctl_la_LIBADD = \ $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la diff --git a/liblttngctl/liblttngctl.c b/liblttngctl/liblttngctl.c deleted file mode 100644 index ec01859a9..000000000 --- a/liblttngctl/liblttngctl.c +++ /dev/null @@ -1,639 +0,0 @@ -/* - * liblttngctl.c - * - * Linux Trace Toolkit Control Library - * - * Copyright (C) 2011 David Goulet - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; only - * version 2.1 of the License. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include - -#include - -#include -#include "lttngerr.h" -#include "lttng-share.h" - -/* Socket to session daemon for communication */ -static int sessiond_socket; -static char sessiond_sock_path[PATH_MAX]; - -/* Communication structure to ltt-sessiond */ -static struct lttcomm_session_msg lsm; -static struct lttcomm_lttng_msg llm; - -/* Variables */ -static char *tracing_group; -static int connected; - -/* - * Copy string from src to dst and enforce null terminated byte. - */ -static void copy_string(char *dst, const char *src, size_t len) -{ - if (src && dst) { - strncpy(dst, src, len); - /* Enforce the NULL terminated byte */ - dst[len - 1] = '\0'; - } -} - -/* - * send_data_sessiond - * - * Send lttcomm_session_msg to the session daemon. - * - * On success, return 0 - * On error, return error code - */ -static int send_data_sessiond(void) -{ - int ret; - - if (!connected) { - ret = -ENOTCONN; - goto end; - } - - ret = lttcomm_send_unix_sock(sessiond_socket, &lsm, sizeof(lsm)); - -end: - return ret; -} - -/* - * recv_data_sessiond - * - * Receive data from the sessiond socket. - * - * On success, return 0 - * On error, return recv() error code - */ -static int recv_data_sessiond(void *buf, size_t len) -{ - int ret; - - if (!connected) { - ret = -ENOTCONN; - goto end; - } - - ret = lttcomm_recv_unix_sock(sessiond_socket, buf, len); - -end: - return ret; -} - -/* - * Check if the specified group name exist. - * - * If yes return 0, else return -1. - */ -static int check_tracing_group(const char *grp_name) -{ - struct group *grp_tracing; /* no free(). See getgrnam(3) */ - gid_t *grp_list; - int grp_list_size, grp_id, i; - int ret = -1; - - /* Get GID of group 'tracing' */ - grp_tracing = getgrnam(grp_name); - if (grp_tracing == NULL) { - /* NULL means not found also. getgrnam(3) */ - if (errno != 0) { - perror("getgrnam"); - } - goto end; - } - - /* Get number of supplementary group IDs */ - grp_list_size = getgroups(0, NULL); - if (grp_list_size < 0) { - perror("getgroups"); - goto end; - } - - /* Alloc group list of the right size */ - grp_list = malloc(grp_list_size * sizeof(gid_t)); - grp_id = getgroups(grp_list_size, grp_list); - if (grp_id < -1) { - perror("getgroups"); - goto free_list; - } - - for (i = 0; i < grp_list_size; i++) { - if (grp_list[i] == grp_tracing->gr_gid) { - ret = 0; - break; - } - } - -free_list: - free(grp_list); - -end: - return ret; -} - -/* - * Set sessiond socket path by putting it in the global sessiond_sock_path - * variable. - */ -static int set_session_daemon_path(void) -{ - int ret; - - /* Are we in the tracing group ? */ - ret = check_tracing_group(tracing_group); - if (ret < 0 && getuid() != 0) { - if (snprintf(sessiond_sock_path, PATH_MAX, - DEFAULT_HOME_CLIENT_UNIX_SOCK, - getenv("HOME")) < 0) { - return -ENOMEM; - } - } else { - copy_string(sessiond_sock_path, DEFAULT_GLOBAL_CLIENT_UNIX_SOCK, - PATH_MAX); - } - - return 0; -} - -/* - * Connect to the LTTng session daemon. - * - * On success, return 0. On error, return -1. - */ -static int connect_sessiond(void) -{ - int ret; - - ret = set_session_daemon_path(); - if (ret < 0) { - return ret; - } - - /* Connect to the sesssion daemon */ - ret = lttcomm_connect_unix_sock(sessiond_sock_path); - if (ret < 0) { - return ret; - } - - sessiond_socket = ret; - connected = 1; - - return 0; -} - -/* - * Clean disconnect the session daemon. - */ -static int disconnect_sessiond(void) -{ - int ret = 0; - - if (connected) { - ret = lttcomm_close_unix_sock(sessiond_socket); - sessiond_socket = 0; - connected = 0; - } - - return ret; -} - -/* - * Reset the session message structure. - */ -static void reset_session_msg(void) -{ - memset(&lsm, 0, sizeof(struct lttcomm_session_msg)); -} - -/* - * ask_sessiond - * - * Ask the session daemon a specific command and put the data into buf. - * - * Return size of data (only payload, not header). - */ -static int ask_sessiond(enum lttcomm_sessiond_command lct, void **buf) -{ - int ret; - size_t size; - void *data = NULL; - - ret = connect_sessiond(); - if (ret < 0) { - goto end; - } - - lsm.cmd_type = lct; - - /* Send command to session daemon */ - ret = send_data_sessiond(); - if (ret < 0) { - goto end; - } - - /* Get header from data transmission */ - ret = recv_data_sessiond(&llm, sizeof(llm)); - if (ret < 0) { - goto end; - } - - /* Check error code if OK */ - if (llm.ret_code != LTTCOMM_OK) { - ret = -llm.ret_code; - goto end; - } - - size = llm.data_size; - if (size == 0) { - ret = 0; - goto end; - } - - data = (void*) malloc(size); - - /* Get payload data */ - ret = recv_data_sessiond(data, size); - if (ret < 0) { - free(data); - goto end; - } - - *buf = data; - ret = size; - -end: - disconnect_sessiond(); - reset_session_msg(); - return ret; -} - -/* - * Copy domain to lttcomm_session_msg domain. If unknown domain, default domain - * will be the kernel. - */ -static void copy_lttng_domain(struct lttng_domain *dom) -{ - if (dom) { - switch (dom->type) { - case LTTNG_DOMAIN_KERNEL: - case LTTNG_DOMAIN_UST: - case LTTNG_DOMAIN_UST_EXEC_NAME: - case LTTNG_DOMAIN_UST_PID: - case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN: - memcpy(&lsm.domain, dom, sizeof(struct lttng_domain)); - break; - default: - lsm.domain.type = LTTNG_DOMAIN_KERNEL; - break; - } - } -} - -/* - * Start tracing for all trace of the session. - */ -int lttng_start_tracing(const char *session_name) -{ - copy_string(lsm.session.name, session_name, NAME_MAX); - return ask_sessiond(LTTNG_START_TRACE, NULL); -} - -/* - * Stop tracing for all trace of the session. - */ -int lttng_stop_tracing(const char *session_name) -{ - copy_string(lsm.session.name, session_name, NAME_MAX); - return ask_sessiond(LTTNG_STOP_TRACE, NULL); -} - -/* - * lttng_add_context - */ -int lttng_add_context(struct lttng_domain *domain, - struct lttng_event_context *ctx, const char *event_name, - const char *channel_name) -{ - copy_string(lsm.u.context.channel_name, channel_name, NAME_MAX); - copy_string(lsm.u.context.event_name, event_name, NAME_MAX); - copy_lttng_domain(domain); - - if (ctx) { - memcpy(&lsm.u.context.ctx, ctx, sizeof(struct lttng_event_context)); - } - - return ask_sessiond(LTTNG_ADD_CONTEXT, NULL); -} - -/* - * lttng_enable_event - */ -int lttng_enable_event(struct lttng_domain *domain, - struct lttng_event *ev, const char *channel_name) -{ - int ret; - - if (channel_name == NULL) { - copy_string(lsm.u.enable.channel_name, DEFAULT_CHANNEL_NAME, NAME_MAX); - } else { - copy_string(lsm.u.enable.channel_name, channel_name, NAME_MAX); - } - - copy_lttng_domain(domain); - - if (ev == NULL) { - ret = ask_sessiond(LTTNG_ENABLE_ALL_EVENT, NULL); - } else { - memcpy(&lsm.u.enable.event, ev, sizeof(struct lttng_event)); - ret = ask_sessiond(LTTNG_ENABLE_EVENT, NULL); - } - - return ret; -} - -/* - * Disable event of a channel and domain. - */ -int lttng_disable_event(struct lttng_domain *domain, const char *name, - const char *channel_name) -{ - int ret = -1; - - if (channel_name == NULL) { - copy_string(lsm.u.disable.channel_name, DEFAULT_CHANNEL_NAME, NAME_MAX); - } else { - copy_string(lsm.u.disable.channel_name, channel_name, NAME_MAX); - } - - copy_lttng_domain(domain); - - if (name == NULL) { - ret = ask_sessiond(LTTNG_DISABLE_ALL_EVENT, NULL); - } else { - copy_string(lsm.u.disable.name, name, NAME_MAX); - ret = ask_sessiond(LTTNG_DISABLE_EVENT, NULL); - } - - return ret; -} - -/* - * Enable channel per domain - */ -int lttng_enable_channel(struct lttng_domain *domain, - struct lttng_channel *chan) -{ - if (chan) { - memcpy(&lsm.u.channel.chan, chan, sizeof(struct lttng_channel)); - } - - copy_lttng_domain(domain); - - return ask_sessiond(LTTNG_ENABLE_CHANNEL, NULL); -} - -/* - * All tracing will be stopped for registered events of the channel. - */ -int lttng_disable_channel(struct lttng_domain *domain, const char *name) -{ - copy_string(lsm.u.disable.channel_name, name, NAME_MAX); - copy_lttng_domain(domain); - - return ask_sessiond(LTTNG_DISABLE_CHANNEL, NULL); -} - -/* - * List all available tracepoints of domain. - * - * Return the size (bytes) of the list and set the events array. - * On error, return negative value. - */ -int lttng_list_tracepoints(struct lttng_domain *domain, - struct lttng_event **events) -{ - int ret; - - copy_lttng_domain(domain); - - ret = ask_sessiond(LTTNG_LIST_TRACEPOINTS, (void **) events); - if (ret < 0) { - return ret; - } - - return ret / sizeof(struct lttng_event); -} - -/* - * Return a human readable string of code - */ -const char *lttng_get_readable_code(int code) -{ - if (code > -LTTCOMM_OK) { - return "Ended with errors"; - } - - return lttcomm_get_readable_code(code); -} - -/* - * Create a brand new session using name. - */ -int lttng_create_session(const char *name, const char *path) -{ - copy_string(lsm.session.name, name, NAME_MAX); - copy_string(lsm.session.path, path, PATH_MAX); - return ask_sessiond(LTTNG_CREATE_SESSION, NULL); -} - -/* - * Destroy session using name. - */ -int lttng_destroy_session(const char *name) -{ - copy_string(lsm.session.name, name, NAME_MAX); - return ask_sessiond(LTTNG_DESTROY_SESSION, NULL); -} - -/* - * Ask the session daemon for all available sessions. - * - * Return number of session. - * On error, return negative value. - */ -int lttng_list_sessions(struct lttng_session **sessions) -{ - int ret; - - ret = ask_sessiond(LTTNG_LIST_SESSIONS, (void**) sessions); - if (ret < 0) { - return ret; - } - - return ret / sizeof(struct lttng_session); -} - -/* - * List domain of a session. - */ -int lttng_list_domains(const char *session_name, struct lttng_domain **domains) -{ - int ret; - - copy_string(lsm.session.name, session_name, NAME_MAX); - ret = ask_sessiond(LTTNG_LIST_DOMAINS, (void**) domains); - if (ret < 0) { - return ret; - } - - return ret / sizeof(struct lttng_domain); -} - -/* - * List channels of a session - */ -int lttng_list_channels(struct lttng_domain *domain, - const char *session_name, struct lttng_channel **channels) -{ - int ret; - - copy_string(lsm.session.name, session_name, NAME_MAX); - copy_lttng_domain(domain); - - ret = ask_sessiond(LTTNG_LIST_CHANNELS, (void**) channels); - if (ret < 0) { - return ret; - } - - return ret / sizeof(struct lttng_channel); -} - -/* - * List events of a session channel. - */ -int lttng_list_events(struct lttng_domain *domain, - const char *session_name, const char *channel_name, - struct lttng_event **events) -{ - int ret; - - copy_string(lsm.session.name, session_name, NAME_MAX); - copy_string(lsm.u.list.channel_name, channel_name, NAME_MAX); - copy_lttng_domain(domain); - - ret = ask_sessiond(LTTNG_LIST_EVENTS, (void**) events); - if (ret < 0) { - return ret; - } - - return ret / sizeof(struct lttng_event); -} - -/* - * Set session name for the current lsm. - */ -void lttng_set_session_name(const char *name) -{ - copy_string(lsm.session.name, name, NAME_MAX); -} - -/* - * lttng_set_tracing_group - * - * Set tracing group variable with name. This function - * allocate memory pointed by tracing_group. - */ -int lttng_set_tracing_group(const char *name) -{ - if (asprintf(&tracing_group, "%s", name) < 0) { - return -ENOMEM; - } - - return 0; -} - -/* - * lttng_calibrate - */ -int lttng_calibrate(struct lttng_domain *domain, - struct lttng_calibrate *calibrate) -{ - int ret; - - copy_lttng_domain(domain); - - memcpy(&lsm.u.calibrate, calibrate, sizeof(struct lttng_calibrate)); - ret = ask_sessiond(LTTNG_CALIBRATE, NULL); - - return ret; -} - -/* - * lttng_check_session_daemon - * - * Yes, return 1 - * No, return 0 - * Error, return negative value - */ -int lttng_session_daemon_alive(void) -{ - int ret; - - ret = set_session_daemon_path(); - if (ret < 0) { - /* Error */ - return ret; - } - - /* If socket exist, we check if the daemon listens to connect. */ - ret = access(sessiond_sock_path, F_OK); - if (ret < 0) { - /* Not alive */ - return 0; - } - - ret = lttcomm_connect_unix_sock(sessiond_sock_path); - if (ret < 0) { - /* Not alive */ - return 0; - } - ret = lttcomm_close_unix_sock(ret); - if (ret < 0) - perror("lttcomm_close_unix_sock"); - - /* Is alive */ - return 1; -} - -/* - * lib constructor - */ -static void __attribute__((constructor)) init() -{ - /* Set default session group */ - lttng_set_tracing_group(LTTNG_DEFAULT_TRACING_GROUP); -} diff --git a/liblttngctl/lttngctl.c b/liblttngctl/lttngctl.c new file mode 100644 index 000000000..ec01859a9 --- /dev/null +++ b/liblttngctl/lttngctl.c @@ -0,0 +1,639 @@ +/* + * liblttngctl.c + * + * Linux Trace Toolkit Control Library + * + * Copyright (C) 2011 David Goulet + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#include + +#include +#include "lttngerr.h" +#include "lttng-share.h" + +/* Socket to session daemon for communication */ +static int sessiond_socket; +static char sessiond_sock_path[PATH_MAX]; + +/* Communication structure to ltt-sessiond */ +static struct lttcomm_session_msg lsm; +static struct lttcomm_lttng_msg llm; + +/* Variables */ +static char *tracing_group; +static int connected; + +/* + * Copy string from src to dst and enforce null terminated byte. + */ +static void copy_string(char *dst, const char *src, size_t len) +{ + if (src && dst) { + strncpy(dst, src, len); + /* Enforce the NULL terminated byte */ + dst[len - 1] = '\0'; + } +} + +/* + * send_data_sessiond + * + * Send lttcomm_session_msg to the session daemon. + * + * On success, return 0 + * On error, return error code + */ +static int send_data_sessiond(void) +{ + int ret; + + if (!connected) { + ret = -ENOTCONN; + goto end; + } + + ret = lttcomm_send_unix_sock(sessiond_socket, &lsm, sizeof(lsm)); + +end: + return ret; +} + +/* + * recv_data_sessiond + * + * Receive data from the sessiond socket. + * + * On success, return 0 + * On error, return recv() error code + */ +static int recv_data_sessiond(void *buf, size_t len) +{ + int ret; + + if (!connected) { + ret = -ENOTCONN; + goto end; + } + + ret = lttcomm_recv_unix_sock(sessiond_socket, buf, len); + +end: + return ret; +} + +/* + * Check if the specified group name exist. + * + * If yes return 0, else return -1. + */ +static int check_tracing_group(const char *grp_name) +{ + struct group *grp_tracing; /* no free(). See getgrnam(3) */ + gid_t *grp_list; + int grp_list_size, grp_id, i; + int ret = -1; + + /* Get GID of group 'tracing' */ + grp_tracing = getgrnam(grp_name); + if (grp_tracing == NULL) { + /* NULL means not found also. getgrnam(3) */ + if (errno != 0) { + perror("getgrnam"); + } + goto end; + } + + /* Get number of supplementary group IDs */ + grp_list_size = getgroups(0, NULL); + if (grp_list_size < 0) { + perror("getgroups"); + goto end; + } + + /* Alloc group list of the right size */ + grp_list = malloc(grp_list_size * sizeof(gid_t)); + grp_id = getgroups(grp_list_size, grp_list); + if (grp_id < -1) { + perror("getgroups"); + goto free_list; + } + + for (i = 0; i < grp_list_size; i++) { + if (grp_list[i] == grp_tracing->gr_gid) { + ret = 0; + break; + } + } + +free_list: + free(grp_list); + +end: + return ret; +} + +/* + * Set sessiond socket path by putting it in the global sessiond_sock_path + * variable. + */ +static int set_session_daemon_path(void) +{ + int ret; + + /* Are we in the tracing group ? */ + ret = check_tracing_group(tracing_group); + if (ret < 0 && getuid() != 0) { + if (snprintf(sessiond_sock_path, PATH_MAX, + DEFAULT_HOME_CLIENT_UNIX_SOCK, + getenv("HOME")) < 0) { + return -ENOMEM; + } + } else { + copy_string(sessiond_sock_path, DEFAULT_GLOBAL_CLIENT_UNIX_SOCK, + PATH_MAX); + } + + return 0; +} + +/* + * Connect to the LTTng session daemon. + * + * On success, return 0. On error, return -1. + */ +static int connect_sessiond(void) +{ + int ret; + + ret = set_session_daemon_path(); + if (ret < 0) { + return ret; + } + + /* Connect to the sesssion daemon */ + ret = lttcomm_connect_unix_sock(sessiond_sock_path); + if (ret < 0) { + return ret; + } + + sessiond_socket = ret; + connected = 1; + + return 0; +} + +/* + * Clean disconnect the session daemon. + */ +static int disconnect_sessiond(void) +{ + int ret = 0; + + if (connected) { + ret = lttcomm_close_unix_sock(sessiond_socket); + sessiond_socket = 0; + connected = 0; + } + + return ret; +} + +/* + * Reset the session message structure. + */ +static void reset_session_msg(void) +{ + memset(&lsm, 0, sizeof(struct lttcomm_session_msg)); +} + +/* + * ask_sessiond + * + * Ask the session daemon a specific command and put the data into buf. + * + * Return size of data (only payload, not header). + */ +static int ask_sessiond(enum lttcomm_sessiond_command lct, void **buf) +{ + int ret; + size_t size; + void *data = NULL; + + ret = connect_sessiond(); + if (ret < 0) { + goto end; + } + + lsm.cmd_type = lct; + + /* Send command to session daemon */ + ret = send_data_sessiond(); + if (ret < 0) { + goto end; + } + + /* Get header from data transmission */ + ret = recv_data_sessiond(&llm, sizeof(llm)); + if (ret < 0) { + goto end; + } + + /* Check error code if OK */ + if (llm.ret_code != LTTCOMM_OK) { + ret = -llm.ret_code; + goto end; + } + + size = llm.data_size; + if (size == 0) { + ret = 0; + goto end; + } + + data = (void*) malloc(size); + + /* Get payload data */ + ret = recv_data_sessiond(data, size); + if (ret < 0) { + free(data); + goto end; + } + + *buf = data; + ret = size; + +end: + disconnect_sessiond(); + reset_session_msg(); + return ret; +} + +/* + * Copy domain to lttcomm_session_msg domain. If unknown domain, default domain + * will be the kernel. + */ +static void copy_lttng_domain(struct lttng_domain *dom) +{ + if (dom) { + switch (dom->type) { + case LTTNG_DOMAIN_KERNEL: + case LTTNG_DOMAIN_UST: + case LTTNG_DOMAIN_UST_EXEC_NAME: + case LTTNG_DOMAIN_UST_PID: + case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN: + memcpy(&lsm.domain, dom, sizeof(struct lttng_domain)); + break; + default: + lsm.domain.type = LTTNG_DOMAIN_KERNEL; + break; + } + } +} + +/* + * Start tracing for all trace of the session. + */ +int lttng_start_tracing(const char *session_name) +{ + copy_string(lsm.session.name, session_name, NAME_MAX); + return ask_sessiond(LTTNG_START_TRACE, NULL); +} + +/* + * Stop tracing for all trace of the session. + */ +int lttng_stop_tracing(const char *session_name) +{ + copy_string(lsm.session.name, session_name, NAME_MAX); + return ask_sessiond(LTTNG_STOP_TRACE, NULL); +} + +/* + * lttng_add_context + */ +int lttng_add_context(struct lttng_domain *domain, + struct lttng_event_context *ctx, const char *event_name, + const char *channel_name) +{ + copy_string(lsm.u.context.channel_name, channel_name, NAME_MAX); + copy_string(lsm.u.context.event_name, event_name, NAME_MAX); + copy_lttng_domain(domain); + + if (ctx) { + memcpy(&lsm.u.context.ctx, ctx, sizeof(struct lttng_event_context)); + } + + return ask_sessiond(LTTNG_ADD_CONTEXT, NULL); +} + +/* + * lttng_enable_event + */ +int lttng_enable_event(struct lttng_domain *domain, + struct lttng_event *ev, const char *channel_name) +{ + int ret; + + if (channel_name == NULL) { + copy_string(lsm.u.enable.channel_name, DEFAULT_CHANNEL_NAME, NAME_MAX); + } else { + copy_string(lsm.u.enable.channel_name, channel_name, NAME_MAX); + } + + copy_lttng_domain(domain); + + if (ev == NULL) { + ret = ask_sessiond(LTTNG_ENABLE_ALL_EVENT, NULL); + } else { + memcpy(&lsm.u.enable.event, ev, sizeof(struct lttng_event)); + ret = ask_sessiond(LTTNG_ENABLE_EVENT, NULL); + } + + return ret; +} + +/* + * Disable event of a channel and domain. + */ +int lttng_disable_event(struct lttng_domain *domain, const char *name, + const char *channel_name) +{ + int ret = -1; + + if (channel_name == NULL) { + copy_string(lsm.u.disable.channel_name, DEFAULT_CHANNEL_NAME, NAME_MAX); + } else { + copy_string(lsm.u.disable.channel_name, channel_name, NAME_MAX); + } + + copy_lttng_domain(domain); + + if (name == NULL) { + ret = ask_sessiond(LTTNG_DISABLE_ALL_EVENT, NULL); + } else { + copy_string(lsm.u.disable.name, name, NAME_MAX); + ret = ask_sessiond(LTTNG_DISABLE_EVENT, NULL); + } + + return ret; +} + +/* + * Enable channel per domain + */ +int lttng_enable_channel(struct lttng_domain *domain, + struct lttng_channel *chan) +{ + if (chan) { + memcpy(&lsm.u.channel.chan, chan, sizeof(struct lttng_channel)); + } + + copy_lttng_domain(domain); + + return ask_sessiond(LTTNG_ENABLE_CHANNEL, NULL); +} + +/* + * All tracing will be stopped for registered events of the channel. + */ +int lttng_disable_channel(struct lttng_domain *domain, const char *name) +{ + copy_string(lsm.u.disable.channel_name, name, NAME_MAX); + copy_lttng_domain(domain); + + return ask_sessiond(LTTNG_DISABLE_CHANNEL, NULL); +} + +/* + * List all available tracepoints of domain. + * + * Return the size (bytes) of the list and set the events array. + * On error, return negative value. + */ +int lttng_list_tracepoints(struct lttng_domain *domain, + struct lttng_event **events) +{ + int ret; + + copy_lttng_domain(domain); + + ret = ask_sessiond(LTTNG_LIST_TRACEPOINTS, (void **) events); + if (ret < 0) { + return ret; + } + + return ret / sizeof(struct lttng_event); +} + +/* + * Return a human readable string of code + */ +const char *lttng_get_readable_code(int code) +{ + if (code > -LTTCOMM_OK) { + return "Ended with errors"; + } + + return lttcomm_get_readable_code(code); +} + +/* + * Create a brand new session using name. + */ +int lttng_create_session(const char *name, const char *path) +{ + copy_string(lsm.session.name, name, NAME_MAX); + copy_string(lsm.session.path, path, PATH_MAX); + return ask_sessiond(LTTNG_CREATE_SESSION, NULL); +} + +/* + * Destroy session using name. + */ +int lttng_destroy_session(const char *name) +{ + copy_string(lsm.session.name, name, NAME_MAX); + return ask_sessiond(LTTNG_DESTROY_SESSION, NULL); +} + +/* + * Ask the session daemon for all available sessions. + * + * Return number of session. + * On error, return negative value. + */ +int lttng_list_sessions(struct lttng_session **sessions) +{ + int ret; + + ret = ask_sessiond(LTTNG_LIST_SESSIONS, (void**) sessions); + if (ret < 0) { + return ret; + } + + return ret / sizeof(struct lttng_session); +} + +/* + * List domain of a session. + */ +int lttng_list_domains(const char *session_name, struct lttng_domain **domains) +{ + int ret; + + copy_string(lsm.session.name, session_name, NAME_MAX); + ret = ask_sessiond(LTTNG_LIST_DOMAINS, (void**) domains); + if (ret < 0) { + return ret; + } + + return ret / sizeof(struct lttng_domain); +} + +/* + * List channels of a session + */ +int lttng_list_channels(struct lttng_domain *domain, + const char *session_name, struct lttng_channel **channels) +{ + int ret; + + copy_string(lsm.session.name, session_name, NAME_MAX); + copy_lttng_domain(domain); + + ret = ask_sessiond(LTTNG_LIST_CHANNELS, (void**) channels); + if (ret < 0) { + return ret; + } + + return ret / sizeof(struct lttng_channel); +} + +/* + * List events of a session channel. + */ +int lttng_list_events(struct lttng_domain *domain, + const char *session_name, const char *channel_name, + struct lttng_event **events) +{ + int ret; + + copy_string(lsm.session.name, session_name, NAME_MAX); + copy_string(lsm.u.list.channel_name, channel_name, NAME_MAX); + copy_lttng_domain(domain); + + ret = ask_sessiond(LTTNG_LIST_EVENTS, (void**) events); + if (ret < 0) { + return ret; + } + + return ret / sizeof(struct lttng_event); +} + +/* + * Set session name for the current lsm. + */ +void lttng_set_session_name(const char *name) +{ + copy_string(lsm.session.name, name, NAME_MAX); +} + +/* + * lttng_set_tracing_group + * + * Set tracing group variable with name. This function + * allocate memory pointed by tracing_group. + */ +int lttng_set_tracing_group(const char *name) +{ + if (asprintf(&tracing_group, "%s", name) < 0) { + return -ENOMEM; + } + + return 0; +} + +/* + * lttng_calibrate + */ +int lttng_calibrate(struct lttng_domain *domain, + struct lttng_calibrate *calibrate) +{ + int ret; + + copy_lttng_domain(domain); + + memcpy(&lsm.u.calibrate, calibrate, sizeof(struct lttng_calibrate)); + ret = ask_sessiond(LTTNG_CALIBRATE, NULL); + + return ret; +} + +/* + * lttng_check_session_daemon + * + * Yes, return 1 + * No, return 0 + * Error, return negative value + */ +int lttng_session_daemon_alive(void) +{ + int ret; + + ret = set_session_daemon_path(); + if (ret < 0) { + /* Error */ + return ret; + } + + /* If socket exist, we check if the daemon listens to connect. */ + ret = access(sessiond_sock_path, F_OK); + if (ret < 0) { + /* Not alive */ + return 0; + } + + ret = lttcomm_connect_unix_sock(sessiond_sock_path); + if (ret < 0) { + /* Not alive */ + return 0; + } + ret = lttcomm_close_unix_sock(ret); + if (ret < 0) + perror("lttcomm_close_unix_sock"); + + /* Is alive */ + return 1; +} + +/* + * lib constructor + */ +static void __attribute__((constructor)) init() +{ + /* Set default session group */ + lttng_set_tracing_group(LTTNG_DEFAULT_TRACING_GROUP); +} diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index 4180f8901..1e2841c01 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -38,8 +38,8 @@ #include #include "lttngerr.h" -#include "libkernelctl.h" -#include "liblttkconsumerd.h" +#include "kernelctl.h" +#include "lttkconsumerd.h" /* the two threads (receive fd and poll) */ pthread_t threads[2]; diff --git a/ltt-sessiond/kernel-ctl.c b/ltt-sessiond/kernel-ctl.c index 419d1af7b..cfea8942d 100644 --- a/ltt-sessiond/kernel-ctl.c +++ b/ltt-sessiond/kernel-ctl.c @@ -25,7 +25,7 @@ #include #include "lttngerr.h" -#include "libkernelctl.h" +#include "kernelctl.h" #include "kernel-ctl.h" /*