From 008e25154ca2db688516bafcb134c984f08a5f10 Mon Sep 17 00:00:00 2001 From: Michael Sills Lavoie Date: Fri, 26 Mar 2010 14:53:37 -0400 Subject: [PATCH] Move the code from lttd to liblttd and adapt everything so it works Signed-off-by: Mathieu Desnoyers --- Makefile.am | 2 +- configure.in | 3 + liblttd/Makefile.am | 7 + liblttd/liblttd.c | 747 ++++++++++++++++++++++++++++++++++++++++ liblttd/liblttd.h | 218 ++++++++++++ lttd/Makefile.am | 3 + lttd/lttd.c | 812 ++++---------------------------------------- 7 files changed, 1054 insertions(+), 738 deletions(-) create mode 100644 liblttd/Makefile.am create mode 100644 liblttd/liblttd.c create mode 100644 liblttd/liblttd.h diff --git a/Makefile.am b/Makefile.am index 8f9974d..06500fd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,2 +1,2 @@ -SUBDIRS = liblttctl lttctl lttd specs +SUBDIRS = liblttctl lttctl liblttd lttd specs diff --git a/configure.in b/configure.in index 27727a7..e9550e0 100644 --- a/configure.in +++ b/configure.in @@ -76,8 +76,10 @@ DEFAULT_INCLUDES="-I\$(top_srcdir) -I\$(top_builddir)" #AC_SUBST(CPPFLAGS) lttctlincludedir="${includedir}/liblttctl" +liblttdincludedir="${includedir}/liblttd" AC_SUBST(lttctlincludedir) +AC_SUBST(liblttdincludedir) AC_SUBST(UTIL_LIBS) AC_SUBST(THREAD_LIBS) AC_SUBST(DEFAULT_INCLUDES) @@ -85,6 +87,7 @@ AC_SUBST(DEFAULT_INCLUDES) AC_CONFIG_FILES([Makefile liblttctl/Makefile lttctl/Makefile + liblttd/Makefile lttd/Makefile specs/Makefile]) AC_OUTPUT diff --git a/liblttd/Makefile.am b/liblttd/Makefile.am new file mode 100644 index 0000000..3c1eeda --- /dev/null +++ b/liblttd/Makefile.am @@ -0,0 +1,7 @@ + + +lib_LTLIBRARIES = liblttd.la +liblttd_la_SOURCES = liblttd.c + +liblttdinclude_HEADERS = \ + liblttd.h diff --git a/liblttd/liblttd.c b/liblttd/liblttd.c new file mode 100644 index 0000000..0f0e5b5 --- /dev/null +++ b/liblttd/liblttd.c @@ -0,0 +1,747 @@ +/* lttd + * + * Linux Trace Toolkit Daemon + * + * This is a simple daemon that reads a few relay+debugfs channels and save + * them in a trace. + * + * CPU hot-plugging is supported using inotify. + * + * Copyright 2005 - + * Mathieu Desnoyers + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include "liblttd.h" + +#define _REENTRANT +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/* Relayfs IOCTL */ +#include +#include + +/* Get the next sub buffer that can be read. */ +#define RELAY_GET_SB _IOR(0xF5, 0x00,__u32) +/* Release the oldest reserved (by "get") sub buffer. */ +#define RELAY_PUT_SB _IOW(0xF5, 0x01,__u32) +/* returns the number of sub buffers in the per cpu channel. */ +#define RELAY_GET_N_SB _IOR(0xF5, 0x02,__u32) +/* returns the size of the current sub buffer. */ +#define RELAY_GET_SB_SIZE _IOR(0xF5, 0x03, __u32) +/* returns the size of data to consume in the current sub-buffer. */ +#define RELAY_GET_MAX_SB_SIZE _IOR(0xF5, 0x04, __u32) + + +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14) +#include + +#define HAS_INOTIFY +#else +static inline int inotify_init (void) +{ + return -1; +} + +static inline int inotify_add_watch (int fd, const char *name, __u32 mask) +{ + return 0; +} + +static inline int inotify_rm_watch (int fd, __u32 wd) +{ + return 0; +} +#undef HAS_INOTIFY +#endif + +struct liblttd_callbacks *callbacks; + +struct channel_trace_fd { + struct fd_pair *pair; + int num_pairs; +}; + +struct inotify_watch { + int wd; + char path_channel[PATH_MAX]; + char *base_path_channel; +}; + +struct inotify_watch_array { + struct inotify_watch *elem; + int num; +}; + +struct channel_trace_fd fd_pairs = { NULL, 0 }; +int inotify_fd = -1; +struct inotify_watch_array inotify_watch_array = { NULL, 0 }; + +/* protects fd_pairs and inotify_watch_array */ +pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER; + +static char *channel_name = NULL; +static unsigned long num_threads = 1; +volatile static int quit_program = 0; /* For signal handler */ +static int dump_flight_only = 0; +static int dump_normal_only = 0; +static int verbose_mode = 0; + +#define printf_verbose(fmt, args...) \ + do { \ + if (verbose_mode) \ + printf(fmt, ##args); \ + } while (0) + + +int open_buffer_file(char *filename, char *path_channel, + char *base_path_channel, struct channel_trace_fd *fd_pairs) +{ + int open_ret = 0; + int ret = 0; + + if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) { + if(dump_flight_only) { + printf_verbose("Skipping normal channel %s\n", + path_channel); + return 0; + } + } else { + if(dump_normal_only) { + printf_verbose("Skipping flight channel %s\n", + path_channel); + return 0; + } + } + printf_verbose("Opening file.\n"); + + fd_pairs->pair = realloc(fd_pairs->pair, + ++fd_pairs->num_pairs * sizeof(struct fd_pair)); + + /* Open the channel in read mode */ + fd_pairs->pair[fd_pairs->num_pairs-1].channel = + open(path_channel, O_RDONLY | O_NONBLOCK); + if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) { + perror(path_channel); + fd_pairs->num_pairs--; + return 0; /* continue */ + } + + if(callbacks->on_open_channel) ret = callbacks->on_open_channel( + callbacks, &fd_pairs->pair[fd_pairs->num_pairs-1], + base_path_channel); + + if(ret != 0) { + open_ret = -1; + close(fd_pairs->pair[fd_pairs->num_pairs-1].channel); + fd_pairs->num_pairs--; + goto end; + } + +end: + return open_ret; +} + +int open_channel_trace_pairs(char *subchannel_name, + char *base_subchannel_name, + struct channel_trace_fd *fd_pairs, int *inotify_fd, + struct inotify_watch_array *iwatch_array) +{ + DIR *channel_dir = opendir(subchannel_name); + struct dirent *entry; + struct stat stat_buf; + int ret; + char path_channel[PATH_MAX]; + int path_channel_len; + char *path_channel_ptr; + char *base_subchannel_ptr; + + int open_ret = 0; + + if(channel_dir == NULL) { + perror(subchannel_name); + open_ret = ENOENT; + goto end; + } + + printf_verbose("Calling on new channels folder"); + if(callbacks->on_new_channels_folder) ret = callbacks-> + on_new_channels_folder(callbacks, + base_subchannel_name); + if(ret == -1) { + open_ret = -1; + goto end; + } + + strncpy(path_channel, subchannel_name, PATH_MAX-1); + path_channel_len = strlen(path_channel); + path_channel[path_channel_len] = '/'; + path_channel_len++; + path_channel_ptr = path_channel + path_channel_len; + base_subchannel_ptr = path_channel + + (base_subchannel_name - subchannel_name); + +#ifdef HAS_INOTIFY + iwatch_array->elem = realloc(iwatch_array->elem, + ++iwatch_array->num * sizeof(struct inotify_watch)); + + printf_verbose("Adding inotify for channel %s\n", path_channel); + iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE); + strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel); + iwatch_array->elem[iwatch_array->num-1].base_path_channel = + iwatch_array->elem[iwatch_array->num-1].path_channel + + (base_subchannel_name - subchannel_name); + printf_verbose("Added inotify for channel %s, wd %u\n", + iwatch_array->elem[iwatch_array->num-1].path_channel, + iwatch_array->elem[iwatch_array->num-1].wd); +#endif + + while((entry = readdir(channel_dir)) != NULL) { + + if(entry->d_name[0] == '.') continue; + + strncpy(path_channel_ptr, entry->d_name, PATH_MAX - path_channel_len); + + ret = stat(path_channel, &stat_buf); + if(ret == -1) { + perror(path_channel); + continue; + } + + printf_verbose("Channel file : %s\n", path_channel); + + if(S_ISDIR(stat_buf.st_mode)) { + + printf_verbose("Entering channel subdirectory...\n"); + ret = open_channel_trace_pairs(path_channel, base_subchannel_ptr, fd_pairs, + inotify_fd, iwatch_array); + if(ret < 0) continue; + } else if(S_ISREG(stat_buf.st_mode)) { + open_ret = open_buffer_file(entry->d_name, path_channel, base_subchannel_ptr, + fd_pairs); + if(open_ret) + goto end; + } + } + +end: + closedir(channel_dir); + + return open_ret; +} + + +int read_subbuffer(struct fd_pair *pair) +{ + unsigned int consumed_old, len; + int err; + long ret; + off_t offset; + + + err = ioctl(pair->channel, RELAY_GET_SB, &consumed_old); + printf_verbose("cookie : %u\n", consumed_old); + if(err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)"); + goto get_error; + } + + err = ioctl(pair->channel, RELAY_GET_SB_SIZE, &len); + if(err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto get_error; + } + + if(callbacks->on_read_subbuffer) ret = callbacks->on_read_subbuffer( + callbacks, pair, len); + +write_error: + ret = 0; + err = ioctl(pair->channel, RELAY_PUT_SB, &consumed_old); + if(err != 0) { + ret = errno; + if(errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if(errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto get_error; + } + +get_error: + return ret; +} + + +int map_channels(struct channel_trace_fd *fd_pairs, + int idx_begin, int idx_end) +{ + int i,j; + int ret=0; + + if(fd_pairs->num_pairs <= 0) { + printf("No channel to read\n"); + goto end; + } + + /* Get the subbuf sizes and number */ + + for(i=idx_begin;ipair[i]; + + ret = ioctl(pair->channel, RELAY_GET_N_SB, &pair->n_sb); + if(ret != 0) { + perror("Error in getting the number of sub-buffers"); + goto end; + } + ret = ioctl(pair->channel, RELAY_GET_MAX_SB_SIZE, + &pair->max_sb_size); + if(ret != 0) { + perror("Error in getting the max sub-buffer size"); + goto end; + } + ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */ + if(ret != 0) { + perror("Error in mutex init"); + goto end; + } + } + +end: + return ret; +} + +int unmap_channels(struct channel_trace_fd *fd_pairs) +{ + int j; + int ret=0; + + /* Munmap each FD */ + for(j=0;jnum_pairs;j++) { + struct fd_pair *pair = &fd_pairs->pair[j]; + int err_ret; + + err_ret = pthread_mutex_destroy(&pair->mutex); + if(err_ret != 0) { + perror("Error in mutex destroy"); + } + ret |= err_ret; + } + + return ret; +} + +#ifdef HAS_INOTIFY +/* Inotify event arrived. + * + * Only support add file for now. + */ + +int read_inotify(int inotify_fd, + struct channel_trace_fd *fd_pairs, + struct inotify_watch_array *iwatch_array) +{ + char buf[sizeof(struct inotify_event) + PATH_MAX]; + char path_channel[PATH_MAX]; + ssize_t len; + struct inotify_event *ievent; + size_t offset; + unsigned int i; + int ret; + int old_num; + + offset = 0; + len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX); + if(len < 0) { + + if(errno == EAGAIN) + return 0; /* another thread got the data before us */ + + printf("Error in read from inotify FD %s.\n", strerror(len)); + return -1; + } + while(offset < len) { + ievent = (struct inotify_event *)&(buf[offset]); + for(i=0; inum; i++) { + if(iwatch_array->elem[i].wd == ievent->wd && + ievent->mask == IN_CREATE) { + printf_verbose( + "inotify wd %u event mask : %u for %s%s\n", + ievent->wd, ievent->mask, + iwatch_array->elem[i].path_channel, + ievent->name); + old_num = fd_pairs->num_pairs; + strcpy(path_channel, iwatch_array->elem[i].path_channel); + strcat(path_channel, ievent->name); + if(ret = open_buffer_file(ievent->name, path_channel, + path_channel + (iwatch_array->elem[i].base_path_channel - + iwatch_array->elem[i].path_channel), fd_pairs)) { + printf("Error opening buffer file\n"); + return -1; + } + if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) { + printf("Error mapping channel\n"); + return -1; + } + + } + } + offset += sizeof(*ievent) + ievent->len; + } +} +#endif //HAS_INOTIFY + +/* read_channels + * + * Thread worker. + * + * Read the debugfs channels and write them in the paired tracefiles. + * + * @fd_pairs : paired channels and trace files. + * + * returns 0 on success, -1 on error. + * + * Note that the high priority polled channels are consumed first. We then poll + * again to see if these channels are still in priority. Only when no + * high priority channel is left, we start reading low priority channels. + * + * Note that a channel is considered high priority when the buffer is almost + * full. + */ + +int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs, + int inotify_fd, struct inotify_watch_array *iwatch_array) +{ + struct pollfd *pollfd = NULL; + int num_pollfd; + int i,j; + int num_rdy, num_hup; + int high_prio; + int ret = 0; + int inotify_fds; + unsigned int old_num; + +#ifdef HAS_INOTIFY + inotify_fds = 1; +#else + inotify_fds = 0; +#endif + + pthread_rwlock_rdlock(&fd_pairs_lock); + + /* Start polling the FD. Keep one fd for inotify */ + pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd)); + +#ifdef HAS_INOTIFY + pollfd[0].fd = inotify_fd; + pollfd[0].events = POLLIN|POLLPRI; +#endif + + for(i=0;inum_pairs;i++) { + pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel; + pollfd[inotify_fds+i].events = POLLIN|POLLPRI; + } + num_pollfd = inotify_fds + fd_pairs->num_pairs; + + + pthread_rwlock_unlock(&fd_pairs_lock); + + while(1) { + high_prio = 0; + num_hup = 0; +#ifdef DEBUG + printf("Press a key for next poll...\n"); + char buf[1]; + read(STDIN_FILENO, &buf, 1); + printf("Next poll (polling %d fd) :\n", num_pollfd); +#endif //DEBUG + + /* Have we received a signal ? */ + if(quit_program) break; + + num_rdy = poll(pollfd, num_pollfd, -1); + + if(num_rdy == -1) { + perror("Poll error"); + goto free_fd; + } + + printf_verbose("Data received\n"); +#ifdef HAS_INOTIFY + switch(pollfd[0].revents) { + case POLLERR: + printf_verbose( + "Error returned in polling inotify fd %d.\n", + pollfd[0].fd); + break; + case POLLHUP: + printf_verbose( + "Polling inotify fd %d tells it has hung up.\n", + pollfd[0].fd); + break; + case POLLNVAL: + printf_verbose( + "Polling inotify fd %d tells fd is not open.\n", + pollfd[0].fd); + break; + case POLLPRI: + case POLLIN: + printf_verbose( + "Polling inotify fd %d : data ready.\n", + pollfd[0].fd); + + pthread_rwlock_wrlock(&fd_pairs_lock); + read_inotify(inotify_fd, fd_pairs, iwatch_array); + pthread_rwlock_unlock(&fd_pairs_lock); + + break; + } +#endif + + for(i=inotify_fds;ipair[i-inotify_fds].mutex) == 0) { + printf_verbose( + "Urgent read on fd %d\n", + pollfd[i].fd); + /* Take care of high priority channels first. */ + high_prio = 1; + /* it's ok to have an unavailable sub-buffer */ + ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]); + if(ret == EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } + pthread_rwlock_unlock(&fd_pairs_lock); + break; + } + } + /* If every buffer FD has hung up, we end the read loop here */ + if(num_hup == num_pollfd - inotify_fds) break; + + if(!high_prio) { + for(i=inotify_fds;ipair[i-inotify_fds].mutex) == 0) { + /* Take care of low priority channels. */ + printf_verbose( + "Normal read on fd %d\n", + pollfd[i].fd); + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]); + if(ret == EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } + pthread_rwlock_unlock(&fd_pairs_lock); + break; + } + } + } + + /* Update pollfd array if an entry was added to fd_pairs */ + pthread_rwlock_rdlock(&fd_pairs_lock); + if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) { + pollfd = realloc(pollfd, + (inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd)); + for(i=num_pollfd-inotify_fds;inum_pairs;i++) { + pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel; + pollfd[inotify_fds+i].events = POLLIN|POLLPRI; + } + num_pollfd = fd_pairs->num_pairs + inotify_fds; + } + pthread_rwlock_unlock(&fd_pairs_lock); + + /* NB: If the fd_pairs structure is updated by another thread from this + * point forward, the current thread will wait in the poll without + * monitoring the new channel. However, this thread will add the + * new channel on next poll (and this should not take too much time + * on a loaded system). + * + * This event is quite unlikely and can only occur if a CPU is + * hot-plugged while multple lttd threads are running. + */ + } + +free_fd: + free(pollfd); + +end: + return ret; +} + + +void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd, + struct inotify_watch_array *iwatch_array) +{ + int i; + int ret; + + for(i=0;inum_pairs;i++) { + ret = close(fd_pairs->pair[i].channel); + if(ret == -1) perror("Close error on channel"); + if(callbacks->on_close_channel) { + ret = callbacks->on_close_channel( + callbacks, &fd_pairs->pair[i]); + if(ret != 0) perror("Error on close channel callback"); + } + } + free(fd_pairs->pair); + free(iwatch_array->elem); +} + +/* Thread worker */ +void * thread_main(void *arg) +{ + long ret = 0; + unsigned long thread_num = (unsigned long)arg; + + if(callbacks->on_new_thread) + ret = callbacks->on_new_thread(callbacks, thread_num); + + if (ret < 0) { + return (void*)ret; + } + ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array); + + if(callbacks->on_close_thread) + callbacks->on_close_thread(callbacks, thread_num); + + return (void*)ret; +} + +/*on_close_thread has to be reentrant, it'll be called by many threads*/ +int(*on_close_thread)(struct liblttd_callbacks *data, unsigned long thread_num); + +int channels_init() +{ + int ret = 0; + + inotify_fd = inotify_init(); + fcntl(inotify_fd, F_SETFL, O_NONBLOCK); + + if(ret = open_channel_trace_pairs(channel_name, + channel_name + strlen(channel_name), &fd_pairs, + &inotify_fd, &inotify_watch_array)) + goto close_channel; + if (fd_pairs.num_pairs == 0) { + printf("No channel available for reading, exiting\n"); + ret = -ENOENT; + goto close_channel; + } + if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs)) + goto close_channel; + return 0; + +close_channel: + close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array); + if(inotify_fd >= 0) + close(inotify_fd); + return ret; +} + +int liblttd_start(char *channel_path, unsigned long n_threads, + int flight_only, int normal_only, int verbose, + struct liblttd_callbacks *user_data){ + int ret = 0; + pthread_t *tids; + unsigned long i; + void *tret; + + channel_name = channel_path; + num_threads = n_threads; + dump_flight_only = flight_only; + dump_normal_only = normal_only; + verbose_mode = verbose; + callbacks = user_data; + + if(ret = channels_init()) + return ret; + + tids = malloc(sizeof(pthread_t) * num_threads); + for(i=0; i= 0) + close(inotify_fd); + + if(callbacks->on_trace_end) callbacks->on_trace_end(callbacks); + + return ret; +} + +int liblttd_stop() { + quit_program = 1; + return 0; +} + diff --git a/liblttd/liblttd.h b/liblttd/liblttd.h new file mode 100644 index 0000000..bd76eeb --- /dev/null +++ b/liblttd/liblttd.h @@ -0,0 +1,218 @@ +/* liblttd header file + * + * Copyright 2010- + * Oumarou Dicko + * Michael Sills-Lavoie + * + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + */ + +#ifndef _LIBLTTD_H +#define _LIBLTTD_H + +#include + +/** +* This structure contains the data associated with the channel file descriptor. +* The lib user can use user_data to store the data associated to the specified +* channel. The lib user can read but MUST NOT change the other attributes. +*/ +struct fd_pair { + /** + * This is the channel file descriptor. + */ + int channel; + + /** + * This is the number of subbuffer for this channel. + */ + unsigned int n_sb; + + /** + * This is the subbuffer size for this channel. + */ + unsigned int max_sb_size; + + /** + * Not used anymore. + */ + void *mmap; + + /** + * This is a mutex for internal library usage. + */ + pthread_mutex_t mutex; + + /** + * Library user data. + */ + void *user_data; +}; + +/** +* This structure contains the necessary callbacks for a tracing session. The +* user can set the unnecessary functions to NULL if he does not need them. +*/ +struct liblttd_callbacks { + /** + * This callback is called after a channel file is open. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args pair This structure contains the data associated with the + * channel file descriptor. The lib user can use user_data to + * store the data associated to the specified channel. + * @args relative_channel_path This argument represents a relative path + * to the channel file. This path is relative to the root + * folder of the trace channels. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_open_channel)(struct liblttd_callbacks *data, + struct fd_pair *pair, char *relative_channel_path); + + /** + * This callback is called after a channel file is closed. + * + * @remarks After a channel file has been closed, it will never be read + * again. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args pair This structure contains the data associated with the + * channel file descriptor. The lib user should clean + * user_data at this time. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_close_channel)(struct liblttd_callbacks *data, + struct fd_pair *pair); + + + /** + * This callback is called when the library enter in a new subfolder + * while it is scanning the trace channel tree. It can be used to create + * the output file structure of the trace. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args relative_folder_path This argument represents a relative path + * to the channel folder. This path is relative to the root + * folder of the trace channels. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_new_channels_folder)(struct liblttd_callbacks *data, + char *relative_folder_path); + + /** + * This callback is called after a subbuffer is a reserved. + * + * @attention It has to be thread safe, it'll be called by many threads. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args pair This structure contains the data associated with the + * channel file descriptor. The lib user should clean + * user_data at this time. + * @args len This argument represents the length the data that has to be + * read. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_read_subbuffer)(struct liblttd_callbacks *data, + struct fd_pair *pair, unsigned int len); + + /** + * This callback is called at the very end of the tracing session. At + * this time, all the channels have been closed and the threads have been + * destroyed. + * + * @remarks After this callback is called, no other callback will be + * called again. + * + * @attention It has to be thread safe, it'll be called by many threads. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_trace_end)(struct liblttd_callbacks *data); + + /** + * This callback is called after a new thread has been created. + * + * @attention It has to be thread safe, it'll be called by many threads. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args thread_num This argument represents the id of the thread. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_new_thread)(struct liblttd_callbacks *data, + unsigned long thread_num); + + /** + * This callback is called just before a thread is destroyed. + * + * @attention It has to be thread safe, it'll be called by many threads. + * + * @args data This argument is a pointeur to the callbacks struct that + * has been passed to the lib. + * @args thread_num This argument represents the number of the thread. + * + * @return Should return 0 if the callback succeeds else not 0. + */ + int(*on_close_thread)(struct liblttd_callbacks *data, + unsigned long thread_num); + + /** + * This is where the user can put the library's data. + */ + void *user_data; +}; + +/** +* This function is called to start a new tracing session. +* +* @attention It has to be thread safe, it'll be called by many threads. +* +* @args channel_path This argument is a path to the root folder of the trace's +* channels. +* @args n_threads This argument represents the number of threads that will be +* used by the library. +* @args flight_only If this argument to set to 1, only the channel that are in +* flight recorder mode will be recorded. +* @args normal_only If this argument to set to 1, only the channel that are in +* normal mode will be recorded. +* @args verbose If this argument to set to 1, more informations will be printed. +* @args user_data This argument is a pointeur to the callbacks struct that +* contains the user's functions. +* +* @return Return 0 if the function succeeds else not 0. +*/ +int liblttd_start(char *channel_path, unsigned long n_threads, + int flight_only, int normal_only, int verbose, + struct liblttd_callbacks *user_data); + +/** +* This function is called to stop a tracing session. +* +* @return Return 0 if the function succeeds. +*/ +int liblttd_stop(); + +#endif /*_LIBLTTD_H */ + diff --git a/lttd/Makefile.am b/lttd/Makefile.am index bb860bc..fc9b219 100644 --- a/lttd/Makefile.am +++ b/lttd/Makefile.am @@ -6,3 +6,6 @@ bin_PROGRAMS = lttd lttd_SOURCES = lttd.c +lttd_DEPENDENCIES = ../liblttd/liblttd.la +lttd_LDADD = $(lttd_DEPENDENCIES) + diff --git a/lttd/lttd.c b/lttd/lttd.c index 78c2c80..ec757ae 100644 --- a/lttd/lttd.c +++ b/lttd/lttd.c @@ -30,125 +30,36 @@ #define _REENTRANT #define _GNU_SOURCE -#include + #include -#include -#include -#include -#include #include -#include #include -#include -#include -#include #include -#include -#include -#include -#include - -#include - -/* Relayfs IOCTL */ -#include -#include - -/* Get the next sub buffer that can be read. */ -#define RELAY_GET_SB _IOR(0xF5, 0x00,__u32) -/* Release the oldest reserved (by "get") sub buffer. */ -#define RELAY_PUT_SB _IOW(0xF5, 0x01,__u32) -/* returns the number of sub buffers in the per cpu channel. */ -#define RELAY_GET_N_SB _IOR(0xF5, 0x02,__u32) -/* returns the size of the current sub buffer. */ -#define RELAY_GET_SB_SIZE _IOR(0xF5, 0x03, __u32) -/* returns the size of data to consume in the current sub-buffer. */ -#define RELAY_GET_MAX_SB_SIZE _IOR(0xF5, 0x04, __u32) - - -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14) -#include -#if 0 /* should now be provided by libc. */ -/* From the inotify-tools 2.6 package */ -static inline int inotify_init (void) -{ - return syscall (__NR_inotify_init); -} - -static inline int inotify_add_watch (int fd, const char *name, __u32 mask) -{ - return syscall (__NR_inotify_add_watch, fd, name, mask); -} - -static inline int inotify_rm_watch (int fd, __u32 wd) -{ - return syscall (__NR_inotify_rm_watch, fd, wd); -} -#endif //0 -#define HAS_INOTIFY -#else -static inline int inotify_init (void) -{ - return -1; -} +#include +#include +#include +#include -static inline int inotify_add_watch (int fd, const char *name, __u32 mask) -{ - return 0; -} +#include -static inline int inotify_rm_watch (int fd, __u32 wd) -{ - return 0; -} -#undef HAS_INOTIFY -#endif - -struct fd_pair { - int channel; +struct lttd_channel_data { int trace; - unsigned int n_sb; - unsigned int max_sb_size; - void *mmap; - pthread_mutex_t mutex; -}; - -struct channel_trace_fd { - struct fd_pair *pair; - int num_pairs; -}; - -struct inotify_watch { - int wd; - char path_channel[PATH_MAX]; - char path_trace[PATH_MAX]; -}; - -struct inotify_watch_array { - struct inotify_watch *elem; - int num; }; -static __thread int thread_pipe[2]; - -struct channel_trace_fd fd_pairs = { NULL, 0 }; -int inotify_fd = -1; -struct inotify_watch_array inotify_watch_array = { NULL, 0 }; - -/* protects fd_pairs and inotify_watch_array */ -pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER; - - +static char path_trace[PATH_MAX]; +static char *end_path_trace; +static int path_trace_len = 0; static char *trace_name = NULL; static char *channel_name = NULL; static int daemon_mode = 0; static int append_mode = 0; static unsigned long num_threads = 1; -volatile static int quit_program = 0; /* For signal handler */ static int dump_flight_only = 0; static int dump_normal_only = 0; static int verbose_mode = 0; +static __thread int thread_pipe[2]; + #define printf_verbose(fmt, args...) \ do { \ if (verbose_mode) \ @@ -191,7 +102,7 @@ int parse_arguments(int argc, char **argv) { int ret = 0; int argn = 1; - + if(argc == 2) { if(strcmp(argv[1], "-h") == 0) { return 1; @@ -249,19 +160,19 @@ int parse_arguments(int argc, char **argv) } argn++; } - + if(trace_name == NULL) { printf("Please specify a trace name.\n"); printf("\n"); ret = -1; } - + if(channel_name == NULL) { printf("Please specify a channel name.\n"); printf("\n"); ret = -1; } - + return ret; } @@ -280,223 +191,96 @@ void show_info(void) static void handler(int signo) { printf("Signal %d received : exiting cleanly\n", signo); - quit_program = 1; + liblttd_stop(); } - -int open_buffer_file(char *filename, char *path_channel, char *path_trace, - struct channel_trace_fd *fd_pairs) +int lttd_on_open_channel(struct liblttd_callbacks *data, struct fd_pair *pair, char *relative_channel_path) { int open_ret = 0; - int ret = 0; + int ret; struct stat stat_buf; + struct lttd_channel_data *channel_data; - if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) { - if(dump_flight_only) { - printf_verbose("Skipping normal channel %s\n", - path_channel); - return 0; - } - } else { - if(dump_normal_only) { - printf_verbose("Skipping flight channel %s\n", - path_channel); - return 0; - } - } - printf_verbose("Opening file.\n"); - - fd_pairs->pair = realloc(fd_pairs->pair, - ++fd_pairs->num_pairs * sizeof(struct fd_pair)); + pair->user_data = malloc(sizeof(struct lttd_channel_data)); + channel_data = pair->user_data; + + strncpy(end_path_trace, relative_channel_path, PATH_MAX - path_trace_len); + printf_verbose("Creating trace file %s\n", path_trace); - /* Open the channel in read mode */ - fd_pairs->pair[fd_pairs->num_pairs-1].channel = - open(path_channel, O_RDONLY | O_NONBLOCK); - if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) { - perror(path_channel); - fd_pairs->num_pairs--; - return 0; /* continue */ - } - /* Open the trace in write mode, only append if append_mode */ ret = stat(path_trace, &stat_buf); if(ret == 0) { if(append_mode) { printf_verbose("Appending to file %s as requested\n", path_trace); - fd_pairs->pair[fd_pairs->num_pairs-1].trace = - open(path_trace, O_WRONLY, - S_IRWXU|S_IRWXG|S_IRWXO); - if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) { + channel_data->trace = open(path_trace, O_WRONLY, S_IRWXU|S_IRWXG|S_IRWXO); + if(channel_data->trace == -1) { perror(path_trace); open_ret = -1; - close(fd_pairs->pair[fd_pairs->num_pairs-1].channel); - fd_pairs->num_pairs--; goto end; } - ret = lseek(fd_pairs->pair[fd_pairs->num_pairs-1].trace, - 0, SEEK_END); + ret = lseek(channel_data->trace, 0, SEEK_END); if (ret < 0) { perror(path_trace); open_ret = -1; - close(fd_pairs->pair[fd_pairs->num_pairs-1].channel); - close(fd_pairs->pair[fd_pairs->num_pairs-1].trace); - fd_pairs->num_pairs--; + close(channel_data->trace); goto end; } } else { printf("File %s exists, cannot open. Try append mode.\n", path_trace); open_ret = -1; - close(fd_pairs->pair[fd_pairs->num_pairs-1].channel); - fd_pairs->num_pairs--; goto end; } } else { if(errno == ENOENT) { - fd_pairs->pair[fd_pairs->num_pairs-1].trace = - open(path_trace, O_WRONLY|O_CREAT|O_EXCL, - S_IRWXU|S_IRWXG|S_IRWXO); - if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) { + channel_data->trace = open(path_trace, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU|S_IRWXG|S_IRWXO); + if(channel_data->trace == -1) { perror(path_trace); open_ret = -1; - close(fd_pairs->pair[fd_pairs->num_pairs-1].channel); - fd_pairs->num_pairs--; goto end; } } } + end: return open_ret; + } -int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, - struct channel_trace_fd *fd_pairs, int *inotify_fd, - struct inotify_watch_array *iwatch_array) +int lttd_on_close_channel(struct liblttd_callbacks *data, struct fd_pair *pair) +{ + int ret; + ret = close(((struct lttd_channel_data *)(pair->user_data))->trace); + free(pair->user_data); + return ret; +} + +int lttd_on_new_channels_folder(struct liblttd_callbacks *data, char *relative_folder_path) { - DIR *channel_dir = opendir(subchannel_name); - struct dirent *entry; - struct stat stat_buf; int ret; - char path_channel[PATH_MAX]; - int path_channel_len; - char *path_channel_ptr; - char path_trace[PATH_MAX]; - int path_trace_len; - char *path_trace_ptr; int open_ret = 0; - if(channel_dir == NULL) { - perror(subchannel_name); - open_ret = ENOENT; - goto end; - } + strncpy(end_path_trace, relative_folder_path, PATH_MAX - path_trace_len); + printf_verbose("Creating trace subdirectory %s\n", path_trace); - printf_verbose("Creating trace subdirectory %s\n", subtrace_name); - ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO); + ret = mkdir(path_trace, S_IRWXU|S_IRWXG|S_IRWXO); if(ret == -1) { if(errno != EEXIST) { - perror(subtrace_name); + perror(path_trace); open_ret = -1; goto end; } } - strncpy(path_channel, subchannel_name, PATH_MAX-1); - path_channel_len = strlen(path_channel); - path_channel[path_channel_len] = '/'; - path_channel_len++; - path_channel_ptr = path_channel + path_channel_len; - - strncpy(path_trace, subtrace_name, PATH_MAX-1); - path_trace_len = strlen(path_trace); - path_trace[path_trace_len] = '/'; - path_trace_len++; - path_trace_ptr = path_trace + path_trace_len; - -#ifdef HAS_INOTIFY - iwatch_array->elem = realloc(iwatch_array->elem, - ++iwatch_array->num * sizeof(struct inotify_watch)); - - printf_verbose("Adding inotify for channel %s\n", path_channel); - iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE); - strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel); - strcpy(iwatch_array->elem[iwatch_array->num-1].path_trace, path_trace); - printf_verbose("Added inotify for channel %s, wd %u\n", - iwatch_array->elem[iwatch_array->num-1].path_channel, - iwatch_array->elem[iwatch_array->num-1].wd); -#endif - - while((entry = readdir(channel_dir)) != NULL) { - - if(entry->d_name[0] == '.') continue; - - strncpy(path_channel_ptr, entry->d_name, PATH_MAX - path_channel_len); - strncpy(path_trace_ptr, entry->d_name, PATH_MAX - path_trace_len); - - ret = stat(path_channel, &stat_buf); - if(ret == -1) { - perror(path_channel); - continue; - } - - printf_verbose("Channel file : %s\n", path_channel); - - if(S_ISDIR(stat_buf.st_mode)) { - - printf_verbose("Entering channel subdirectory...\n"); - ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs, - inotify_fd, iwatch_array); - if(ret < 0) continue; - } else if(S_ISREG(stat_buf.st_mode)) { - open_ret = open_buffer_file(entry->d_name, path_channel, path_trace, - fd_pairs); - if(open_ret) - goto end; - } - } - end: - closedir(channel_dir); - return open_ret; } - -int read_subbuffer(struct fd_pair *pair) +int lttd_on_read_subbuffer(struct liblttd_callbacks *data, struct fd_pair *pair, unsigned int len) { - unsigned int consumed_old, len; - int err; long ret; - off_t offset; - + off_t offset = 0; - err = ioctl(pair->channel, RELAY_GET_SB, &consumed_old); - printf_verbose("cookie : %u\n", consumed_old); - if(err != 0) { - ret = errno; - perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)"); - goto get_error; - } -#if 0 - err = TEMP_FAILURE_RETRY(write(pair->trace, - pair->mmap - + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)), - pair->subbuf_size)); - - if(err < 0) { - ret = errno; - perror("Error in writing to file"); - goto write_error; - } -#endif //0 - err = ioctl(pair->channel, RELAY_GET_SB_SIZE, &len); - if(err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto get_error; - } - - offset = 0; while (len > 0) { printf_verbose("splice chan to pipe offset %lu\n", (unsigned long)offset); @@ -507,8 +291,9 @@ int read_subbuffer(struct fd_pair *pair) perror("Error in relay splice"); goto write_error; } - ret = splice(thread_pipe[0], NULL, pair->trace, NULL, - ret, SPLICE_F_MOVE | SPLICE_F_MORE); + ret = splice(thread_pipe[0], NULL, + ((struct lttd_channel_data *)(pair->user_data))->trace, + NULL, ret, SPLICE_F_MOVE | SPLICE_F_MORE); printf_verbose("splice pipe to file %ld\n", ret); if (ret < 0) { perror("Error in file splice"); @@ -517,466 +302,42 @@ int read_subbuffer(struct fd_pair *pair) len -= ret; } -#if 0 - err = fsync(pair->trace); - if(err < 0) { - ret = errno; - perror("Error in writing to file"); - goto write_error; - } -#endif //0 write_error: - ret = 0; - err = ioctl(pair->channel, RELAY_PUT_SB, &consumed_old); - if(err != 0) { - ret = errno; - if(errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if(errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto get_error; - } - -get_error: - return ret; -} - - -int map_channels(struct channel_trace_fd *fd_pairs, - int idx_begin, int idx_end) -{ - int i,j; - int ret=0; - - if(fd_pairs->num_pairs <= 0) { - printf("No channel to read\n"); - goto end; - } - - /* Get the subbuf sizes and number */ - - for(i=idx_begin;ipair[i]; - - ret = ioctl(pair->channel, RELAY_GET_N_SB, &pair->n_sb); - if(ret != 0) { - perror("Error in getting the number of sub-buffers"); - goto end; - } - ret = ioctl(pair->channel, RELAY_GET_MAX_SB_SIZE, - &pair->max_sb_size); - if(ret != 0) { - perror("Error in getting the max sub-buffer size"); - goto end; - } - ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */ - if(ret != 0) { - perror("Error in mutex init"); - goto end; - } - } - -#if 0 - /* Mmap each FD */ - for(i=idx_begin;ipair[i]; - - pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ, - MAP_SHARED, pair->channel, 0); - if(pair->mmap == MAP_FAILED) { - perror("Mmap error"); - goto munmap; - } - } - - goto end; /* success */ - - /* Error handling */ - /* munmap only the successfully mmapped indexes */ -munmap: - /* Munmap each FD */ - for(j=idx_begin;jpair[j]; - int err_ret; - - err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); - if(err_ret != 0) { - perror("Error in munmap"); - } - ret |= err_ret; - } - -#endif //0 -end: return ret; } -int unmap_channels(struct channel_trace_fd *fd_pairs) -{ - int j; - int ret=0; - - /* Munmap each FD */ - for(j=0;jnum_pairs;j++) { - struct fd_pair *pair = &fd_pairs->pair[j]; - int err_ret; - -#if 0 - err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); - if(err_ret != 0) { - perror("Error in munmap"); - } - ret |= err_ret; -#endif //0 - err_ret = pthread_mutex_destroy(&pair->mutex); - if(err_ret != 0) { - perror("Error in mutex destroy"); - } - ret |= err_ret; - } - - return ret; -} - -#ifdef HAS_INOTIFY -/* Inotify event arrived. - * - * Only support add file for now. - */ - -int read_inotify(int inotify_fd, - struct channel_trace_fd *fd_pairs, - struct inotify_watch_array *iwatch_array) -{ - char buf[sizeof(struct inotify_event) + PATH_MAX]; - char path_channel[PATH_MAX]; - char path_trace[PATH_MAX]; - ssize_t len; - struct inotify_event *ievent; - size_t offset; - unsigned int i; - int ret; - int old_num; - - offset = 0; - len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX); - if(len < 0) { - - if(errno == EAGAIN) - return 0; /* another thread got the data before us */ - - printf("Error in read from inotify FD %s.\n", strerror(len)); - return -1; - } - while(offset < len) { - ievent = (struct inotify_event *)&(buf[offset]); - for(i=0; inum; i++) { - if(iwatch_array->elem[i].wd == ievent->wd && - ievent->mask == IN_CREATE) { - printf_verbose( - "inotify wd %u event mask : %u for %s%s\n", - ievent->wd, ievent->mask, - iwatch_array->elem[i].path_channel, - ievent->name); - old_num = fd_pairs->num_pairs; - strcpy(path_channel, iwatch_array->elem[i].path_channel); - strcat(path_channel, ievent->name); - strcpy(path_trace, iwatch_array->elem[i].path_trace); - strcat(path_trace, ievent->name); - if(ret = open_buffer_file(ievent->name, path_channel, - path_trace, fd_pairs)) { - printf("Error opening buffer file\n"); - return -1; - } - if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) { - printf("Error mapping channel\n"); - return -1; - } - - } - } - offset += sizeof(*ievent) + ievent->len; - } -} -#endif //HAS_INOTIFY - -/* read_channels - * - * Thread worker. - * - * Read the debugfs channels and write them in the paired tracefiles. - * - * @fd_pairs : paired channels and trace files. - * - * returns 0 on success, -1 on error. - * - * Note that the high priority polled channels are consumed first. We then poll - * again to see if these channels are still in priority. Only when no - * high priority channel is left, we start reading low priority channels. - * - * Note that a channel is considered high priority when the buffer is almost - * full. - */ - -int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs, - int inotify_fd, struct inotify_watch_array *iwatch_array) -{ - struct pollfd *pollfd = NULL; - int num_pollfd; - int i,j; - int num_rdy, num_hup; - int high_prio; - int ret = 0; - int inotify_fds; - unsigned int old_num; - -#ifdef HAS_INOTIFY - inotify_fds = 1; -#else - inotify_fds = 0; -#endif - - pthread_rwlock_rdlock(&fd_pairs_lock); - - /* Start polling the FD. Keep one fd for inotify */ - pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd)); - -#ifdef HAS_INOTIFY - pollfd[0].fd = inotify_fd; - pollfd[0].events = POLLIN|POLLPRI; -#endif - - for(i=0;inum_pairs;i++) { - pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel; - pollfd[inotify_fds+i].events = POLLIN|POLLPRI; - } - num_pollfd = inotify_fds + fd_pairs->num_pairs; - - - pthread_rwlock_unlock(&fd_pairs_lock); - - while(1) { - high_prio = 0; - num_hup = 0; -#ifdef DEBUG - printf("Press a key for next poll...\n"); - char buf[1]; - read(STDIN_FILENO, &buf, 1); - printf("Next poll (polling %d fd) :\n", num_pollfd); -#endif //DEBUG - - /* Have we received a signal ? */ - if(quit_program) break; - - num_rdy = poll(pollfd, num_pollfd, -1); - - if(num_rdy == -1) { - perror("Poll error"); - goto free_fd; - } - - printf_verbose("Data received\n"); -#ifdef HAS_INOTIFY - switch(pollfd[0].revents) { - case POLLERR: - printf_verbose( - "Error returned in polling inotify fd %d.\n", - pollfd[0].fd); - break; - case POLLHUP: - printf_verbose( - "Polling inotify fd %d tells it has hung up.\n", - pollfd[0].fd); - break; - case POLLNVAL: - printf_verbose( - "Polling inotify fd %d tells fd is not open.\n", - pollfd[0].fd); - break; - case POLLPRI: - case POLLIN: - printf_verbose( - "Polling inotify fd %d : data ready.\n", - pollfd[0].fd); - - pthread_rwlock_wrlock(&fd_pairs_lock); - read_inotify(inotify_fd, fd_pairs, iwatch_array); - pthread_rwlock_unlock(&fd_pairs_lock); - - break; - } -#endif - - for(i=inotify_fds;ipair[i-inotify_fds].mutex) == 0) { - printf_verbose( - "Urgent read on fd %d\n", - pollfd[i].fd); - /* Take care of high priority channels first. */ - high_prio = 1; - /* it's ok to have an unavailable sub-buffer */ - ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]); - if(ret == EAGAIN) ret = 0; - - ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex); - if(ret) - printf("Error in mutex unlock : %s\n", strerror(ret)); - } - pthread_rwlock_unlock(&fd_pairs_lock); - break; - } - } - /* If every buffer FD has hung up, we end the read loop here */ - if(num_hup == num_pollfd - inotify_fds) break; - - if(!high_prio) { - for(i=inotify_fds;ipair[i-inotify_fds].mutex) == 0) { - /* Take care of low priority channels. */ - printf_verbose( - "Normal read on fd %d\n", - pollfd[i].fd); - /* it's ok to have an unavailable subbuffer */ - ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]); - if(ret == EAGAIN) ret = 0; - - ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex); - if(ret) - printf("Error in mutex unlock : %s\n", strerror(ret)); - } - pthread_rwlock_unlock(&fd_pairs_lock); - break; - } - } - } - - /* Update pollfd array if an entry was added to fd_pairs */ - pthread_rwlock_rdlock(&fd_pairs_lock); - if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) { - pollfd = realloc(pollfd, - (inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd)); - for(i=num_pollfd-inotify_fds;inum_pairs;i++) { - pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel; - pollfd[inotify_fds+i].events = POLLIN|POLLPRI; - } - num_pollfd = fd_pairs->num_pairs + inotify_fds; - } - pthread_rwlock_unlock(&fd_pairs_lock); - - /* NB: If the fd_pairs structure is updated by another thread from this - * point forward, the current thread will wait in the poll without - * monitoring the new channel. However, this thread will add the - * new channel on next poll (and this should not take too much time - * on a loaded system). - * - * This event is quite unlikely and can only occur if a CPU is - * hot-plugged while multple lttd threads are running. - */ - } - -free_fd: - free(pollfd); - -end: - return ret; -} - - -void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd, - struct inotify_watch_array *iwatch_array) -{ - int i; +int on_new_thread(struct liblttd_callbacks *data, unsigned long thread_num) { int ret; - - for(i=0;inum_pairs;i++) { - ret = close(fd_pairs->pair[i].channel); - if(ret == -1) perror("Close error on channel"); - ret = close(fd_pairs->pair[i].trace); - if(ret == -1) perror("Close error on trace"); - } - free(fd_pairs->pair); - free(iwatch_array->elem); -} - -/* Thread worker */ -void * thread_main(void *arg) -{ - long ret; - unsigned long thread_num = (unsigned long)arg; - ret = pipe(thread_pipe); if (ret < 0) { perror("Error creating pipe"); - return (void*)ret; + return ret; } - ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array); - close(thread_pipe[0]); /* close read end */ - close(thread_pipe[1]); /* close write end */ - return (void*)ret; + return 0; } - -int channels_init() -{ - int ret = 0; - - inotify_fd = inotify_init(); - fcntl(inotify_fd, F_SETFL, O_NONBLOCK); - - if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs, - &inotify_fd, &inotify_watch_array)) - goto close_channel; - if (fd_pairs.num_pairs == 0) { - printf("No channel available for reading, exiting\n"); - ret = -ENOENT; - goto close_channel; - } - if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs)) - goto close_channel; +int on_close_thread(struct liblttd_callbacks *data, unsigned long thread_num) { + close(thread_pipe[0]); /* close read end */ + close(thread_pipe[1]); /* close write end */ return 0; - -close_channel: - close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array); - if(inotify_fd >= 0) - close(inotify_fd); - return ret; } - int main(int argc, char ** argv) { int ret = 0; struct sigaction act; - pthread_t *tids; - unsigned long i; - void *tret; - + + struct liblttd_callbacks callbacks = { + lttd_on_open_channel, + lttd_on_close_channel, + lttd_on_new_channels_folder, + lttd_on_read_subbuffer, + NULL, + on_new_thread, + on_close_thread, + NULL + }; + ret = parse_arguments(argc, argv); if(ret != 0) show_arguments(); @@ -996,9 +357,6 @@ int main(int argc, char ** argv) sigaction(SIGQUIT, &act, NULL); sigaction(SIGINT, &act, NULL); - if(ret = channels_init()) - return ret; - if(daemon_mode) { ret = daemon(0, 0); @@ -1008,33 +366,13 @@ int main(int argc, char ** argv) } } - tids = malloc(sizeof(pthread_t) * num_threads); - for(i=0; i= 0) - close(inotify_fd); - return ret; } + -- 2.34.1