Fix handling of multiple FDs
[lttng-tools.git] / kconsumerd / kconsumerd.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <fcntl.h>
22 #include <getopt.h>
23 #include <grp.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ipc.h>
31 #include <sys/shm.h>
32 #include <sys/socket.h>
33 #include <sys/stat.h>
34 #include <sys/types.h>
35 #include <urcu/list.h>
36 #include <poll.h>
37 #include <unistd.h>
38
39 #include "lttngerr.h"
40 #include "libkernelctl.h"
41 #include "liblttsessiondcomm.h"
42 #include "kconsumerd.h"
43
44 /* Init the list of FDs */
45 static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = {
46 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
47 };
48
49 /* Number of element for the list below. */
50 static unsigned int fds_count;
51
52 /* If the local array of FDs needs update in the poll function */
53 static unsigned int update_fd_array = 1;
54
55 /* lock the fd array and structures */
56 static pthread_mutex_t kconsumerd_lock_fds;
57
58 /* the two threads (receive fd and poll) */
59 static pthread_t threads[2];
60
61 /* communication with splice */
62 static int thread_pipe[2];
63
64 /* socket to communicate errors with sessiond */
65 static int error_socket = -1;
66
67 /* Argument variables */
68 int opt_quiet;
69 int opt_verbose;
70 static int opt_daemon;
71 static const char *progname;
72 static char command_sock_path[PATH_MAX]; /* Global command socket path */
73 static char error_sock_path[PATH_MAX]; /* Global error path */
74
75 /*
76 * del_fd
77 *
78 * Remove a fd from the global list protected by a mutex
79 */
80 static void del_fd(struct ltt_kconsumerd_fd *lcf)
81 {
82 DBG("Removing %d", lcf->consumerd_fd);
83 pthread_mutex_lock(&kconsumerd_lock_fds);
84 cds_list_del(&lcf->list);
85 if (fds_count > 0) {
86 fds_count--;
87 DBG("Removed ltt_kconsumerd_fd");
88 if (lcf != NULL) {
89 close(lcf->out_fd);
90 close(lcf->consumerd_fd);
91 free(lcf);
92 lcf = NULL;
93 }
94 }
95 pthread_mutex_unlock(&kconsumerd_lock_fds);
96 }
97
98 /*
99 * cleanup
100 *
101 * Cleanup the daemon's socket on exit
102 */
103 static void cleanup()
104 {
105 struct ltt_kconsumerd_fd *iter;
106
107 /* remove the socket file */
108 unlink(command_sock_path);
109
110 /* unblock the threads */
111 WARN("Terminating the threads before exiting");
112 pthread_cancel(threads[0]);
113 pthread_cancel(threads[1]);
114
115 /* close all outfd */
116 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
117 del_fd(iter);
118 }
119 }
120
121 /*
122 * send_error
123 *
124 * send return code to ltt-sessiond
125 */
126 static int send_error(enum lttcomm_return_code cmd)
127 {
128 if (error_socket > 0) {
129 return lttcomm_send_unix_sock(error_socket, &cmd,
130 sizeof(enum lttcomm_sessiond_command));
131 } else {
132 return 0;
133 }
134 }
135
136 /*
137 * add_fd
138 *
139 * Add a fd to the global list protected by a mutex
140 */
141 static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
142 {
143 struct ltt_kconsumerd_fd *tmp_fd;
144 int ret;
145
146 tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
147 tmp_fd->sessiond_fd = buf->fd;
148 tmp_fd->consumerd_fd = consumerd_fd;
149 tmp_fd->state = buf->state;
150 tmp_fd->max_sb_size = buf->max_sb_size;
151 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
152
153 /* Opening the tracefile in write mode */
154 DBG("Opening %s for writing", tmp_fd->path_name);
155 ret = open(tmp_fd->path_name,
156 O_WRONLY|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
157 if (ret < 0) {
158 ERR("Opening %s", tmp_fd->path_name);
159 perror("open");
160 goto end;
161 }
162 tmp_fd->out_fd = ret;
163 tmp_fd->out_fd_offset = 0;
164
165 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
166 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
167
168 pthread_mutex_lock(&kconsumerd_lock_fds);
169 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
170 fds_count++;
171 pthread_mutex_unlock(&kconsumerd_lock_fds);
172
173 end:
174 return ret;
175 }
176
177
/*
 * sighandler
 *
 * Handler installed by set_signal_handler(): whatever the signal received,
 * it only runs the daemon cleanup.
 */
static void sighandler(int sig)
{
	/* the signal number itself is not needed */
	(void) sig;

	cleanup();
}
189
190 /*
191 * set_signal_handler
192 *
193 * Setup signal handler for :
194 * SIGINT, SIGTERM, SIGPIPE
195 */
196 static int set_signal_handler(void)
197 {
198 int ret = 0;
199 struct sigaction sa;
200 sigset_t sigset;
201
202 if ((ret = sigemptyset(&sigset)) < 0) {
203 perror("sigemptyset");
204 return ret;
205 }
206
207 sa.sa_handler = sighandler;
208 sa.sa_mask = sigset;
209 sa.sa_flags = 0;
210 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
211 perror("sigaction");
212 return ret;
213 }
214
215 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
216 perror("sigaction");
217 return ret;
218 }
219
220 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
221 perror("sigaction");
222 return ret;
223 }
224
225 return ret;
226 }
227
228 /*
229 * on_read_subbuffer
230 *
231 * Splice the data from the ring buffer to the tracefile.
232 * Returns the number of bytes spliced
233 */
234 static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
235 unsigned long len)
236 {
237 long ret = 0;
238 loff_t offset = 0;
239 off_t orig_offset = kconsumerd_fd->out_fd_offset;
240 int fd = kconsumerd_fd->consumerd_fd;
241 int outfd = kconsumerd_fd->out_fd;
242
243 while (len > 0) {
244 DBG("splice chan to pipe offset %lu (fd : %d)",
245 (unsigned long)offset, fd);
246 ret = splice(fd, &offset, thread_pipe[1], NULL, len,
247 SPLICE_F_MOVE | SPLICE_F_MORE);
248 DBG("splice chan to pipe ret %ld", ret);
249 if (ret < 0) {
250 ret = errno;
251 perror("Error in relay splice");
252 goto splice_error;
253 }
254
255 ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
256 SPLICE_F_MOVE | SPLICE_F_MORE);
257 DBG("splice pipe to file %ld", ret);
258 if (ret < 0) {
259 ret = errno;
260 perror("Error in file splice");
261 goto splice_error;
262 }
263 if (ret >= len) {
264 len = 0;
265 }
266 /* This won't block, but will start writeout asynchronously */
267 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
268 SYNC_FILE_RANGE_WRITE);
269 kconsumerd_fd->out_fd_offset += ret;
270 }
271
272 /*
273 * This does a blocking write-and-wait on any page that belongs to the
274 * subbuffer prior to the one we just wrote.
275 * Don't care about error values, as these are just hints and ways to
276 * limit the amount of page cache used.
277 */
278 if (orig_offset >= kconsumerd_fd->max_sb_size) {
279 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
280 kconsumerd_fd->max_sb_size,
281 SYNC_FILE_RANGE_WAIT_BEFORE
282 | SYNC_FILE_RANGE_WRITE
283 | SYNC_FILE_RANGE_WAIT_AFTER);
284 /*
285 * Give hints to the kernel about how we access the file:
286 * POSIX_FADV_DONTNEED : we won't re-access data in a near
287 * future after we write it.
288 * We need to call fadvise again after the file grows because
289 * the kernel does not seem to apply fadvise to non-existing
290 * parts of the file.
291 * Call fadvise _after_ having waited for the page writeback to
292 * complete because the dirty page writeback semantic is not
293 * well defined. So it can be expected to lead to lower
294 * throughput in streaming.
295 */
296 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
297 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
298 }
299 goto end;
300
301 splice_error:
302 /* send the appropriate error description to sessiond */
303 switch(ret) {
304 case EBADF:
305 send_error(KCONSUMERD_SPLICE_EBADF);
306 break;
307 case EINVAL:
308 send_error(KCONSUMERD_SPLICE_EINVAL);
309 break;
310 case ENOMEM:
311 send_error(KCONSUMERD_SPLICE_ENOMEM);
312 break;
313 case ESPIPE:
314 send_error(KCONSUMERD_SPLICE_ESPIPE);
315 break;
316 }
317
318 end:
319 return ret;
320 }
321
322 /*
323 * read_subbuffer
324 *
325 * Consume data on a file descriptor and write it on a trace file
326 */
327 static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
328 {
329 unsigned long len;
330 int err;
331 long ret = 0;
332 int infd = kconsumerd_fd->consumerd_fd;
333
334 DBG("In read_subbuffer (infd : %d)", infd);
335 /* Get the next subbuffer */
336 err = kernctl_get_next_subbuf(infd);
337 if (err != 0) {
338 ret = errno;
339 perror("Reserving sub buffer failed (everything is normal, "
340 "it is due to concurrency)");
341 goto end;
342 }
343
344 /* read the whole subbuffer */
345 err = kernctl_get_padded_subbuf_size(infd, &len);
346 if (err != 0) {
347 ret = errno;
348 perror("Getting sub-buffer len failed.");
349 goto end;
350 }
351
352 /* splice the subbuffer to the tracefile */
353 ret = on_read_subbuffer(kconsumerd_fd, len);
354 if (ret < 0) {
355 /*
356 * display the error but continue processing to try
357 * to release the subbuffer
358 */
359 ERR("Error splicing to tracefile");
360 }
361
362 err = kernctl_put_next_subbuf(infd);
363 if (err != 0) {
364 ret = errno;
365 if (errno == EFAULT) {
366 perror("Error in unreserving sub buffer\n");
367 } else if (errno == EIO) {
368 /* Should never happen with newer LTTng versions */
369 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
370 }
371 goto end;
372 }
373
374 end:
375 return ret;
376 }
377
378 /*
379 * change_fd_state
380 *
381 * Update a fd according to what we just received
382 */
383 static void change_fd_state(int sessiond_fd,
384 enum lttcomm_kconsumerd_fd_state state)
385 {
386 struct ltt_kconsumerd_fd *iter;
387 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
388 if (iter->sessiond_fd == sessiond_fd) {
389 iter->state = state;
390 break;
391 }
392 }
393 }
394
395 /*
396 * consumerd_recv_fd
397 *
398 * Receives an array of file descriptors and the associated
399 * structures describing each fd (path name).
400 * Returns the size of received data
401 */
402 static int consumerd_recv_fd(int sfd, int size,
403 enum lttcomm_consumerd_command cmd_type)
404 {
405 struct msghdr msg;
406 struct iovec iov[1];
407 int ret, i;
408 struct cmsghdr *cmsg;
409 int nb_fd;
410 char tmp[CMSG_SPACE(size)];
411 struct lttcomm_kconsumerd_msg *buf;
412 /* the number of fds we are about to receive */
413 nb_fd = size/sizeof(struct lttcomm_kconsumerd_msg);
414
415 buf = malloc(size);
416
417 memset(&msg, 0, sizeof(msg));
418
419 /* Prepare to receive the structures */
420 iov[0].iov_base = buf;
421 iov[0].iov_len = size;
422 msg.msg_iov = iov;
423 msg.msg_iovlen = 1;
424
425 msg.msg_control = tmp;
426 msg.msg_controllen = sizeof(tmp);
427
428 DBG("Waiting to receive fds");
429 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
430 perror("recvmsg");
431 }
432 if (ret != size) {
433 ERR("Received only %d, expected %d", ret, size);
434 send_error(KCONSUMERD_ERROR_RECV_FD);
435 goto end;
436 }
437
438 cmsg = CMSG_FIRSTHDR(&msg);
439 if (!cmsg) {
440 ERR("Invalid control message header");
441 ret = -1;
442 send_error(KCONSUMERD_ERROR_RECV_FD);
443 goto end;
444 }
445
446 /* if we received fds */
447 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
448 DBG("Receive : expecting %d fds", nb_fd);
449 for (i = 0; i < nb_fd; i++) {
450 switch (cmd_type) {
451 case LTTCOMM_ADD_STREAM:
452 DBG("add_fd %s (%d)", buf[i].path_name, ((int *)CMSG_DATA(cmsg))[i]);
453 ret = add_fd(&buf[i], ((int *)CMSG_DATA(cmsg))[i]);
454 if (ret < 0) {
455 send_error(KCONSUMERD_OUTFD_ERROR);
456 goto end;
457 }
458 break;
459 case LTTCOMM_UPDATE_STREAM:
460 change_fd_state(buf[i].fd, buf[i].state);
461 break;
462 default:
463 break;
464 }
465 }
466 /* flag to tell the polling thread to update its fd array */
467 update_fd_array = 1;
468 send_error(KCONSUMERD_SUCCESS_RECV_FD);
469 } else {
470 ERR("Didn't received any fd");
471 send_error(KCONSUMERD_ERROR_RECV_FD);
472 ret = -1;
473 goto end;
474 }
475
476 end:
477 if (buf != NULL) {
478 free(buf);
479 buf = NULL;
480 }
481 return ret;
482 }
483
484 /*
485 * thread_receive_fds
486 *
487 * This thread listens on the consumerd socket and
488 * receives the file descriptors from ltt-sessiond
489 */
490 static void *thread_receive_fds(void *data)
491 {
492 int sock, client_socket, ret;
493 struct lttcomm_kconsumerd_header tmp;
494
495 DBG("Creating command socket %s", command_sock_path);
496 unlink(command_sock_path);
497 client_socket = lttcomm_create_unix_sock(command_sock_path);
498 if (client_socket < 0) {
499 ERR("Cannot create command socket");
500 goto error;
501 }
502
503 ret = lttcomm_listen_unix_sock(client_socket);
504 if (ret < 0) {
505 goto error;
506 }
507
508 DBG("Sending ready command to ltt-sessiond");
509 ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
510 if (ret < 0) {
511 ERR("Error sending ready command to ltt-sessiond");
512 goto error;
513 }
514
515 /* Blocking call, waiting for transmission */
516 sock = lttcomm_accept_unix_sock(client_socket);
517 if (sock <= 0) {
518 WARN("On accept, retrying");
519 goto error;
520 }
521 while (1) {
522 /* We first get the number of fd we are about to receive */
523 ret = lttcomm_recv_unix_sock(sock, &tmp,
524 sizeof(struct lttcomm_kconsumerd_header));
525 if (ret <= 0) {
526 ERR("Receiving the lttcomm_kconsumerd_header, exiting");
527 goto error;
528 }
529 ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
530 if (ret <= 0) {
531 ERR("Receiving the FD, exiting");
532 goto error;
533 }
534 }
535
536 error:
537 return NULL;
538 }
539
540 /*
541 * update_poll_array
542 *
543 * Allocate the pollfd structure and the local view of the out fds
544 * to avoid doing a lookup in the linked list and concurrency issues
545 * when writing is needed.
546 * Returns the number of fds in the structures
547 */
548 static int update_poll_array(struct pollfd **pollfd,
549 struct ltt_kconsumerd_fd **local_kconsumerd_fd)
550 {
551 struct ltt_kconsumerd_fd *iter;
552 int i = 0;
553
554 if (*pollfd != NULL) {
555 free(*pollfd);
556 *pollfd = NULL;
557 }
558
559 if (*local_kconsumerd_fd != NULL) {
560 free(*local_kconsumerd_fd);
561 *local_kconsumerd_fd = NULL;
562 }
563
564 *pollfd = malloc(fds_count * sizeof(struct pollfd));
565 if (*pollfd == NULL) {
566 perror("pollfd malloc");
567 goto error_mem;
568 }
569
570 *local_kconsumerd_fd = malloc(fds_count * sizeof(struct ltt_kconsumerd_fd));
571 if (*local_kconsumerd_fd == NULL) {
572 perror("local_kconsumerd_fd malloc");
573 goto error_mem;
574 }
575
576 DBG("Updating poll fd array");
577 pthread_mutex_lock(&kconsumerd_lock_fds);
578
579 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
580 DBG("Inside for each");
581 if (iter->state == ACTIVE_FD) {
582 DBG("Active FD %d", iter->consumerd_fd);
583 (*pollfd)[i].fd = iter->consumerd_fd;
584 (*pollfd)[i].events = POLLIN | POLLPRI;
585 local_kconsumerd_fd[i] = iter;
586 i++;
587 } else if (iter->state == DELETE_FD) {
588 del_fd(iter);
589 }
590 }
591 update_fd_array = 0;
592 pthread_mutex_unlock(&kconsumerd_lock_fds);
593 return i;
594
595 error_mem:
596 return -ENOMEM;
597 }
598
599 /*
600 * thread_poll_fds
601 *
602 * This thread polls the fds in the ltt_fd_list to consume the data
603 * and write it to tracefile if necessary.
604 */
605 static void *thread_poll_fds(void *data)
606 {
607 int num_rdy, num_hup, high_prio, ret, i;
608 struct pollfd *pollfd = NULL;
609 /* local view of the fds */
610 struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
611 /* local view of fds_count */
612 int nb_fd = 0;
613
614 ret = pipe(thread_pipe);
615 if (ret < 0) {
616 perror("Error creating pipe");
617 goto end;
618 }
619
620 local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
621
622 while (1) {
623 high_prio = 0;
624 num_hup = 0;
625
626 /*
627 * the ltt_fd_list has been updated, we need to update our
628 * local array as well
629 */
630 if (update_fd_array) {
631 ret = update_poll_array(&pollfd, local_kconsumerd_fd);
632 if (ret < 0) {
633 ERR("Error in allocating pollfd or local_outfds");
634 send_error(KCONSUMERD_POLL_ERROR);
635 goto end;
636 }
637 nb_fd = ret;
638 }
639
640 /* poll on the array of fds */
641 DBG("polling on %d fd", nb_fd);
642 num_rdy = poll(pollfd, nb_fd, POLL_TIMEOUT);
643 DBG("poll num_rdy : %d", num_rdy);
644 if (num_rdy == -1) {
645 perror("Poll error");
646 send_error(KCONSUMERD_POLL_ERROR);
647 goto end;
648 }
649
650 /* Take care of high priority channels first. */
651 for (i = 0; i < nb_fd; i++) {
652 switch(pollfd[i].revents) {
653 case POLLERR:
654 ERR("Error returned in polling fd %d.", pollfd[i].fd);
655 num_hup++;
656 send_error(KCONSUMERD_POLL_ERROR);
657 break;
658 case POLLHUP:
659 ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
660 num_hup++;
661 break;
662 case POLLNVAL:
663 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
664 send_error(KCONSUMERD_POLL_NVAL);
665 num_hup++;
666 break;
667 case POLLPRI:
668 DBG("Urgent read on fd %d", pollfd[i].fd);
669 high_prio = 1;
670 ret = read_subbuffer(local_kconsumerd_fd[i]);
671 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
672 if (ret == EAGAIN) {
673 ret = 0;
674 }
675 break;
676 }
677 }
678
679 /* If every buffer FD has hung up, we end the read loop here */
680 if (nb_fd > 0 && num_hup == nb_fd) {
681 DBG("every buffer FD has hung up\n");
682 send_error(KCONSUMERD_POLL_HUP);
683 goto end;
684 }
685
686 /* Take care of low priority channels. */
687 if (!high_prio) {
688 for (i = 0; i < nb_fd; i++) {
689 switch(pollfd[i].revents) {
690 case POLLIN:
691 DBG("Normal read on fd %d", pollfd[i].fd);
692 ret = read_subbuffer(local_kconsumerd_fd[i]);
693 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
694 if (ret == EAGAIN) {
695 ret = 0;
696 }
697 break;
698 }
699 }
700 }
701 }
702 end:
703 if (pollfd != NULL) {
704 free(pollfd);
705 pollfd = NULL;
706 }
707 if (local_kconsumerd_fd != NULL) {
708 free(local_kconsumerd_fd);
709 local_kconsumerd_fd = NULL;
710 }
711 cleanup();
712 return NULL;
713 }
714
715 /*
716 * usage function on stderr
717 */
718 static void usage(void)
719 {
720 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
721 fprintf(stderr, " -h, --help "
722 "Display this usage.\n");
723 fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH "
724 "Specify path for the command socket\n");
725 fprintf(stderr, " -e, --kconsumerd-err-sock PATH "
726 "Specify path for the error socket\n");
727 fprintf(stderr, " -d, --daemonize "
728 "Start as a daemon.\n");
729 fprintf(stderr, " -q, --quiet "
730 "No output at all.\n");
731 fprintf(stderr, " -v, --verbose "
732 "Verbose mode. Activate DBG() macro.\n");
733 fprintf(stderr, " -V, --version "
734 "Show version number.\n");
735 }
736
737 /*
738 * daemon argument parsing
739 */
740 static void parse_args(int argc, char **argv)
741 {
742 int c;
743
744 static struct option long_options[] = {
745 { "kconsumerd-cmd-sock", 1, 0, 'c' },
746 { "kconsumerd-err-sock", 1, 0, 'e' },
747 { "daemonize", 0, 0, 'd' },
748 { "help", 0, 0, 'h' },
749 { "quiet", 0, 0, 'q' },
750 { "verbose", 0, 0, 'v' },
751 { "version", 0, 0, 'V' },
752 { NULL, 0, 0, 0 }
753 };
754
755 while (1) {
756 int option_index = 0;
757 c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
758 if (c == -1) {
759 break;
760 }
761
762 switch (c) {
763 case 0:
764 fprintf(stderr, "option %s", long_options[option_index].name);
765 if (optarg) {
766 fprintf(stderr, " with arg %s\n", optarg);
767 }
768 break;
769 case 'c':
770 snprintf(command_sock_path, PATH_MAX, "%s", optarg);
771 break;
772 case 'e':
773 snprintf(error_sock_path, PATH_MAX, "%s", optarg);
774 break;
775 case 'd':
776 opt_daemon = 1;
777 break;
778 case 'h':
779 usage();
780 exit(EXIT_FAILURE);
781 case 'q':
782 opt_quiet = 1;
783 break;
784 case 'v':
785 opt_verbose = 1;
786 break;
787 case 'V':
788 fprintf(stdout, "%s\n", VERSION);
789 exit(EXIT_SUCCESS);
790 default:
791 usage();
792 exit(EXIT_FAILURE);
793 }
794 }
795 }
796
797
798 /*
799 * main
800 */
801 int main(int argc, char **argv)
802 {
803 int i;
804 int ret = 0;
805 void *status;
806
807 /* Parse arguments */
808 progname = argv[0];
809 parse_args(argc, argv);
810
811 /* Daemonize */
812 if (opt_daemon) {
813 ret = daemon(0, 0);
814 if (ret < 0) {
815 perror("daemon");
816 goto error;
817 }
818 }
819
820 if (strlen(command_sock_path) == 0) {
821 snprintf(command_sock_path, PATH_MAX,
822 KCONSUMERD_CMD_SOCK_PATH);
823 }
824 if (strlen(error_sock_path) == 0) {
825 snprintf(error_sock_path, PATH_MAX,
826 KCONSUMERD_ERR_SOCK_PATH);
827 }
828
829 if (set_signal_handler() < 0) {
830 goto error;
831 }
832
833 /* Connect to the socket created by ltt-sessiond to report errors */
834 DBG("Connecting to error socket %s", error_sock_path);
835 error_socket = lttcomm_connect_unix_sock(error_sock_path);
836 /* not a fatal error, but all communication with ltt-sessiond will fail */
837 if (error_socket < 0) {
838 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
839 }
840
841 /* Create the thread to manage the receive of fd */
842 ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL);
843 if (ret != 0) {
844 perror("pthread_create");
845 goto error;
846 }
847
848 /* Create thread to manage the polling/writing of traces */
849 ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL);
850 if (ret != 0) {
851 perror("pthread_create");
852 goto error;
853 }
854
855 for (i = 0; i < 2; i++) {
856 ret = pthread_join(threads[i], &status);
857 if (ret != 0) {
858 perror("pthread_join");
859 goto error;
860 }
861 }
862 ret = EXIT_SUCCESS;
863 send_error(KCONSUMERD_EXIT_SUCCESS);
864 goto end;
865
866 error:
867 ret = EXIT_FAILURE;
868 send_error(KCONSUMERD_EXIT_FAILURE);
869
870 end:
871 cleanup();
872
873 return ret;
874 }
This page took 0.076498 seconds and 5 git commands to generate.