fix lttd mutexes
[ltt-control.git] / ltt-control / lttd / lttd.c
index 0deb40230a61007fffc235660d055f19d529e092..b00142639098c859b17e22d1c8f94c2db8f39312 100644 (file)
@@ -2,9 +2,10 @@
  *
  * Linux Trace Toolkit Daemon
  *
- * This is a simple daemon that reads a few relayfs channels and save them in a
- * trace.
+ * This is a simple daemon that reads a few relay+debugfs channels and save
+ * them in a trace.
  *
+ * CPU hot-plugging is supported using inotify.
  *
  * Copyright 2005 -
  *     Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
 #include <sys/mman.h>
 #include <signal.h>
 #include <pthread.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <asm/ioctls.h>
+
+#include <linux/version.h>
 
 /* Relayfs IOCTL */
 #include <asm/ioctl.h>
 #include <asm/types.h>
 
 /* Get the next sub buffer that can be read. */
-#define RELAYFS_GET_SUBBUF        _IOR(0xF4, 0x00,__u32)
+#define RELAY_GET_SUBBUF        _IOR(0xF5, 0x00,__u32)
 /* Release the oldest reserved (by "get") sub buffer. */
-#define RELAYFS_PUT_SUBBUF        _IOW(0xF4, 0x01,__u32)
+#define RELAY_PUT_SUBBUF        _IOW(0xF5, 0x01,__u32)
 /* returns the number of sub buffers in the per cpu channel. */
-#define RELAYFS_GET_N_SUBBUFS     _IOR(0xF4, 0x02,__u32)
+#define RELAY_GET_N_SUBBUFS     _IOR(0xF5, 0x02,__u32)
 /* returns the size of the sub buffers. */
-#define RELAYFS_GET_SUBBUF_SIZE   _IOR(0xF4, 0x03,__u32)
+#define RELAY_GET_SUBBUF_SIZE   _IOR(0xF5, 0x03,__u32)
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
+#include <linux/inotify.h>
+/* From the inotify-tools 2.6 package */
+static inline int inotify_init (void)
+{
+       return syscall (__NR_inotify_init);
+}
+
+static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
+{
+       return syscall (__NR_inotify_add_watch, fd, name, mask);
+}
+
+static inline int inotify_rm_watch (int fd, __u32 wd)
+{
+       return syscall (__NR_inotify_rm_watch, fd, wd);
+}
+#define HAS_INOTIFY
+#else
+static inline int inotify_init (void)
+{
+       return -1;
+}
 
+static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
+{
+       return 0;
+}
 
