Consume with mmap if enabled
[lttng-tools.git] / kconsumerd / kconsumerd.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <fcntl.h>
22 #include <getopt.h>
23 #include <grp.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ipc.h>
31 #include <sys/shm.h>
32 #include <sys/socket.h>
33 #include <sys/stat.h>
34 #include <sys/types.h>
35 #include <urcu/list.h>
36 #include <poll.h>
37 #include <unistd.h>
38 #include <sys/mman.h>
39
40 #include "lttngerr.h"
41 #include "libkernelctl.h"
42 #include "liblttsessiondcomm.h"
43 #include "kconsumerd.h"
44
45 /* Init the list of FDs */
46 static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = {
47 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
48 };
49
50 /* Number of element for the list below. */
51 static unsigned int fds_count;
52
53 /* If the local array of FDs needs update in the poll function */
54 static unsigned int update_fd_array = 1;
55
56 /* lock the fd array and structures */
57 static pthread_mutex_t kconsumerd_lock_fds;
58
59 /* the two threads (receive fd and poll) */
60 static pthread_t threads[2];
61
62 /* communication with splice */
63 static int thread_pipe[2];
64
65 /* pipe to wake the poll thread when necessary */
66 static int poll_pipe[2];
67
68 /* socket to communicate errors with sessiond */
69 static int error_socket = -1;
70
71 /* to count the number of time the user pressed ctrl+c */
72 static int sigintcount = 0;
73
74 /* flag to inform the polling thread to quit when all fd hung up */
75 static int quit = 0;
76
77 /* Argument variables */
78 int opt_quiet;
79 int opt_verbose;
80 static int opt_daemon;
81 static const char *progname;
82 static char command_sock_path[PATH_MAX]; /* Global command socket path */
83 static char error_sock_path[PATH_MAX]; /* Global error path */
84
85 /*
86 * del_fd
87 *
88 * Remove a fd from the global list protected by a mutex
89 */
90 static void del_fd(struct ltt_kconsumerd_fd *lcf)
91 {
92 DBG("Removing %d", lcf->consumerd_fd);
93 pthread_mutex_lock(&kconsumerd_lock_fds);
94 cds_list_del(&lcf->list);
95 if (fds_count > 0) {
96 fds_count--;
97 DBG("Removed ltt_kconsumerd_fd");
98 if (lcf != NULL) {
99 close(lcf->out_fd);
100 close(lcf->consumerd_fd);
101 free(lcf);
102 lcf = NULL;
103 }
104 }
105 pthread_mutex_unlock(&kconsumerd_lock_fds);
106 }
107
108 /*
109 * cleanup
110 *
111 * Cleanup the daemon's socket on exit
112 */
113 static void cleanup()
114 {
115 struct ltt_kconsumerd_fd *iter;
116
117 /* remove the socket file */
118 unlink(command_sock_path);
119
120 /* unblock the threads */
121 WARN("Terminating the threads before exiting");
122 pthread_cancel(threads[0]);
123 pthread_cancel(threads[1]);
124
125 /* close all outfd */
126 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
127 del_fd(iter);
128 }
129 }
130
131 /*
132 * send_error
133 *
134 * send return code to ltt-sessiond
135 */
136 static int send_error(enum lttcomm_return_code cmd)
137 {
138 if (error_socket > 0) {
139 return lttcomm_send_unix_sock(error_socket, &cmd,
140 sizeof(enum lttcomm_sessiond_command));
141 } else {
142 return 0;
143 }
144 }
145
146 /*
147 * add_fd
148 *
149 * Add a fd to the global list protected by a mutex
150 */
151 static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
152 {
153 struct ltt_kconsumerd_fd *tmp_fd;
154 int ret;
155
156 tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
157 tmp_fd->sessiond_fd = buf->fd;
158 tmp_fd->consumerd_fd = consumerd_fd;
159 tmp_fd->state = buf->state;
160 tmp_fd->max_sb_size = buf->max_sb_size;
161 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
162
163 /* Opening the tracefile in write mode */
164 DBG("Opening %s for writing", tmp_fd->path_name);
165 ret = open(tmp_fd->path_name,
166 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
167 if (ret < 0) {
168 ERR("Opening %s", tmp_fd->path_name);
169 perror("open");
170 goto end;
171 }
172 tmp_fd->out_fd = ret;
173 tmp_fd->out_fd_offset = 0;
174
175 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
176 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
177
178 pthread_mutex_lock(&kconsumerd_lock_fds);
179 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
180 fds_count++;
181 pthread_mutex_unlock(&kconsumerd_lock_fds);
182
183 end:
184 return ret;
185 }
186
187
188 /*
189 * sighandler
190 *
191 * Signal handler for the daemon
192 */
193 static void sighandler(int sig)
194 {
195 if (sig == SIGINT && sigintcount++ == 0) {
196 DBG("ignoring first SIGINT");
197 return;
198 }
199
200 cleanup();
201
202 return;
203 }
204
205 /*
206 * set_signal_handler
207 *
208 * Setup signal handler for :
209 * SIGINT, SIGTERM, SIGPIPE
210 */
211 static int set_signal_handler(void)
212 {
213 int ret = 0;
214 struct sigaction sa;
215 sigset_t sigset;
216
217 if ((ret = sigemptyset(&sigset)) < 0) {
218 perror("sigemptyset");
219 return ret;
220 }
221
222 sa.sa_handler = sighandler;
223 sa.sa_mask = sigset;
224 sa.sa_flags = 0;
225 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
226 perror("sigaction");
227 return ret;
228 }
229
230 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
231 perror("sigaction");
232 return ret;
233 }
234
235 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
236 perror("sigaction");
237 return ret;
238 }
239
240 return ret;
241 }
242
243 /*
244 * on_read_subbuffer_mmap
245 *
246 * mmap the ring buffer, read it and write the data to the tracefile.
247 * Returns the number of bytes written
248 */
249 static int on_read_subbuffer_mmap(struct ltt_kconsumerd_fd *kconsumerd_fd,
250 unsigned long len)
251 {
252 unsigned long mmap_len;
253 unsigned long mmap_offset;
254 unsigned long padded_len;
255 unsigned long padding_len;
256 char *mmap_base;
257 char *padding = NULL;
258 long ret = 0;
259 off_t orig_offset = kconsumerd_fd->out_fd_offset;
260 int fd = kconsumerd_fd->consumerd_fd;
261 int outfd = kconsumerd_fd->out_fd;
262
263 /* get the padded subbuffer size to know the padding required */
264 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
265 if (ret != 0) {
266 ret = errno;
267 perror("kernctl_get_padded_subbuf_size");
268 goto end;
269 }
270 padding_len = padded_len - len;
271 padding = malloc(padding_len * sizeof(char));
272 memset(padding, '\0', padding_len);
273
274 /* get the len of the mmap region */
275 ret = kernctl_get_mmap_len(fd, &mmap_len);
276 if (ret != 0) {
277 ret = errno;
278 perror("kernctl_get_mmap_len");
279 goto end;
280 }
281
282 /* get the offset inside the fd to mmap */
283 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
284 if (ret != 0) {
285 ret = errno;
286 perror("kernctl_get_mmap_read_offset");
287 goto end;
288 }
289
290 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
291 if (mmap_base == MAP_FAILED) {
292 perror("Error mmaping");
293 ret = -1;
294 goto end;
295 }
296
297 while (len > 0) {
298 ret = write(outfd, mmap_base, len);
299 if (ret >= len) {
300 len = 0;
301 } else if (ret < 0) {
302 ret = errno;
303 perror("Error in file write");
304 goto end;
305 }
306 /* This won't block, but will start writeout asynchronously */
307 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
308 SYNC_FILE_RANGE_WRITE);
309 kconsumerd_fd->out_fd_offset += ret;
310 }
311
312 /* once all the data is written, write the padding to disk */
313 ret = write(outfd, padding, padding_len);
314 if (ret < 0) {
315 ret = errno;
316 perror("Error writing padding to file");
317 goto end;
318 }
319
320 /*
321 * This does a blocking write-and-wait on any page that belongs to the
322 * subbuffer prior to the one we just wrote.
323 * Don't care about error values, as these are just hints and ways to
324 * limit the amount of page cache used.
325 */
326 if (orig_offset >= kconsumerd_fd->max_sb_size) {
327 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
328 kconsumerd_fd->max_sb_size,
329 SYNC_FILE_RANGE_WAIT_BEFORE
330 | SYNC_FILE_RANGE_WRITE
331 | SYNC_FILE_RANGE_WAIT_AFTER);
332 /*
333 * Give hints to the kernel about how we access the file:
334 * POSIX_FADV_DONTNEED : we won't re-access data in a near
335 * future after we write it.
336 * We need to call fadvise again after the file grows because
337 * the kernel does not seem to apply fadvise to non-existing
338 * parts of the file.
339 * Call fadvise _after_ having waited for the page writeback to
340 * complete because the dirty page writeback semantic is not
341 * well defined. So it can be expected to lead to lower
342 * throughput in streaming.
343 */
344 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
345 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
346 }
347 goto end;
348
349 end:
350 if (padding != NULL) {
351 free(padding);
352 }
353 return ret;
354 }
355
356 /*
357 * on_read_subbuffer
358 *
359 * Splice the data from the ring buffer to the tracefile.
360 * Returns the number of bytes spliced
361 */
362 static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
363 unsigned long len)
364 {
365 long ret = 0;
366 loff_t offset = 0;
367 off_t orig_offset = kconsumerd_fd->out_fd_offset;
368 int fd = kconsumerd_fd->consumerd_fd;
369 int outfd = kconsumerd_fd->out_fd;
370
371 while (len > 0) {
372 DBG("splice chan to pipe offset %lu (fd : %d)",
373 (unsigned long)offset, fd);
374 ret = splice(fd, &offset, thread_pipe[1], NULL, len,
375 SPLICE_F_MOVE | SPLICE_F_MORE);
376 DBG("splice chan to pipe ret %ld", ret);
377 if (ret < 0) {
378 ret = errno;
379 perror("Error in relay splice");
380 goto splice_error;
381 }
382
383 ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
384 SPLICE_F_MOVE | SPLICE_F_MORE);
385 DBG("splice pipe to file %ld", ret);
386 if (ret < 0) {
387 ret = errno;
388 perror("Error in file splice");
389 goto splice_error;
390 }
391 if (ret >= len) {
392 len = 0;
393 }
394 /* This won't block, but will start writeout asynchronously */
395 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
396 SYNC_FILE_RANGE_WRITE);
397 kconsumerd_fd->out_fd_offset += ret;
398 }
399
400 /*
401 * This does a blocking write-and-wait on any page that belongs to the
402 * subbuffer prior to the one we just wrote.
403 * Don't care about error values, as these are just hints and ways to
404 * limit the amount of page cache used.
405 */
406 if (orig_offset >= kconsumerd_fd->max_sb_size) {
407 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
408 kconsumerd_fd->max_sb_size,
409 SYNC_FILE_RANGE_WAIT_BEFORE
410 | SYNC_FILE_RANGE_WRITE
411 | SYNC_FILE_RANGE_WAIT_AFTER);
412 /*
413 * Give hints to the kernel about how we access the file:
414 * POSIX_FADV_DONTNEED : we won't re-access data in a near
415 * future after we write it.
416 * We need to call fadvise again after the file grows because
417 * the kernel does not seem to apply fadvise to non-existing
418 * parts of the file.
419 * Call fadvise _after_ having waited for the page writeback to
420 * complete because the dirty page writeback semantic is not
421 * well defined. So it can be expected to lead to lower
422 * throughput in streaming.
423 */
424 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
425 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
426 }
427 goto end;
428
429 splice_error:
430 /* send the appropriate error description to sessiond */
431 switch(ret) {
432 case EBADF:
433 send_error(KCONSUMERD_SPLICE_EBADF);
434 break;
435 case EINVAL:
436 send_error(KCONSUMERD_SPLICE_EINVAL);
437 break;
438 case ENOMEM:
439 send_error(KCONSUMERD_SPLICE_ENOMEM);
440 break;
441 case ESPIPE:
442 send_error(KCONSUMERD_SPLICE_ESPIPE);
443 break;
444 }
445
446 end:
447 return ret;
448 }
449
450 /*
451 * read_subbuffer
452 *
453 * Consume data on a file descriptor and write it on a trace file
454 */
455 static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
456 {
457 unsigned long len;
458 int err;
459 long ret = 0;
460 int infd = kconsumerd_fd->consumerd_fd;
461
462 DBG("In read_subbuffer (infd : %d)", infd);
463 /* Get the next subbuffer */
464 err = kernctl_get_next_subbuf(infd);
465 if (err != 0) {
466 ret = errno;
467 perror("Reserving sub buffer failed (everything is normal, "
468 "it is due to concurrency)");
469 goto end;
470 }
471
472 if (DEFAULT_CHANNEL_OUTPUT == LTTNG_KERNEL_SPLICE) {
473 /* read the whole subbuffer */
474 err = kernctl_get_padded_subbuf_size(infd, &len);
475 if (err != 0) {
476 ret = errno;
477 perror("Getting sub-buffer len failed.");
478 goto end;
479 }
480
481 /* splice the subbuffer to the tracefile */
482 ret = on_read_subbuffer(kconsumerd_fd, len);
483 if (ret < 0) {
484 /*
485 * display the error but continue processing to try
486 * to release the subbuffer
487 */
488 ERR("Error splicing to tracefile");
489 }
490 } else if (DEFAULT_CHANNEL_OUTPUT == LTTNG_KERNEL_MMAP) {
491 /* read the used subbuffer size */
492 err = kernctl_get_subbuf_size(infd, &len);
493 if (err != 0) {
494 ret = errno;
495 perror("Getting sub-buffer len failed.");
496 goto end;
497 }
498
499 /* write the subbuffer to the tracefile */
500 ret = on_read_subbuffer_mmap(kconsumerd_fd, len);
501 if (ret < 0) {
502 /*
503 * display the error but continue processing to try
504 * to release the subbuffer
505 */
506 ERR("Error writing to tracefile");
507 }
508 } else {
509 ERR("Unknown output method");
510 ret = -1;
511 goto end;
512 }
513
514 err = kernctl_put_next_subbuf(infd);
515 if (err != 0) {
516 ret = errno;
517 if (errno == EFAULT) {
518 perror("Error in unreserving sub buffer\n");
519 } else if (errno == EIO) {
520 /* Should never happen with newer LTTng versions */
521 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
522 }
523 goto end;
524 }
525
526 end:
527 return ret;
528 }
529
530 /*
531 * change_fd_state
532 *
533 * Update a fd according to what we just received
534 */
535 static void change_fd_state(int sessiond_fd,
536 enum kconsumerd_fd_state state)
537 {
538 struct ltt_kconsumerd_fd *iter;
539 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
540 if (iter->sessiond_fd == sessiond_fd) {
541 iter->state = state;
542 break;
543 }
544 }
545 }
546
547 /*
548 * consumerd_recv_fd
549 *
550 * Receives an array of file descriptors and the associated
551 * structures describing each fd (path name).
552 * Returns the size of received data
553 */
554 static int consumerd_recv_fd(int sfd, int size,
555 enum kconsumerd_command cmd_type)
556 {
557 struct msghdr msg;
558 struct iovec iov[1];
559 int ret = 0, i, tmp2;
560 struct cmsghdr *cmsg;
561 int nb_fd;
562 char recv_fd[CMSG_SPACE(sizeof(int))];
563 struct lttcomm_kconsumerd_msg lkm;
564
565 /* the number of fds we are about to receive */
566 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
567
568 for (i = 0; i < nb_fd; i++) {
569 memset(&msg, 0, sizeof(msg));
570
571 /* Prepare to receive the structures */
572 iov[0].iov_base = &lkm;
573 iov[0].iov_len = sizeof(lkm);
574 msg.msg_iov = iov;
575 msg.msg_iovlen = 1;
576
577 msg.msg_control = recv_fd;
578 msg.msg_controllen = sizeof(recv_fd);
579
580 DBG("Waiting to receive fd");
581 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
582 perror("recvmsg");
583 continue;
584 }
585
586 if (ret != (size / nb_fd)) {
587 ERR("Received only %d, expected %d", ret, size);
588 send_error(KCONSUMERD_ERROR_RECV_FD);
589 goto end;
590 }
591
592 cmsg = CMSG_FIRSTHDR(&msg);
593 if (!cmsg) {
594 ERR("Invalid control message header");
595 ret = -1;
596 send_error(KCONSUMERD_ERROR_RECV_FD);
597 goto end;
598 }
599
600 /* if we received fds */
601 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
602 switch (cmd_type) {
603 case ADD_STREAM:
604 DBG("add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
605 ret = add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
606 if (ret < 0) {
607 send_error(KCONSUMERD_OUTFD_ERROR);
608 goto end;
609 }
610 break;
611 case UPDATE_STREAM:
612 change_fd_state(lkm.fd, lkm.state);
613 break;
614 default:
615 break;
616 }
617 /* flag to tell the polling thread to update its fd array */
618 update_fd_array = 1;
619 /* signal the poll thread */
620 tmp2 = write(poll_pipe[1], "4", 1);
621 } else {
622 ERR("Didn't received any fd");
623 send_error(KCONSUMERD_ERROR_RECV_FD);
624 ret = -1;
625 goto end;
626 }
627 }
628
629 end:
630 DBG("consumerd_recv_fd thread exiting");
631 return ret;
632 }
633
634 /*
635 * thread_receive_fds
636 *
637 * This thread listens on the consumerd socket and
638 * receives the file descriptors from ltt-sessiond
639 */
640 static void *thread_receive_fds(void *data)
641 {
642 int sock, client_socket, ret;
643 struct lttcomm_kconsumerd_header tmp;
644
645 DBG("Creating command socket %s", command_sock_path);
646 unlink(command_sock_path);
647 client_socket = lttcomm_create_unix_sock(command_sock_path);
648 if (client_socket < 0) {
649 ERR("Cannot create command socket");
650 goto end;
651 }
652
653 ret = lttcomm_listen_unix_sock(client_socket);
654 if (ret < 0) {
655 goto end;
656 }
657
658 DBG("Sending ready command to ltt-sessiond");
659 ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
660 if (ret < 0) {
661 ERR("Error sending ready command to ltt-sessiond");
662 goto end;
663 }
664
665 /* Blocking call, waiting for transmission */
666 sock = lttcomm_accept_unix_sock(client_socket);
667 if (sock <= 0) {
668 WARN("On accept");
669 goto end;
670 }
671 while (1) {
672 /* We first get the number of fd we are about to receive */
673 ret = lttcomm_recv_unix_sock(sock, &tmp,
674 sizeof(struct lttcomm_kconsumerd_header));
675 if (ret <= 0) {
676 ERR("Communication interrupted on command socket");
677 goto end;
678 }
679 if (tmp.cmd_type == STOP) {
680 DBG("Received STOP command");
681 goto end;
682 }
683 /* we received a command to add or update fds */
684 ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
685 if (ret <= 0) {
686 ERR("Receiving the FD, exiting");
687 goto end;
688 }
689 }
690
691 end:
692 DBG("thread_receive_fds exiting");
693 quit = 1;
694 ret = write(poll_pipe[1], "4", 1);
695 if (ret < 0) {
696 perror("poll pipe write");
697 }
698 return NULL;
699 }
700
701 /*
702 * update_poll_array
703 *
704 * Allocate the pollfd structure and the local view of the out fds
705 * to avoid doing a lookup in the linked list and concurrency issues
706 * when writing is needed.
707 * Returns the number of fds in the structures
708 */
709 static int update_poll_array(struct pollfd **pollfd,
710 struct ltt_kconsumerd_fd **local_kconsumerd_fd)
711 {
712 struct ltt_kconsumerd_fd *iter;
713 int i = 0;
714
715
716 DBG("Updating poll fd array");
717 pthread_mutex_lock(&kconsumerd_lock_fds);
718
719 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
720 DBG("Inside for each");
721 if (iter->state == ACTIVE_FD) {
722 DBG("Active FD %d", iter->consumerd_fd);
723 (*pollfd)[i].fd = iter->consumerd_fd;
724 (*pollfd)[i].events = POLLIN | POLLPRI;
725 local_kconsumerd_fd[i] = iter;
726 i++;
727 }
728 }
729 /*
730 * insert the poll_pipe at the end of the array and don't increment i
731 * so nb_fd is the number of real FD
732 */
733 (*pollfd)[i].fd = poll_pipe[0];
734 (*pollfd)[i].events = POLLIN;
735
736 update_fd_array = 0;
737 pthread_mutex_unlock(&kconsumerd_lock_fds);
738 return i;
739
740 }
741
742 /*
743 * thread_poll_fds
744 *
745 * This thread polls the fds in the ltt_fd_list to consume the data
746 * and write it to tracefile if necessary.
747 */
748 static void *thread_poll_fds(void *data)
749 {
750 int num_rdy, num_hup, high_prio, ret, i;
751 struct pollfd *pollfd = NULL;
752 /* local view of the fds */
753 struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
754 /* local view of fds_count */
755 int nb_fd = 0;
756 char tmp;
757 int tmp2;
758
759 ret = pipe(thread_pipe);
760 if (ret < 0) {
761 perror("Error creating pipe");
762 goto end;
763 }
764
765 local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
766
767 while (1) {
768 high_prio = 0;
769 num_hup = 0;
770
771 /*
772 * the ltt_fd_list has been updated, we need to update our
773 * local array as well
774 */
775 if (update_fd_array == 1) {
776 if (pollfd != NULL) {
777 free(pollfd);
778 pollfd = NULL;
779 }
780 if (local_kconsumerd_fd != NULL) {
781 free(local_kconsumerd_fd);
782 local_kconsumerd_fd = NULL;
783 }
784 /* allocate for all fds + 1 for the poll_pipe */
785 pollfd = malloc((fds_count + 1) * sizeof(struct pollfd));
786 if (pollfd == NULL) {
787 perror("pollfd malloc");
788 goto end;
789 }
790 /* allocate for all fds + 1 for the poll_pipe */
791 local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd));
792 if (local_kconsumerd_fd == NULL) {
793 perror("local_kconsumerd_fd malloc");
794 goto end;
795 }
796
797 ret = update_poll_array(&pollfd, local_kconsumerd_fd);
798 if (ret < 0) {
799 ERR("Error in allocating pollfd or local_outfds");
800 send_error(KCONSUMERD_POLL_ERROR);
801 goto end;
802 }
803 nb_fd = ret;
804 }
805
806 /* poll on the array of fds */
807 DBG("polling on %d fd", nb_fd + 1);
808 num_rdy = poll(pollfd, nb_fd + 1, -1);
809 DBG("poll num_rdy : %d", num_rdy);
810 if (num_rdy == -1) {
811 perror("Poll error");
812 send_error(KCONSUMERD_POLL_ERROR);
813 goto end;
814 }
815
816 /* No FDs and quit, cleanup the thread */
817 if (nb_fd == 0 && quit == 1) {
818 goto end;
819 }
820
821 /*
822 * if only the poll_pipe triggered poll to return just return to the
823 * beginning of the loop to update the array
824 */
825 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
826 DBG("poll_pipe wake up");
827 tmp2 = read(poll_pipe[0], &tmp, 1);
828 continue;
829 }
830
831 /* Take care of high priority channels first. */
832 for (i = 0; i < nb_fd; i++) {
833 switch(pollfd[i].revents) {
834 case POLLERR:
835 ERR("Error returned in polling fd %d.", pollfd[i].fd);
836 del_fd(local_kconsumerd_fd[i]);
837 update_fd_array = 1;
838 num_hup++;
839 break;
840 case POLLHUP:
841 ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
842 del_fd(local_kconsumerd_fd[i]);
843 update_fd_array = 1;
844 num_hup++;
845 break;
846 case POLLNVAL:
847 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
848 del_fd(local_kconsumerd_fd[i]);
849 update_fd_array = 1;
850 num_hup++;
851 break;
852 case POLLPRI:
853 DBG("Urgent read on fd %d", pollfd[i].fd);
854 high_prio = 1;
855 ret = read_subbuffer(local_kconsumerd_fd[i]);
856 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
857 if (ret == EAGAIN) {
858 ret = 0;
859 }
860 break;
861 }
862 }
863
864 /* If every buffer FD has hung up, we end the read loop here */
865 if (nb_fd > 0 && num_hup == nb_fd) {
866 DBG("every buffer FD has hung up\n");
867 if (quit == 1) {
868 goto end;
869 }
870 continue;
871 }
872
873 /* Take care of low priority channels. */
874 if (high_prio == 0) {
875 for (i = 0; i < nb_fd; i++) {
876 if (pollfd[i].revents == POLLIN) {
877 DBG("Normal read on fd %d", pollfd[i].fd);
878 ret = read_subbuffer(local_kconsumerd_fd[i]);
879 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
880 if (ret == EAGAIN) {
881 ret = 0;
882 }
883 }
884 }
885 }
886 }
887 end:
888 DBG("polling thread exiting");
889 if (pollfd != NULL) {
890 free(pollfd);
891 pollfd = NULL;
892 }
893 if (local_kconsumerd_fd != NULL) {
894 free(local_kconsumerd_fd);
895 local_kconsumerd_fd = NULL;
896 }
897 cleanup();
898 return NULL;
899 }
900
901 /*
902 * usage function on stderr
903 */
904 static void usage(void)
905 {
906 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
907 fprintf(stderr, " -h, --help "
908 "Display this usage.\n");
909 fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH "
910 "Specify path for the command socket\n");
911 fprintf(stderr, " -e, --kconsumerd-err-sock PATH "
912 "Specify path for the error socket\n");
913 fprintf(stderr, " -d, --daemonize "
914 "Start as a daemon.\n");
915 fprintf(stderr, " -q, --quiet "
916 "No output at all.\n");
917 fprintf(stderr, " -v, --verbose "
918 "Verbose mode. Activate DBG() macro.\n");
919 fprintf(stderr, " -V, --version "
920 "Show version number.\n");
921 }
922
923 /*
924 * daemon argument parsing
925 */
926 static void parse_args(int argc, char **argv)
927 {
928 int c;
929
930 static struct option long_options[] = {
931 { "kconsumerd-cmd-sock", 1, 0, 'c' },
932 { "kconsumerd-err-sock", 1, 0, 'e' },
933 { "daemonize", 0, 0, 'd' },
934 { "help", 0, 0, 'h' },
935 { "quiet", 0, 0, 'q' },
936 { "verbose", 0, 0, 'v' },
937 { "version", 0, 0, 'V' },
938 { NULL, 0, 0, 0 }
939 };
940
941 while (1) {
942 int option_index = 0;
943 c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
944 if (c == -1) {
945 break;
946 }
947
948 switch (c) {
949 case 0:
950 fprintf(stderr, "option %s", long_options[option_index].name);
951 if (optarg) {
952 fprintf(stderr, " with arg %s\n", optarg);
953 }
954 break;
955 case 'c':
956 snprintf(command_sock_path, PATH_MAX, "%s", optarg);
957 break;
958 case 'e':
959 snprintf(error_sock_path, PATH_MAX, "%s", optarg);
960 break;
961 case 'd':
962 opt_daemon = 1;
963 break;
964 case 'h':
965 usage();
966 exit(EXIT_FAILURE);
967 case 'q':
968 opt_quiet = 1;
969 break;
970 case 'v':
971 opt_verbose = 1;
972 break;
973 case 'V':
974 fprintf(stdout, "%s\n", VERSION);
975 exit(EXIT_SUCCESS);
976 default:
977 usage();
978 exit(EXIT_FAILURE);
979 }
980 }
981 }
982
983
984 /*
985 * main
986 */
987 int main(int argc, char **argv)
988 {
989 int i;
990 int ret = 0;
991 void *status;
992
993 /* Parse arguments */
994 progname = argv[0];
995 parse_args(argc, argv);
996
997 /* Daemonize */
998 if (opt_daemon) {
999 ret = daemon(0, 0);
1000 if (ret < 0) {
1001 perror("daemon");
1002 goto error;
1003 }
1004 }
1005
1006 if (strlen(command_sock_path) == 0) {
1007 snprintf(command_sock_path, PATH_MAX,
1008 KCONSUMERD_CMD_SOCK_PATH);
1009 }
1010 if (strlen(error_sock_path) == 0) {
1011 snprintf(error_sock_path, PATH_MAX,
1012 KCONSUMERD_ERR_SOCK_PATH);
1013 }
1014
1015 if (set_signal_handler() < 0) {
1016 goto error;
1017 }
1018
1019 /* create the pipe to wake to polling thread when needed */
1020 ret = pipe(poll_pipe);
1021 if (ret < 0) {
1022 perror("Error creating poll pipe");
1023 goto end;
1024 }
1025
1026 /* Connect to the socket created by ltt-sessiond to report errors */
1027 DBG("Connecting to error socket %s", error_sock_path);
1028 error_socket = lttcomm_connect_unix_sock(error_sock_path);
1029 /* not a fatal error, but all communication with ltt-sessiond will fail */
1030 if (error_socket < 0) {
1031 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
1032 }
1033
1034 /* Create the thread to manage the receive of fd */
1035 ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL);
1036 if (ret != 0) {
1037 perror("pthread_create");
1038 goto error;
1039 }
1040
1041 /* Create thread to manage the polling/writing of traces */
1042 ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL);
1043 if (ret != 0) {
1044 perror("pthread_create");
1045 goto error;
1046 }
1047
1048 for (i = 0; i < 2; i++) {
1049 ret = pthread_join(threads[i], &status);
1050 if (ret != 0) {
1051 perror("pthread_join");
1052 goto error;
1053 }
1054 }
1055 ret = EXIT_SUCCESS;
1056 send_error(KCONSUMERD_EXIT_SUCCESS);
1057 goto end;
1058
1059 error:
1060 ret = EXIT_FAILURE;
1061 send_error(KCONSUMERD_EXIT_FAILURE);
1062
1063 end:
1064 cleanup();
1065
1066 return ret;
1067 }
This page took 0.083118 seconds and 5 git commands to generate.