61d24688f4283170f8cf98cb3ffe0b5075305d7c
[ltt-control.git] / lttd / lttd.c
1 /* lttd
2 *
3 * Linux Trace Toolkit Daemon
4 *
5 * This is a simple daemon that reads a few relay+debugfs channels and save
6 * them in a trace.
7 *
8 * CPU hot-plugging is supported using inotify.
9 *
10 * Copyright 2005 -
11 * Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
12 */
13
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17
18 #define _REENTRANT
19 #define _GNU_SOURCE
20 #include <features.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <errno.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <stdlib.h>
27 #include <dirent.h>
28 #include <string.h>
29 #include <fcntl.h>
30 #include <sys/poll.h>
31 #include <sys/mman.h>
32 #include <signal.h>
33 #include <pthread.h>
34 #include <sys/syscall.h>
35 #include <unistd.h>
36 #include <asm/ioctls.h>
37
38 #include <linux/version.h>
39
40 /* Relayfs IOCTL */
41 #include <asm/ioctl.h>
42 #include <asm/types.h>
43
44 /* Get the next sub buffer that can be read. */
45 #define RELAY_GET_SUBBUF _IOR(0xF5, 0x00,__u32)
46 /* Release the oldest reserved (by "get") sub buffer. */
47 #define RELAY_PUT_SUBBUF _IOW(0xF5, 0x01,__u32)
48 /* returns the number of sub buffers in the per cpu channel. */
49 #define RELAY_GET_N_SUBBUFS _IOR(0xF5, 0x02,__u32)
50 /* returns the size of the sub buffers. */
51 #define RELAY_GET_SUBBUF_SIZE _IOR(0xF5, 0x03,__u32)
52
53 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
54 #include <sys/inotify.h>
55 #if 0 /* should now be provided by libc. */
56 /* From the inotify-tools 2.6 package */
57 static inline int inotify_init (void)
58 {
59 return syscall (__NR_inotify_init);
60 }
61
62 static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
63 {
64 return syscall (__NR_inotify_add_watch, fd, name, mask);
65 }
66
67 static inline int inotify_rm_watch (int fd, __u32 wd)
68 {
69 return syscall (__NR_inotify_rm_watch, fd, wd);
70 }
71 #endif //0
72 #define HAS_INOTIFY
73 #else
74 static inline int inotify_init (void)
75 {
76 return -1;
77 }
78
79 static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
80 {
81 return 0;
82 }
83
84 static inline int inotify_rm_watch (int fd, __u32 wd)
85 {
86 return 0;
87 }
88 #undef HAS_INOTIFY
89 #endif
90
91 enum {
92 GET_SUBBUF,
93 PUT_SUBBUF,
94 GET_N_BUBBUFS,
95 GET_SUBBUF_SIZE
96 };
97
98 struct fd_pair {
99 int channel;
100 int trace;
101 unsigned int n_subbufs;
102 unsigned int subbuf_size;
103 void *mmap;
104 pthread_mutex_t mutex;
105 };
106
107 struct channel_trace_fd {
108 struct fd_pair *pair;
109 int num_pairs;
110 };
111
112 struct inotify_watch {
113 int wd;
114 char path_channel[PATH_MAX];
115 char path_trace[PATH_MAX];
116 };
117
118 struct inotify_watch_array {
119 struct inotify_watch *elem;
120 int num;
121 };
122
123 static __thread int thread_pipe[2];
124
125 struct channel_trace_fd fd_pairs = { NULL, 0 };
126 int inotify_fd = -1;
127 struct inotify_watch_array inotify_watch_array = { NULL, 0 };
128
129 /* protects fd_pairs and inotify_watch_array */
130 pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER;
131
132
133 static char *trace_name = NULL;
134 static char *channel_name = NULL;
135 static int daemon_mode = 0;
136 static int append_mode = 0;
137 static unsigned long num_threads = 1;
138 volatile static int quit_program = 0; /* For signal handler */
139 static int dump_flight_only = 0;
140 static int dump_normal_only = 0;
141 static int verbose_mode = 0;
142
143 #define printf_verbose(fmt, args...) \
144 do { \
145 if (verbose_mode) \
146 printf(fmt, ##args); \
147 } while (0)
148
149 /* Args :
150 *
151 * -t directory Directory name of the trace to write to. Will be created.
152 * -c directory Root directory of the debugfs trace channels.
153 * -d Run in background (daemon).
154 * -a Trace append mode.
155 * -s Send SIGUSR1 to parent when ready for IO.
156 */
157 void show_arguments(void)
158 {
159 printf("Please use the following arguments :\n");
160 printf("\n");
161 printf("-t directory Directory name of the trace to write to.\n"
162 " It will be created.\n");
163 printf("-c directory Root directory of the debugfs trace channels.\n");
164 printf("-d Run in background (daemon).\n");
165 printf("-a Append to an possibly existing trace.\n");
166 printf("-N Number of threads to start.\n");
167 printf("-f Dump only flight recorder channels.\n");
168 printf("-n Dump only normal channels.\n");
169 printf("-v Verbose mode.\n");
170 printf("\n");
171 }
172
173
174 /* parse_arguments
175 *
176 * Parses the command line arguments.
177 *
178 * Returns 1 if the arguments were correct, but doesn't ask for program
179 * continuation. Returns -1 if the arguments are incorrect, or 0 if OK.
180 */
181 int parse_arguments(int argc, char **argv)
182 {
183 int ret = 0;
184 int argn = 1;
185
186 if(argc == 2) {
187 if(strcmp(argv[1], "-h") == 0) {
188 return 1;
189 }
190 }
191
192 while(argn < argc) {
193
194 switch(argv[argn][0]) {
195 case '-':
196 switch(argv[argn][1]) {
197 case 't':
198 if(argn+1 < argc) {
199 trace_name = argv[argn+1];
200 argn++;
201 }
202 break;
203 case 'c':
204 if(argn+1 < argc) {
205 channel_name = argv[argn+1];
206 argn++;
207 }
208 break;
209 case 'd':
210 daemon_mode = 1;
211 break;
212 case 'a':
213 append_mode = 1;
214 break;
215 case 'N':
216 if(argn+1 < argc) {
217 num_threads = strtoul(argv[argn+1], NULL, 0);
218 argn++;
219 }
220 break;
221 case 'f':
222 dump_flight_only = 1;
223 break;
224 case 'n':
225 dump_normal_only = 1;
226 break;
227 case 'v':
228 verbose_mode = 1;
229 break;
230 default:
231 printf("Invalid argument '%s'.\n", argv[argn]);
232 printf("\n");
233 ret = -1;
234 }
235 break;
236 default:
237 printf("Invalid argument '%s'.\n", argv[argn]);
238 printf("\n");
239 ret = -1;
240 }
241 argn++;
242 }
243
244 if(trace_name == NULL) {
245 printf("Please specify a trace name.\n");
246 printf("\n");
247 ret = -1;
248 }
249
250 if(channel_name == NULL) {
251 printf("Please specify a channel name.\n");
252 printf("\n");
253 ret = -1;
254 }
255
256 return ret;
257 }
258
259 void show_info(void)
260 {
261 printf("Linux Trace Toolkit Trace Daemon " VERSION "\n");
262 printf("\n");
263 printf("Reading from debugfs directory : %s\n", channel_name);
264 printf("Writing to trace directory : %s\n", trace_name);
265 printf("\n");
266 }
267
268
269 /* signal handling */
270
271 static void handler(int signo)
272 {
273 printf("Signal %d received : exiting cleanly\n", signo);
274 quit_program = 1;
275 }
276
277
278 int open_buffer_file(char *filename, char *path_channel, char *path_trace,
279 struct channel_trace_fd *fd_pairs)
280 {
281 int open_ret = 0;
282 int ret = 0;
283 struct stat stat_buf;
284
285 if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) {
286 if(dump_flight_only) {
287 printf_verbose("Skipping normal channel %s\n",
288 path_channel);
289 return 0;
290 }
291 } else {
292 if(dump_normal_only) {
293 printf_verbose("Skipping flight channel %s\n",
294 path_channel);
295 return 0;
296 }
297 }
298 printf_verbose("Opening file.\n");
299
300 fd_pairs->pair = realloc(fd_pairs->pair,
301 ++fd_pairs->num_pairs * sizeof(struct fd_pair));
302
303 /* Open the channel in read mode */
304 fd_pairs->pair[fd_pairs->num_pairs-1].channel =
305 open(path_channel, O_RDONLY | O_NONBLOCK);
306 if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) {
307 perror(path_channel);
308 fd_pairs->num_pairs--;
309 return 0; /* continue */
310 }
311 /* Open the trace in write mode, only append if append_mode */
312 ret = stat(path_trace, &stat_buf);
313 if(ret == 0) {
314 if(append_mode) {
315 printf_verbose("Appending to file %s as requested\n",
316 path_trace);
317
318 fd_pairs->pair[fd_pairs->num_pairs-1].trace =
319 open(path_trace, O_WRONLY,
320 S_IRWXU|S_IRWXG|S_IRWXO);
321 if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
322 perror(path_trace);
323 }
324 ret = lseek(fd_pairs->pair[fd_pairs->num_pairs-1].trace,
325 0, SEEK_END);
326 if (ret < 0) {
327 perror(path_trace);
328 }
329 } else {
330 printf("File %s exists, cannot open. Try append mode.\n", path_trace);
331 open_ret = -1;
332 goto end;
333 }
334 } else {
335 if(errno == ENOENT) {
336 fd_pairs->pair[fd_pairs->num_pairs-1].trace =
337 open(path_trace, O_WRONLY|O_CREAT|O_EXCL,
338 S_IRWXU|S_IRWXG|S_IRWXO);
339 if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
340 perror(path_trace);
341 }
342 }
343 }
344 end:
345 return open_ret;
346 }
347
348 int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
349 struct channel_trace_fd *fd_pairs, int *inotify_fd,
350 struct inotify_watch_array *iwatch_array)
351 {
352 DIR *channel_dir = opendir(subchannel_name);
353 struct dirent *entry;
354 struct stat stat_buf;
355 int ret;
356 char path_channel[PATH_MAX];
357 int path_channel_len;
358 char *path_channel_ptr;
359 char path_trace[PATH_MAX];
360 int path_trace_len;
361 char *path_trace_ptr;
362 int open_ret = 0;
363
364 if(channel_dir == NULL) {
365 perror(subchannel_name);
366 open_ret = ENOENT;
367 goto end;
368 }
369
370 printf_verbose("Creating trace subdirectory %s\n", subtrace_name);
371 ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO);
372 if(ret == -1) {
373 if(errno != EEXIST) {
374 perror(subtrace_name);
375 open_ret = -1;
376 goto end;
377 }
378 }
379
380 strncpy(path_channel, subchannel_name, PATH_MAX-1);
381 path_channel_len = strlen(path_channel);
382 path_channel[path_channel_len] = '/';
383 path_channel_len++;
384 path_channel_ptr = path_channel + path_channel_len;
385
386 strncpy(path_trace, subtrace_name, PATH_MAX-1);
387 path_trace_len = strlen(path_trace);
388 path_trace[path_trace_len] = '/';
389 path_trace_len++;
390 path_trace_ptr = path_trace + path_trace_len;
391
392 #ifdef HAS_INOTIFY
393 iwatch_array->elem = realloc(iwatch_array->elem,
394 ++iwatch_array->num * sizeof(struct inotify_watch));
395
396 printf_verbose("Adding inotify for channel %s\n", path_channel);
397 iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE);
398 strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel);
399 strcpy(iwatch_array->elem[iwatch_array->num-1].path_trace, path_trace);
400 printf_verbose("Added inotify for channel %s, wd %u\n",
401 iwatch_array->elem[iwatch_array->num-1].path_channel,
402 iwatch_array->elem[iwatch_array->num-1].wd);
403 #endif
404
405 while((entry = readdir(channel_dir)) != NULL) {
406
407 if(entry->d_name[0] == '.') continue;
408
409 strncpy(path_channel_ptr, entry->d_name, PATH_MAX - path_channel_len);
410 strncpy(path_trace_ptr, entry->d_name, PATH_MAX - path_trace_len);
411
412 ret = stat(path_channel, &stat_buf);
413 if(ret == -1) {
414 perror(path_channel);
415 continue;
416 }
417
418 printf_verbose("Channel file : %s\n", path_channel);
419
420 if(S_ISDIR(stat_buf.st_mode)) {
421
422 printf_verbose("Entering channel subdirectory...\n");
423 ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs,
424 inotify_fd, iwatch_array);
425 if(ret < 0) continue;
426 } else if(S_ISREG(stat_buf.st_mode)) {
427 open_ret = open_buffer_file(entry->d_name, path_channel, path_trace,
428 fd_pairs);
429 if(open_ret)
430 goto end;
431 }
432 }
433
434 end:
435 closedir(channel_dir);
436
437 return open_ret;
438 }
439
440
441 int read_subbuffer(struct fd_pair *pair)
442 {
443 unsigned int consumed_old;
444 int err;
445 long ret;
446 unsigned long len;
447 off_t offset;
448
449
450 err = ioctl(pair->channel, RELAY_GET_SUBBUF, &consumed_old);
451 printf_verbose("cookie : %u\n", consumed_old);
452 if(err != 0) {
453 ret = errno;
454 perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)");
455 goto get_error;
456 }
457 #if 0
458 err = TEMP_FAILURE_RETRY(write(pair->trace,
459 pair->mmap
460 + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)),
461 pair->subbuf_size));
462
463 if(err < 0) {
464 ret = errno;
465 perror("Error in writing to file");
466 goto write_error;
467 }
468 #endif //0
469 len = pair->subbuf_size;
470 offset = 0;
471 while (len > 0) {
472 printf_verbose("splice chan to pipe offset %lu\n",
473 (unsigned long)offset);
474 ret = splice(pair->channel, &offset, thread_pipe[1], NULL,
475 len, SPLICE_F_MOVE | SPLICE_F_MORE);
476 printf_verbose("splice chan to pipe ret %ld\n", ret);
477 if (ret < 0) {
478 perror("Error in relay splice");
479 goto write_error;
480 }
481 ret = splice(thread_pipe[0], NULL, pair->trace, NULL,
482 ret, SPLICE_F_MOVE | SPLICE_F_MORE);
483 printf_verbose("splice pipe to file %ld\n", ret);
484 if (ret < 0) {
485 perror("Error in file splice");
486 goto write_error;
487 }
488 len -= ret;
489 }
490
491 #if 0
492 err = fsync(pair->trace);
493 if(err < 0) {
494 ret = errno;
495 perror("Error in writing to file");
496 goto write_error;
497 }
498 #endif //0
499 write_error:
500 ret = 0;
501 err = ioctl(pair->channel, RELAY_PUT_SUBBUF, &consumed_old);
502 if(err != 0) {
503 ret = errno;
504 if(errno == EFAULT) {
505 perror("Error in unreserving sub buffer\n");
506 } else if(errno == EIO) {
507 perror("Reader has been pushed by the writer, last subbuffer corrupted.");
508 /* FIXME : we may delete the last written buffer if we wish. */
509 }
510 goto get_error;
511 }
512
513 get_error:
514 return ret;
515 }
516
517
518 int map_channels(struct channel_trace_fd *fd_pairs,
519 int idx_begin, int idx_end)
520 {
521 int i,j;
522 int ret=0;
523
524 if(fd_pairs->num_pairs <= 0) {
525 printf("No channel to read\n");
526 goto end;
527 }
528
529 /* Get the subbuf sizes and number */
530
531 for(i=idx_begin;i<idx_end;i++) {
532 struct fd_pair *pair = &fd_pairs->pair[i];
533
534 ret = ioctl(pair->channel, RELAY_GET_N_SUBBUFS,
535 &pair->n_subbufs);
536 if(ret != 0) {
537 perror("Error in getting the number of subbuffers");
538 goto end;
539 }
540 ret = ioctl(pair->channel, RELAY_GET_SUBBUF_SIZE,
541 &pair->subbuf_size);
542 if(ret != 0) {
543 perror("Error in getting the size of the subbuffers");
544 goto end;
545 }
546 ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */
547 if(ret != 0) {
548 perror("Error in mutex init");
549 goto end;
550 }
551 }
552
553 #if 0
554 /* Mmap each FD */
555 for(i=idx_begin;i<idx_end;i++) {
556 struct fd_pair *pair = &fd_pairs->pair[i];
557
558 pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ,
559 MAP_SHARED, pair->channel, 0);
560 if(pair->mmap == MAP_FAILED) {
561 perror("Mmap error");
562 goto munmap;
563 }
564 }
565
566 goto end; /* success */
567
568 /* Error handling */
569 /* munmap only the successfully mmapped indexes */
570 munmap:
571 /* Munmap each FD */
572 for(j=idx_begin;j<i;j++) {
573 struct fd_pair *pair = &fd_pairs->pair[j];
574 int err_ret;
575
576 err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
577 if(err_ret != 0) {
578 perror("Error in munmap");
579 }
580 ret |= err_ret;
581 }
582
583 #endif //0
584 end:
585 return ret;
586 }
587
588 int unmap_channels(struct channel_trace_fd *fd_pairs)
589 {
590 int j;
591 int ret=0;
592
593 /* Munmap each FD */
594 for(j=0;j<fd_pairs->num_pairs;j++) {
595 struct fd_pair *pair = &fd_pairs->pair[j];
596 int err_ret;
597
598 #if 0
599 err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
600 if(err_ret != 0) {
601 perror("Error in munmap");
602 }
603 ret |= err_ret;
604 #endif //0
605 err_ret = pthread_mutex_destroy(&pair->mutex);
606 if(err_ret != 0) {
607 perror("Error in mutex destroy");
608 }
609 ret |= err_ret;
610 }
611
612 return ret;
613 }
614
615 #ifdef HAS_INOTIFY
616 /* Inotify event arrived.
617 *
618 * Only support add file for now.
619 */
620
621 int read_inotify(int inotify_fd,
622 struct channel_trace_fd *fd_pairs,
623 struct inotify_watch_array *iwatch_array)
624 {
625 char buf[sizeof(struct inotify_event) + PATH_MAX];
626 char path_channel[PATH_MAX];
627 char path_trace[PATH_MAX];
628 ssize_t len;
629 struct inotify_event *ievent;
630 size_t offset;
631 unsigned int i;
632 int ret;
633 int old_num;
634
635 offset = 0;
636 len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX);
637 if(len < 0) {
638
639 if(errno == EAGAIN)
640 return 0; /* another thread got the data before us */
641
642 printf("Error in read from inotify FD %s.\n", strerror(len));
643 return -1;
644 }
645 while(offset < len) {
646 ievent = (struct inotify_event *)&(buf[offset]);
647 for(i=0; i<iwatch_array->num; i++) {
648 if(iwatch_array->elem[i].wd == ievent->wd &&
649 ievent->mask == IN_CREATE) {
650 printf_verbose(
651 "inotify wd %u event mask : %u for %s%s\n",
652 ievent->wd, ievent->mask,
653 iwatch_array->elem[i].path_channel,
654 ievent->name);
655 old_num = fd_pairs->num_pairs;
656 strcpy(path_channel, iwatch_array->elem[i].path_channel);
657 strcat(path_channel, ievent->name);
658 strcpy(path_trace, iwatch_array->elem[i].path_trace);
659 strcat(path_trace, ievent->name);
660 if(ret = open_buffer_file(ievent->name, path_channel,
661 path_trace, fd_pairs)) {
662 printf("Error opening buffer file\n");
663 return -1;
664 }
665 if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) {
666 printf("Error mapping channel\n");
667 return -1;
668 }
669
670 }
671 }
672 offset += sizeof(*ievent) + ievent->len;
673 }
674 }
675 #endif //HAS_INOTIFY
676
677 /* read_channels
678 *
679 * Thread worker.
680 *
681 * Read the debugfs channels and write them in the paired tracefiles.
682 *
683 * @fd_pairs : paired channels and trace files.
684 *
685 * returns 0 on success, -1 on error.
686 *
687 * Note that the high priority polled channels are consumed first. We then poll
688 * again to see if these channels are still in priority. Only when no
689 * high priority channel is left, we start reading low priority channels.
690 *
691 * Note that a channel is considered high priority when the buffer is almost
692 * full.
693 */
694
695 int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs,
696 int inotify_fd, struct inotify_watch_array *iwatch_array)
697 {
698 struct pollfd *pollfd = NULL;
699 int num_pollfd;
700 int i,j;
701 int num_rdy, num_hup;
702 int high_prio;
703 int ret = 0;
704 int inotify_fds;
705 unsigned int old_num;
706
707 #ifdef HAS_INOTIFY
708 inotify_fds = 1;
709 #else
710 inotify_fds = 0;
711 #endif
712
713 pthread_rwlock_rdlock(&fd_pairs_lock);
714
715 /* Start polling the FD. Keep one fd for inotify */
716 pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
717
718 #ifdef HAS_INOTIFY
719 pollfd[0].fd = inotify_fd;
720 pollfd[0].events = POLLIN|POLLPRI;
721 #endif
722
723 for(i=0;i<fd_pairs->num_pairs;i++) {
724 pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
725 pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
726 }
727 num_pollfd = inotify_fds + fd_pairs->num_pairs;
728
729
730 pthread_rwlock_unlock(&fd_pairs_lock);
731
732 while(1) {
733 high_prio = 0;
734 num_hup = 0;
735 #ifdef DEBUG
736 printf("Press a key for next poll...\n");
737 char buf[1];
738 read(STDIN_FILENO, &buf, 1);
739 printf("Next poll (polling %d fd) :\n", num_pollfd);
740 #endif //DEBUG
741
742 /* Have we received a signal ? */
743 if(quit_program) break;
744
745 num_rdy = poll(pollfd, num_pollfd, -1);
746
747 if(num_rdy == -1) {
748 perror("Poll error");
749 goto free_fd;
750 }
751
752 printf_verbose("Data received\n");
753 #ifdef HAS_INOTIFY
754 switch(pollfd[0].revents) {
755 case POLLERR:
756 printf_verbose(
757 "Error returned in polling inotify fd %d.\n",
758 pollfd[0].fd);
759 break;
760 case POLLHUP:
761 printf_verbose(
762 "Polling inotify fd %d tells it has hung up.\n",
763 pollfd[0].fd);
764 break;
765 case POLLNVAL:
766 printf_verbose(
767 "Polling inotify fd %d tells fd is not open.\n",
768 pollfd[0].fd);
769 break;
770 case POLLPRI:
771 case POLLIN:
772 printf_verbose(
773 "Polling inotify fd %d : data ready.\n",
774 pollfd[0].fd);
775
776 pthread_rwlock_wrlock(&fd_pairs_lock);
777 read_inotify(inotify_fd, fd_pairs, iwatch_array);
778 pthread_rwlock_unlock(&fd_pairs_lock);
779
780 break;
781 }
782 #endif
783
784 for(i=inotify_fds;i<num_pollfd;i++) {
785 switch(pollfd[i].revents) {
786 case POLLERR:
787 printf_verbose(
788 "Error returned in polling fd %d.\n",
789 pollfd[i].fd);
790 num_hup++;
791 break;
792 case POLLHUP:
793 printf_verbose(
794 "Polling fd %d tells it has hung up.\n",
795 pollfd[i].fd);
796 num_hup++;
797 break;
798 case POLLNVAL:
799 printf_verbose(
800 "Polling fd %d tells fd is not open.\n",
801 pollfd[i].fd);
802 num_hup++;
803 break;
804 case POLLPRI:
805 pthread_rwlock_rdlock(&fd_pairs_lock);
806 if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
807 printf_verbose(
808 "Urgent read on fd %d\n",
809 pollfd[i].fd);
810 /* Take care of high priority channels first. */
811 high_prio = 1;
812 /* it's ok to have an unavailable subbuffer */
813 ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
814 if(ret == EAGAIN) ret = 0;
815
816 ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
817 if(ret)
818 printf("Error in mutex unlock : %s\n", strerror(ret));
819 }
820 pthread_rwlock_unlock(&fd_pairs_lock);
821 break;
822 }
823 }
824 /* If every buffer FD has hung up, we end the read loop here */
825 if(num_hup == num_pollfd - inotify_fds) break;
826
827 if(!high_prio) {
828 for(i=inotify_fds;i<num_pollfd;i++) {
829 switch(pollfd[i].revents) {
830 case POLLIN:
831 pthread_rwlock_rdlock(&fd_pairs_lock);
832 if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
833 /* Take care of low priority channels. */
834 printf_verbose(
835 "Normal read on fd %d\n",
836 pollfd[i].fd);
837 /* it's ok to have an unavailable subbuffer */
838 ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
839 if(ret == EAGAIN) ret = 0;
840
841 ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
842 if(ret)
843 printf("Error in mutex unlock : %s\n", strerror(ret));
844 }
845 pthread_rwlock_unlock(&fd_pairs_lock);
846 break;
847 }
848 }
849 }
850
851 /* Update pollfd array if an entry was added to fd_pairs */
852 pthread_rwlock_rdlock(&fd_pairs_lock);
853 if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) {
854 pollfd = realloc(pollfd,
855 (inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
856 for(i=num_pollfd-inotify_fds;i<fd_pairs->num_pairs;i++) {
857 pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
858 pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
859 }
860 num_pollfd = fd_pairs->num_pairs + inotify_fds;
861 }
862 pthread_rwlock_unlock(&fd_pairs_lock);
863
864 /* NB: If the fd_pairs structure is updated by another thread from this
865 * point forward, the current thread will wait in the poll without
866 * monitoring the new channel. However, this thread will add the
867 * new channel on next poll (and this should not take too much time
868 * on a loaded system).
869 *
870 * This event is quite unlikely and can only occur if a CPU is
871 * hot-plugged while multple lttd threads are running.
872 */
873 }
874
875 free_fd:
876 free(pollfd);
877
878 end:
879 return ret;
880 }
881
882
883 void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd,
884 struct inotify_watch_array *iwatch_array)
885 {
886 int i;
887 int ret;
888
889 for(i=0;i<fd_pairs->num_pairs;i++) {
890 ret = close(fd_pairs->pair[i].channel);
891 if(ret == -1) perror("Close error on channel");
892 ret = close(fd_pairs->pair[i].trace);
893 if(ret == -1) perror("Close error on trace");
894 }
895 free(fd_pairs->pair);
896 free(iwatch_array->elem);
897 }
898
899 /* Thread worker */
900 void * thread_main(void *arg)
901 {
902 long ret;
903 unsigned long thread_num = (unsigned long)arg;
904
905 ret = pipe(thread_pipe);
906 if (ret < 0) {
907 perror("Error creating pipe");
908 return (void*)ret;
909 }
910 ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array);
911 close(thread_pipe[0]); /* close read end */
912 close(thread_pipe[1]); /* close write end */
913 return (void*)ret;
914 }
915
916
917 int channels_init()
918 {
919 int ret = 0;
920
921 inotify_fd = inotify_init();
922 fcntl(inotify_fd, F_SETFL, O_NONBLOCK);
923
924 if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs,
925 &inotify_fd, &inotify_watch_array))
926 goto close_channel;
927 if (fd_pairs.num_pairs == 0) {
928 printf("No channel available for reading, exiting\n");
929 ret = -ENOENT;
930 goto close_channel;
931 }
932 if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs))
933 goto close_channel;
934 return 0;
935
936 close_channel:
937 close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
938 if(inotify_fd >= 0)
939 close(inotify_fd);
940 return ret;
941 }
942
943
944 int main(int argc, char ** argv)
945 {
946 int ret = 0;
947 struct sigaction act;
948 pthread_t *tids;
949 unsigned long i;
950 void *tret;
951
952 ret = parse_arguments(argc, argv);
953
954 if(ret != 0) show_arguments();
955 if(ret < 0) return EINVAL;
956 if(ret > 0) return 0;
957
958 show_info();
959
960 /* Connect the signal handlers */
961 act.sa_handler = handler;
962 act.sa_flags = 0;
963 sigemptyset(&(act.sa_mask));
964 sigaddset(&(act.sa_mask), SIGTERM);
965 sigaddset(&(act.sa_mask), SIGQUIT);
966 sigaddset(&(act.sa_mask), SIGINT);
967 sigaction(SIGTERM, &act, NULL);
968 sigaction(SIGQUIT, &act, NULL);
969 sigaction(SIGINT, &act, NULL);
970
971 if(ret = channels_init())
972 return ret;
973
974 if(daemon_mode) {
975 ret = daemon(0, 0);
976
977 if(ret == -1) {
978 perror("An error occured while daemonizing.");
979 exit(-1);
980 }
981 }
982
983 tids = malloc(sizeof(pthread_t) * num_threads);
984 for(i=0; i<num_threads; i++) {
985
986 ret = pthread_create(&tids[i], NULL, thread_main, (void*)i);
987 if(ret) {
988 perror("Error creating thread");
989 break;
990 }
991 }
992
993 for(i=0; i<num_threads; i++) {
994 ret = pthread_join(tids[i], &tret);
995 if(ret) {
996 perror("Error joining thread");
997 break;
998 }
999 if((long)tret != 0) {
1000 printf("Error %s occured in thread %u\n",
1001 strerror((long)tret), i);
1002 }
1003 }
1004
1005 free(tids);
1006 ret = unmap_channels(&fd_pairs);
1007 close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
1008 if(inotify_fd >= 0)
1009 close(inotify_fd);
1010
1011 return ret;
1012 }
This page took 0.048286 seconds and 3 git commands to generate.