+static inline int inotify_rm_watch (int fd, __u32 wd)
+{
+       return 0;
+}
+#undef HAS_INOTIFY
+#endif
 
 enum {
        GET_SUBBUF,
@@ -67,17 +107,40 @@ struct channel_trace_fd {
        int num_pairs;
 };
 
-static char *trace_name = NULL;
-static char *channel_name = NULL;
-static int     daemon_mode = 0;
-static int     append_mode = 0;
-static unsigned long num_threads = 1;
+struct inotify_watch {
+       int wd;
+       char path_channel[PATH_MAX];
+       char path_trace[PATH_MAX];
+};
+
+struct inotify_watch_array {
+       struct inotify_watch *elem;
+       int num;
+};
+
+
+
+struct channel_trace_fd fd_pairs = { NULL, 0 };
+int inotify_fd = -1;
+struct inotify_watch_array inotify_watch_array = { NULL, 0 };
+
+/* protects fd_pairs and inotify_watch_array */
+pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER;
+
+
+static char            *trace_name = NULL;
+static char            *channel_name = NULL;
+static int             daemon_mode = 0;
+static int             append_mode = 0;
+static unsigned long   num_threads = 1;
 volatile static int    quit_program = 0;       /* For signal handler */
+static int             dump_flight_only = 0;
+static int             dump_normal_only = 0;
 
 /* Args :
  *
  * -t directory                Directory name of the trace to write to. Will be created.
- * -c directory                Root directory of the relayfs trace channels.
+ * -c directory                Root directory of the debugfs trace channels.
  * -d                          Run in background (daemon).
  * -a                                                  Trace append mode.
  * -s                                                  Send SIGUSR1 to parent when ready for IO.
@@ -88,10 +151,12 @@ void show_arguments(void)
        printf("\n");
        printf("-t directory  Directory name of the trace to write to.\n"
                                 "              It will be created.\n");
-       printf("-c directory  Root directory of the relayfs trace channels.\n");
+       printf("-c directory  Root directory of the debugfs trace channels.\n");
        printf("-d            Run in background (daemon).\n");
        printf("-a            Append to an possibly existing trace.\n");
        printf("-N            Number of threads to start.\n");
+       printf("-f            Dump only flight recorder channels.\n");
+       printf("-n            Dump only normal channels.\n");
        printf("\n");
 }
 
@@ -143,6 +208,12 @@ int parse_arguments(int argc, char **argv)
                                                        argn++;
                                                }
                                                break;
+                                       case 'f':
+                                               dump_flight_only = 1;
+                                               break;
+                                       case 'n':
+                                               dump_normal_only = 1;
+                                               break;
                                        default:
                                                printf("Invalid argument '%s'.\n", argv[argn]);
                                                printf("\n");
@@ -174,9 +245,9 @@ int parse_arguments(int argc, char **argv)
 
 void show_info(void)
 {
-       printf("Linux Trace Toolkit Trace Daemon\n");
+       printf("Linux Trace Toolkit Trace Daemon " VERSION "\n");
        printf("\n");
-       printf("Reading from relayfs directory : %s\n", channel_name);
+       printf("Reading from debugfs directory : %s\n", channel_name);
        printf("Writing to trace directory : %s\n", trace_name);
        printf("\n");
 }
@@ -191,9 +262,72 @@ static void handler(int signo)
 }
 
 
+int open_buffer_file(char *filename, char *path_channel, char *path_trace,
+       struct channel_trace_fd *fd_pairs)
+{
+       int open_ret = 0;
+       int ret = 0;
+       struct stat stat_buf;
+
+       if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) {
+               if(dump_flight_only) {
+                       printf("Skipping normal channel %s\n", path_channel);
+                       return 0;
+               }
+       } else {
+               if(dump_normal_only) {
+                       printf("Skipping flight channel %s\n", path_channel);
+                       return 0;
+               }
+       }
+       printf("Opening file.\n");
+       
+       fd_pairs->pair = realloc(fd_pairs->pair,
+                       ++fd_pairs->num_pairs * sizeof(struct fd_pair));
+
+       /* Open the channel in read mode */
+       fd_pairs->pair[fd_pairs->num_pairs-1].channel = 
+               open(path_channel, O_RDONLY | O_NONBLOCK);
+       if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) {
+               perror(path_channel);
+               fd_pairs->num_pairs--;
+               return 0;       /* continue */
+       }
+       /* Open the trace in write mode, only append if append_mode */
+       ret = stat(path_trace, &stat_buf);
+       if(ret == 0) {
+               if(append_mode) {
+                       printf("Appending to file %s as requested\n", path_trace);
+
+                       fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
+                               open(path_trace, O_WRONLY|O_APPEND,
+                                               S_IRWXU|S_IRWXG|S_IRWXO);
+
+                       if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
+                               perror(path_trace);
+                       }
+               } else {
+                       printf("File %s exists, cannot open. Try append mode.\n", path_trace);
+                       open_ret = -1;
+                       goto end;
+               }
+       } else {
+               if(errno == ENOENT) {
+                       fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
+                               open(path_trace, O_WRONLY|O_CREAT|O_EXCL,
+                                               S_IRWXU|S_IRWXG|S_IRWXO);
+                       if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
+                               perror(path_trace);
+                       }
+               }
+       }
+end:
+       return open_ret;
+}
 
 int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
-               struct channel_trace_fd *fd_pairs)
+               struct channel_trace_fd *fd_pairs, int *inotify_fd,
+               struct inotify_watch_array *iwatch_array)
 {
        DIR *channel_dir = opendir(subchannel_name);
        struct dirent *entry;
@@ -205,12 +339,12 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
        char path_trace[PATH_MAX];
        int path_trace_len;
        char *path_trace_ptr;
-  int open_ret = 0;
+       int open_ret = 0;
 
        if(channel_dir == NULL) {
                perror(subchannel_name);
                open_ret = ENOENT;
-    goto end;
+               goto end;
        }
 
        printf("Creating trace subdirectory %s\n", subtrace_name);
@@ -218,7 +352,7 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
        if(ret == -1) {
                if(errno != EEXIST) {
                        perror(subtrace_name);
-      open_ret = -1;
+                       open_ret = -1;
                        goto end;
                }
        }
@@ -235,6 +369,18 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
        path_trace_len++;
        path_trace_ptr = path_trace + path_trace_len;
        
+#ifdef HAS_INOTIFY
+       iwatch_array->elem = realloc(iwatch_array->elem,
+               ++iwatch_array->num * sizeof(struct inotify_watch));
+       
+       printf("Adding inotify for channel %s\n", path_channel);
+       iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE);
+       strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel);
+       strcpy(iwatch_array->elem[iwatch_array->num-1].path_trace, path_trace);
+       printf("Added inotify for channel %s, wd %u\n", iwatch_array->elem[iwatch_array->num-1].path_channel,
+               iwatch_array->elem[iwatch_array->num-1].wd);
+#endif
+
        while((entry = readdir(channel_dir)) != NULL) {
 
                if(entry->d_name[0] == '.') continue;
@@ -253,50 +399,14 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
                if(S_ISDIR(stat_buf.st_mode)) {
 
                        printf("Entering channel subdirectory...\n");
-                       ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs);
+                       ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs,
+                               inotify_fd, iwatch_array);
                        if(ret < 0) continue;
                } else if(S_ISREG(stat_buf.st_mode)) {
-                       printf("Opening file.\n");
-                       
-                       fd_pairs->pair = realloc(fd_pairs->pair,
-                                       ++fd_pairs->num_pairs * sizeof(struct fd_pair));
-
-                       /* Open the channel in read mode */
-                       fd_pairs->pair[fd_pairs->num_pairs-1].channel = 
-                               open(path_channel, O_RDONLY | O_NONBLOCK);
-                       if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) {
-                               perror(path_channel);
-                               fd_pairs->num_pairs--;
-                               continue;
-                       }
-                       /* Open the trace in write mode, only append if append_mode */
-                       ret = stat(path_trace, &stat_buf);
-                       if(ret == 0) {
-                               if(append_mode) {
-                                       printf("Appending to file %s as requested\n", path_trace);
-
-                                       fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
-                                               open(path_trace, O_WRONLY|O_APPEND,
-                                                               S_IRWXU|S_IRWXG|S_IRWXO);
-
-                                       if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
-                                               perror(path_trace);
-                                       }
-                               } else {
-                                       printf("File %s exists, cannot open. Try append mode.\n", path_trace);
-                                       open_ret = -1;
-          goto end;
-                               }
-                       } else {
-                               if(errno == ENOENT) {
-                                       fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
-                                               open(path_trace, O_WRONLY|O_CREAT|O_EXCL,
-                                                               S_IRWXU|S_IRWXG|S_IRWXO);
-                                       if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
-                                               perror(path_trace);
-                                       }
-                               }
-                       }
+                       open_ret = open_buffer_file(entry->d_name, path_channel, path_trace,
+                               fd_pairs);
+                       if(open_ret)
+                               goto end;
                }
        }
        
@@ -313,7 +423,7 @@ int read_subbuffer(struct fd_pair *pair)
        int err, ret=0;
 
 
-       err = ioctl(pair->channel, RELAYFS_GET_SUBBUF, 
+       err = ioctl(pair->channel, RELAY_GET_SUBBUF, 
                                                                &consumed_old);
        printf("cookie : %u\n", consumed_old);
        if(err != 0) {
@@ -341,7 +451,7 @@ int read_subbuffer(struct fd_pair *pair)
        }
 #endif //0
 write_error:
-       err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old);
+       err = ioctl(pair->channel, RELAY_PUT_SUBBUF, &consumed_old);
        if(err != 0) {
                ret = errno;
                if(errno == EFAULT) {
@@ -359,7 +469,8 @@ get_error:
 
 
 
-int map_channels(struct channel_trace_fd *fd_pairs)
+int map_channels(struct channel_trace_fd *fd_pairs,
+       int idx_begin, int idx_end)
 {
        int i,j;
        int ret=0;
@@ -371,16 +482,16 @@ int map_channels(struct channel_trace_fd *fd_pairs)
        
        /* Get the subbuf sizes and number */
 
-       for(i=0;i<fd_pairs->num_pairs;i++) {
+       for(i=idx_begin;i<idx_end;i++) {
                struct fd_pair *pair = &fd_pairs->pair[i];
 
-               ret = ioctl(pair->channel, RELAYFS_GET_N_SUBBUFS, 
+               ret = ioctl(pair->channel, RELAY_GET_N_SUBBUFS, 
                                                        &pair->n_subbufs);
                if(ret != 0) {
                        perror("Error in getting the number of subbuffers");
                        goto end;
                }
-               ret = ioctl(pair->channel, RELAYFS_GET_SUBBUF_SIZE, 
+               ret = ioctl(pair->channel, RELAY_GET_SUBBUF_SIZE, 
                                                        &pair->subbuf_size);
                if(ret != 0) {
                        perror("Error in getting the size of the subbuffers");
@@ -394,7 +505,7 @@ int map_channels(struct channel_trace_fd *fd_pairs)
        }
 
        /* Mmap each FD */
-       for(i=0;i<fd_pairs->num_pairs;i++) {
+       for(i=idx_begin;i<idx_end;i++) {
                struct fd_pair *pair = &fd_pairs->pair[i];
 
                pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ,
@@ -411,7 +522,7 @@ int map_channels(struct channel_trace_fd *fd_pairs)
        /* munmap only the successfully mmapped indexes */
 munmap:
                /* Munmap each FD */
-       for(j=0;j<i;j++) {
+       for(j=idx_begin;j<i;j++) {
                struct fd_pair *pair = &fd_pairs->pair[j];
                int err_ret;
 
@@ -454,16 +565,75 @@ int unmap_channels(struct channel_trace_fd *fd_pairs)
        return ret;
 }
 
+#ifdef HAS_INOTIFY
+/* Inotify event arrived.
+ *
+ * Only support add file for now.
+ */
+
+int read_inotify(int inotify_fd,
+       struct channel_trace_fd *fd_pairs,
+       struct inotify_watch_array *iwatch_array)
+{
+       char buf[sizeof(struct inotify_event) + PATH_MAX];
+       char path_channel[PATH_MAX];
+       char path_trace[PATH_MAX];
+       ssize_t len;
+       struct inotify_event *ievent;
+       size_t offset;
+       unsigned int i;
+       int ret;
+       int old_num;
+       
+       offset = 0;
+       len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX);
+       if(len < 0) {
+
+               if(errno == EAGAIN)
+                       return 0;  /* another thread got the data before us */
+
+               printf("Error in read from inotify FD %s.\n", strerror(len));
+               return -1;
+       }
+       while(offset < len) {
+               ievent = (struct inotify_event *)&(buf[offset]);
+               for(i=0; i<iwatch_array->num; i++) {
+                       if(iwatch_array->elem[i].wd == ievent->wd &&
+                               ievent->mask == IN_CREATE) {
+                               printf("inotify wd %u event mask : %u for %s%s\n",
+                                       ievent->wd, ievent->mask,
+                                       iwatch_array->elem[i].path_channel, ievent->name);
+                               old_num = fd_pairs->num_pairs;
+                               strcpy(path_channel, iwatch_array->elem[i].path_channel);
+                               strcat(path_channel, ievent->name);
+                               strcpy(path_trace, iwatch_array->elem[i].path_trace);
+                               strcat(path_trace, ievent->name);
+                               if(ret = open_buffer_file(ievent->name, path_channel,
+                                       path_trace, fd_pairs)) {
+                                       printf("Error opening buffer file\n");
+                                       return -1;
+                               }
+                               if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) {
+                                       printf("Error mapping channel\n");
+                                       return -1;
+                               }
+
+                       }
+               }
+               offset += sizeof(*ievent) + ievent->len;
+       }
+}
+#endif //HAS_INOTIFY
 
 /* read_channels
  *
  * Thread worker.
  *
- * Read the relayfs channels and write them in the paired tracefiles.
+ * Read the debugfs channels and write them in the paired tracefiles.
  *
  * @fd_pairs : paired channels and trace files.
  *
- * returns (void*)0 on success, (void*)-1 on error.
+ * returns 0 on success, -1 on error.
  *
  * Note that the high priority polled channels are consumed first. We then poll
  * again to see if these channels are still in priority. Only when no
@@ -473,24 +643,42 @@ int unmap_channels(struct channel_trace_fd *fd_pairs)
  * full.
  */
 
-void * read_channels(void *arg)
+int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs,
+       int inotify_fd, struct inotify_watch_array *iwatch_array)
 {
-       struct pollfd *pollfd;
+       struct pollfd *pollfd = NULL;
+       int num_pollfd;
        int i,j;
        int num_rdy, num_hup;
        int high_prio;
        int ret = 0;
-       struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg;
+       int inotify_fds;
+       unsigned int old_num;
 
-       /* Start polling the FD */
-       
-       pollfd = malloc(fd_pairs->num_pairs * sizeof(struct pollfd));
+#ifdef HAS_INOTIFY
+       inotify_fds = 1;
+#else
+       inotify_fds = 0;
+#endif
+
+       pthread_rwlock_rdlock(&fd_pairs_lock);
+
+       /* Start polling the FD. Keep one fd for inotify */
+       pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
+
+#ifdef HAS_INOTIFY
+       pollfd[0].fd = inotify_fd;
+       pollfd[0].events = POLLIN|POLLPRI;
+#endif
 
-       /* Note : index in pollfd is the same index as fd_pair->pair */
        for(i=0;i<fd_pairs->num_pairs;i++) {
-               pollfd[i].fd = fd_pairs->pair[i].channel;
-               pollfd[i].events = POLLIN|POLLPRI;
+               pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
+               pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
        }
+       num_pollfd = inotify_fds + fd_pairs->num_pairs;
+
+
+       pthread_rwlock_unlock(&fd_pairs_lock);
 
        while(1) {
                high_prio = 0;
@@ -499,21 +687,45 @@ void * read_channels(void *arg)
                printf("Press a key for next poll...\n");
                char buf[1];
                read(STDIN_FILENO, &buf, 1);
-               printf("Next poll (polling %d fd) :\n", fd_pairs->num_pairs);
+               printf("Next poll (polling %d fd) :\n", num_pollfd);
 #endif //DEBUG
-               
+
                /* Have we received a signal ? */
                if(quit_program) break;
                
-               num_rdy = poll(pollfd, fd_pairs->num_pairs, -1);
+               num_rdy = poll(pollfd, num_pollfd, -1);
+
                if(num_rdy == -1) {
                        perror("Poll error");
                        goto free_fd;
                }
 
                printf("Data received\n");
+#ifdef HAS_INOTIFY
+               switch(pollfd[0].revents) {
+                       case POLLERR:
+                               printf("Error returned in polling inotify fd %d.\n", pollfd[0].fd);
+                               break;
+                       case POLLHUP:
+                               printf("Polling inotify fd %d tells it has hung up.\n", pollfd[0].fd);
+                               break;
+                       case POLLNVAL:
+                               printf("Polling inotify fd %d tells fd is not open.\n", pollfd[0].fd);
+                               break;
+                       case POLLPRI:
+                       case POLLIN:
+
+                               printf("Polling inotify fd %d : data ready.\n", pollfd[0].fd);
+
+                               pthread_rwlock_wrlock(&fd_pairs_lock);
+                               read_inotify(inotify_fd, fd_pairs, iwatch_array);
+                               pthread_rwlock_unlock(&fd_pairs_lock);
 
-               for(i=0;i<fd_pairs->num_pairs;i++) {
+                       break;
+               }
+#endif
+
+               for(i=inotify_fds;i<num_pollfd;i++) {
                        switch(pollfd[i].revents) {
                                case POLLERR:
                                        printf("Error returned in polling fd %d.\n", pollfd[i].fd);
@@ -528,55 +740,82 @@ void * read_channels(void *arg)
                                        num_hup++;
                                        break;
                                case POLLPRI:
-                                       if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+                                       pthread_rwlock_rdlock(&fd_pairs_lock);
+                                       if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
                                                printf("Urgent read on fd %d\n", pollfd[i].fd);
                                                /* Take care of high priority channels first. */
                                                high_prio = 1;
                                                /* it's ok to have an unavailable subbuffer */
-                                               ret = read_subbuffer(&fd_pairs->pair[i]);
+                                               ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
                                                if(ret == EAGAIN) ret = 0;
 
-                                               ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+                                               ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
                                                if(ret)
                                                        printf("Error in mutex unlock : %s\n", strerror(ret));
                                        }
+                                       pthread_rwlock_unlock(&fd_pairs_lock);
                                        break;
                        }
                }
-               /* If every FD has hung up, we end the read loop here */
-               if(num_hup == fd_pairs->num_pairs) break;
+               /* If every buffer FD has hung up, we end the read loop here */
+               if(num_hup == num_pollfd - inotify_fds) break;
 
                if(!high_prio) {
-                       for(i=0;i<fd_pairs->num_pairs;i++) {
+                       for(i=inotify_fds;i<num_pollfd;i++) {
                                switch(pollfd[i].revents) {
                                        case POLLIN:
-                                               if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+                                               pthread_rwlock_rdlock(&fd_pairs_lock);
+                                               if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
                                                        /* Take care of low priority channels. */
                                                        printf("Normal read on fd %d\n", pollfd[i].fd);
                                                        /* it's ok to have an unavailable subbuffer */
-                                                       ret = read_subbuffer(&fd_pairs->pair[i]);
+                                                       ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
                                                        if(ret == EAGAIN) ret = 0;
 
-                                                       ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+                                                       ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
                                                        if(ret)
                                                                printf("Error in mutex unlock : %s\n", strerror(ret));
                                                }
+                                               pthread_rwlock_unlock(&fd_pairs_lock);
                                                break;
                                }
                        }
                }
 
+               /* Update pollfd array if an entry was added to fd_pairs */
+               pthread_rwlock_rdlock(&fd_pairs_lock);
+               if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) {
+                       pollfd = realloc(pollfd,
+                                       (inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
+                       for(i=num_pollfd-inotify_fds;i<fd_pairs->num_pairs;i++) {
+                               pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
+                               pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
+                       }
+                       num_pollfd = fd_pairs->num_pairs + inotify_fds;
+               }
+               pthread_rwlock_unlock(&fd_pairs_lock);
+
+               /* NB: If the fd_pairs structure is updated by another thread from this
+                *     point forward, the current thread will wait in the poll without
+                *     monitoring the new channel. However, this thread will add the
+                *     new channel on next poll (and this should not take too much time
+                *     on a loaded system).
+                *
+                *     This event is quite unlikely and can only occur if a CPU is
+                *     hot-plugged while multple lttd threads are running.
+                */
        }
 
 free_fd:
        free(pollfd);
 
 end:
-       return (void*)ret;
+       return ret;
 }
 
 
-void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs)
+void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd,
+       struct inotify_watch_array *iwatch_array)
 {
        int i;
        int ret;
@@ -588,15 +827,51 @@ void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs)
                if(ret == -1) perror("Close error on trace");
        }
        free(fd_pairs->pair);
+       free(iwatch_array->elem);
 }
 
+/* Thread worker */
+void * thread_main(void *arg)
+{
+       long ret;
+       unsigned long thread_num = (unsigned long)arg;
+
+       ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array);
+
+       return (void*)ret;
+}
+
+
+int channels_init()
+{
+       int ret = 0;
+
+       inotify_fd = inotify_init();
+       fcntl(inotify_fd, F_SETFL, O_NONBLOCK);
+
+       if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs,
+                       &inotify_fd, &inotify_watch_array))
+               goto close_channel;
+
+       if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs))
+               goto close_channel;
+
+       return 0;
+
+close_channel:
+       close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
+       if(inotify_fd >= 0)
+               close(inotify_fd);
+       return ret;
+}
+
+
 int main(int argc, char ** argv)
 {
        int ret = 0;
-       struct channel_trace_fd fd_pairs = { NULL, 0 };
        struct sigaction act;
        pthread_t *tids;
-       unsigned int i;
+       unsigned long i;
        void *tret;
        
        ret = parse_arguments(argc, argv);
@@ -609,12 +884,12 @@ int main(int argc, char ** argv)
 
        if(daemon_mode) {
                ret = daemon(0, 0);
-    
-    if(ret == -1) {
-      perror("An error occured while daemonizing.");
-      exit(-1);
-    }
-  }
+               
+               if(ret == -1) {
+                       perror("An error occured while daemonizing.");
+                       exit(-1);
+               }
+       }
 
        /* Connect the signal handlers */
        act.sa_handler = handler;
@@ -627,16 +902,13 @@ int main(int argc, char ** argv)
        sigaction(SIGQUIT, &act, NULL);
        sigaction(SIGINT, &act, NULL);
 
+       if(ret = channels_init())
+               return ret;
 
-       if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs))
-               goto close_channel;
-
-       if(ret = map_channels(&fd_pairs))
-               goto close_channel;
-       
        tids = malloc(sizeof(pthread_t) * num_threads);
        for(i=0; i<num_threads; i++) {
-               ret = pthread_create(&tids[i], NULL, read_channels, &fd_pairs);
+
+               ret = pthread_create(&tids[i], NULL, thread_main, (void*)i);
                if(ret) {
                        perror("Error creating thread");
                        break;
@@ -649,17 +921,18 @@ int main(int argc, char ** argv)
                        perror("Error joining thread");
                        break;
                }
-               if((int)tret != 0) {
-                       printf("Error %s occured in thread %u\n", strerror((int)tret), i);
+               if((long)tret != 0) {
+                       printf("Error %s occured in thread %u\n",
+                               strerror((long)tret), i);
                }
        }
 
        free(tids);
-                       
-       ret |= unmap_channels(&fd_pairs);
-
-close_channel:
-       close_channel_trace_pairs(&fd_pairs);
 
+       ret = unmap_channels(&fd_pairs);
+       close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
+       if(inotify_fd >= 0)
+               close(inotify_fd);
+                       
        return ret;
 }
This page took 0.031959 seconds and 4 git commands to generate.