Add channel output method selection
[lttng-tools.git] / kconsumerd / kconsumerd.c
CommitLineData
d4a1283e
JD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <getopt.h>
23#include <grp.h>
24#include <limits.h>
25#include <pthread.h>
26#include <signal.h>
27#include <stdio.h>
28#include <stdlib.h>
29#include <string.h>
30#include <sys/ipc.h>
31#include <sys/shm.h>
32#include <sys/socket.h>
33#include <sys/stat.h>
34#include <sys/types.h>
35#include <urcu/list.h>
36#include <poll.h>
37#include <unistd.h>
38
39#include "lttngerr.h"
40#include "libkernelctl.h"
41#include "liblttsessiondcomm.h"
42#include "kconsumerd.h"
43
44/* Init the list of FDs */
45static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = {
46 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
47};
48
49/* Number of element for the list below. */
50static unsigned int fds_count;
51
52/* If the local array of FDs needs update in the poll function */
53static unsigned int update_fd_array = 1;
54
55/* lock the fd array and structures */
56static pthread_mutex_t kconsumerd_lock_fds;
57
58/* the two threads (receive fd and poll) */
59static pthread_t threads[2];
60
61/* communication with splice */
62static int thread_pipe[2];
63
252fd492
JD
64/* pipe to wake the poll thread when necessary */
65static int poll_pipe[2];
66
d4a1283e
JD
67/* socket to communicate errors with sessiond */
68static int error_socket = -1;
69
13e44745
JD
70/* to count the number of time the user pressed ctrl+c */
71static int sigintcount = 0;
72
9d26659a
JD
73/* flag to inform the polling thread to quit when all fd hung up */
74static int quit = 0;
75
d4a1283e
JD
76/* Argument variables */
77int opt_quiet;
78int opt_verbose;
79static int opt_daemon;
80static const char *progname;
81static char command_sock_path[PATH_MAX]; /* Global command socket path */
82static char error_sock_path[PATH_MAX]; /* Global error path */
83
bcd8d9db
JD
84/*
85 * del_fd
86 *
87 * Remove a fd from the global list protected by a mutex
88 */
89static void del_fd(struct ltt_kconsumerd_fd *lcf)
90{
6aea26bc 91 DBG("Removing %d", lcf->consumerd_fd);
bcd8d9db
JD
92 pthread_mutex_lock(&kconsumerd_lock_fds);
93 cds_list_del(&lcf->list);
94 if (fds_count > 0) {
95 fds_count--;
96 DBG("Removed ltt_kconsumerd_fd");
97 if (lcf != NULL) {
98 close(lcf->out_fd);
99 close(lcf->consumerd_fd);
100 free(lcf);
101 lcf = NULL;
102 }
103 }
104 pthread_mutex_unlock(&kconsumerd_lock_fds);
105}
106
d4a1283e
JD
107/*
108 * cleanup
109 *
110 * Cleanup the daemon's socket on exit
111 */
112static void cleanup()
113{
bcd8d9db
JD
114 struct ltt_kconsumerd_fd *iter;
115
bcd8d9db 116 /* remove the socket file */
d4a1283e 117 unlink(command_sock_path);
bcd8d9db
JD
118
119 /* unblock the threads */
120 WARN("Terminating the threads before exiting");
121 pthread_cancel(threads[0]);
122 pthread_cancel(threads[1]);
123
124 /* close all outfd */
125 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
126 del_fd(iter);
127 }
d4a1283e
JD
128}
129
6aea26bc
JD
130/*
131 * send_error
d4a1283e
JD
132 *
133 * send return code to ltt-sessiond
134 */
135static int send_error(enum lttcomm_return_code cmd)
136{
137 if (error_socket > 0) {
138 return lttcomm_send_unix_sock(error_socket, &cmd,
139 sizeof(enum lttcomm_sessiond_command));
140 } else {
141 return 0;
142 }
143}
144
d4a1283e
JD
145/*
146 * add_fd
147 *
148 * Add a fd to the global list protected by a mutex
149 */
150static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
151{
152 struct ltt_kconsumerd_fd *tmp_fd;
153 int ret;
154
155 tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
156 tmp_fd->sessiond_fd = buf->fd;
157 tmp_fd->consumerd_fd = consumerd_fd;
158 tmp_fd->state = buf->state;
159 tmp_fd->max_sb_size = buf->max_sb_size;
160 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
161
162 /* Opening the tracefile in write mode */
163 DBG("Opening %s for writing", tmp_fd->path_name);
164 ret = open(tmp_fd->path_name,
46258765 165 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
d4a1283e
JD
166 if (ret < 0) {
167 ERR("Opening %s", tmp_fd->path_name);
168 perror("open");
169 goto end;
170 }
171 tmp_fd->out_fd = ret;
172 tmp_fd->out_fd_offset = 0;
173
174 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
175 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
176
177 pthread_mutex_lock(&kconsumerd_lock_fds);
178 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
179 fds_count++;
180 pthread_mutex_unlock(&kconsumerd_lock_fds);
181
182end:
183 return ret;
184}
185
d4a1283e
JD
186
187/*
188 * sighandler
189 *
190 * Signal handler for the daemon
191 */
192static void sighandler(int sig)
193{
13e44745
JD
194 if (sig == SIGINT && sigintcount++ == 0) {
195 DBG("ignoring first SIGINT");
196 return;
197 }
198
d4a1283e
JD
199 cleanup();
200
201 return;
202}
203
204/*
205 * set_signal_handler
206 *
207 * Setup signal handler for :
208 * SIGINT, SIGTERM, SIGPIPE
209 */
210static int set_signal_handler(void)
211{
212 int ret = 0;
213 struct sigaction sa;
214 sigset_t sigset;
215
216 if ((ret = sigemptyset(&sigset)) < 0) {
217 perror("sigemptyset");
218 return ret;
219 }
220
221 sa.sa_handler = sighandler;
222 sa.sa_mask = sigset;
223 sa.sa_flags = 0;
224 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
225 perror("sigaction");
226 return ret;
227 }
228
229 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
230 perror("sigaction");
231 return ret;
232 }
233
234 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
235 perror("sigaction");
236 return ret;
237 }
238
239 return ret;
240}
241
242/*
243 * on_read_subbuffer
244 *
245 * Splice the data from the ring buffer to the tracefile.
246 * Returns the number of bytes spliced
247 */
248static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
249 unsigned long len)
250{
251 long ret = 0;
252 loff_t offset = 0;
253 off_t orig_offset = kconsumerd_fd->out_fd_offset;
254 int fd = kconsumerd_fd->consumerd_fd;
255 int outfd = kconsumerd_fd->out_fd;
256
257 while (len > 0) {
258 DBG("splice chan to pipe offset %lu (fd : %d)",
259 (unsigned long)offset, fd);
260 ret = splice(fd, &offset, thread_pipe[1], NULL, len,
261 SPLICE_F_MOVE | SPLICE_F_MORE);
262 DBG("splice chan to pipe ret %ld", ret);
263 if (ret < 0) {
0632499a 264 ret = errno;
d4a1283e 265 perror("Error in relay splice");
0632499a 266 goto splice_error;
d4a1283e
JD
267 }
268
269 ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
270 SPLICE_F_MOVE | SPLICE_F_MORE);
271 DBG("splice pipe to file %ld", ret);
272 if (ret < 0) {
0632499a 273 ret = errno;
d4a1283e 274 perror("Error in file splice");
0632499a 275 goto splice_error;
d4a1283e
JD
276 }
277 if (ret >= len) {
278 len = 0;
279 }
280 /* This won't block, but will start writeout asynchronously */
281 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
282 SYNC_FILE_RANGE_WRITE);
283 kconsumerd_fd->out_fd_offset += ret;
284 }
0632499a 285
d4a1283e
JD
286 /*
287 * This does a blocking write-and-wait on any page that belongs to the
288 * subbuffer prior to the one we just wrote.
289 * Don't care about error values, as these are just hints and ways to
290 * limit the amount of page cache used.
291 */
292 if (orig_offset >= kconsumerd_fd->max_sb_size) {
293 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
294 kconsumerd_fd->max_sb_size,
295 SYNC_FILE_RANGE_WAIT_BEFORE
296 | SYNC_FILE_RANGE_WRITE
297 | SYNC_FILE_RANGE_WAIT_AFTER);
298 /*
299 * Give hints to the kernel about how we access the file:
300 * POSIX_FADV_DONTNEED : we won't re-access data in a near
301 * future after we write it.
302 * We need to call fadvise again after the file grows because
303 * the kernel does not seem to apply fadvise to non-existing
304 * parts of the file.
305 * Call fadvise _after_ having waited for the page writeback to
306 * complete because the dirty page writeback semantic is not
307 * well defined. So it can be expected to lead to lower
308 * throughput in streaming.
309 */
310 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
311 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
312 }
0632499a
JD
313 goto end;
314
315splice_error:
316 /* send the appropriate error description to sessiond */
317 switch(ret) {
914a571b
JD
318 case EBADF:
319 send_error(KCONSUMERD_SPLICE_EBADF);
320 break;
321 case EINVAL:
322 send_error(KCONSUMERD_SPLICE_EINVAL);
323 break;
324 case ENOMEM:
325 send_error(KCONSUMERD_SPLICE_ENOMEM);
326 break;
327 case ESPIPE:
328 send_error(KCONSUMERD_SPLICE_ESPIPE);
329 break;
0632499a
JD
330 }
331
332end:
d4a1283e
JD
333 return ret;
334}
335
336/*
337 * read_subbuffer
338 *
339 * Consume data on a file descriptor and write it on a trace file
340 */
341static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
342{
343 unsigned long len;
344 int err;
345 long ret = 0;
346 int infd = kconsumerd_fd->consumerd_fd;
347
6aea26bc 348 DBG("In read_subbuffer (infd : %d)", infd);
d4a1283e
JD
349 /* Get the next subbuffer */
350 err = kernctl_get_next_subbuf(infd);
351 if (err != 0) {
352 ret = errno;
353 perror("Reserving sub buffer failed (everything is normal, "
354 "it is due to concurrency)");
355 goto end;
356 }
357
358 /* read the whole subbuffer */
359 err = kernctl_get_padded_subbuf_size(infd, &len);
360 if (err != 0) {
361 ret = errno;
362 perror("Getting sub-buffer len failed.");
363 goto end;
364 }
365
366 /* splice the subbuffer to the tracefile */
367 ret = on_read_subbuffer(kconsumerd_fd, len);
368 if (ret < 0) {
369 /*
370 * display the error but continue processing to try
371 * to release the subbuffer
372 */
373 ERR("Error splicing to tracefile");
374 }
375
376 err = kernctl_put_next_subbuf(infd);
377 if (err != 0) {
378 ret = errno;
379 if (errno == EFAULT) {
380 perror("Error in unreserving sub buffer\n");
381 } else if (errno == EIO) {
382 /* Should never happen with newer LTTng versions */
383 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
384 }
385 goto end;
386 }
387
388end:
389 return ret;
390}
391
392/*
393 * change_fd_state
394 *
395 * Update a fd according to what we just received
396 */
397static void change_fd_state(int sessiond_fd,
5dc18550 398 enum kconsumerd_fd_state state)
d4a1283e
JD
399{
400 struct ltt_kconsumerd_fd *iter;
401 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
402 if (iter->sessiond_fd == sessiond_fd) {
403 iter->state = state;
404 break;
405 }
406 }
407}
408
409/*
410 * consumerd_recv_fd
411 *
412 * Receives an array of file descriptors and the associated
413 * structures describing each fd (path name).
414 * Returns the size of received data
415 */
416static int consumerd_recv_fd(int sfd, int size,
5dc18550 417 enum kconsumerd_command cmd_type)
d4a1283e
JD
418{
419 struct msghdr msg;
420 struct iovec iov[1];
33a2b854 421 int ret = 0, i, tmp2;
d4a1283e
JD
422 struct cmsghdr *cmsg;
423 int nb_fd;
33a2b854
DG
424 char recv_fd[CMSG_SPACE(sizeof(int))];
425 struct lttcomm_kconsumerd_msg lkm;
426
d4a1283e 427 /* the number of fds we are about to receive */
33a2b854 428 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
d4a1283e 429
33a2b854
DG
430 for (i = 0; i < nb_fd; i++) {
431 memset(&msg, 0, sizeof(msg));
d4a1283e 432
33a2b854
DG
433 /* Prepare to receive the structures */
434 iov[0].iov_base = &lkm;
435 iov[0].iov_len = sizeof(lkm);
436 msg.msg_iov = iov;
437 msg.msg_iovlen = 1;
d4a1283e 438
33a2b854
DG
439 msg.msg_control = recv_fd;
440 msg.msg_controllen = sizeof(recv_fd);
d4a1283e 441
33a2b854
DG
442 DBG("Waiting to receive fd");
443 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
444 perror("recvmsg");
445 continue;
446 }
d4a1283e 447
33a2b854
DG
448 if (ret != (size / nb_fd)) {
449 ERR("Received only %d, expected %d", ret, size);
450 send_error(KCONSUMERD_ERROR_RECV_FD);
451 goto end;
452 }
d4a1283e 453
33a2b854
DG
454 cmsg = CMSG_FIRSTHDR(&msg);
455 if (!cmsg) {
456 ERR("Invalid control message header");
457 ret = -1;
458 send_error(KCONSUMERD_ERROR_RECV_FD);
459 goto end;
460 }
d4a1283e 461
33a2b854
DG
462 /* if we received fds */
463 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
d4a1283e 464 switch (cmd_type) {
5dc18550 465 case ADD_STREAM:
33a2b854
DG
466 DBG("add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
467 ret = add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
914a571b
JD
468 if (ret < 0) {
469 send_error(KCONSUMERD_OUTFD_ERROR);
470 goto end;
471 }
472 break;
5dc18550 473 case UPDATE_STREAM:
33a2b854 474 change_fd_state(lkm.fd, lkm.state);
914a571b
JD
475 break;
476 default:
477 break;
d4a1283e 478 }
33a2b854
DG
479 /* flag to tell the polling thread to update its fd array */
480 update_fd_array = 1;
481 /* signal the poll thread */
482 tmp2 = write(poll_pipe[1], "4", 1);
483 } else {
484 ERR("Didn't received any fd");
485 send_error(KCONSUMERD_ERROR_RECV_FD);
486 ret = -1;
487 goto end;
d4a1283e 488 }
d4a1283e
JD
489 }
490
491end:
9d26659a 492 DBG("consumerd_recv_fd thread exiting");
d4a1283e
JD
493 return ret;
494}
495
496/*
497 * thread_receive_fds
498 *
499 * This thread listens on the consumerd socket and
500 * receives the file descriptors from ltt-sessiond
501 */
502static void *thread_receive_fds(void *data)
503{
504 int sock, client_socket, ret;
505 struct lttcomm_kconsumerd_header tmp;
506
507 DBG("Creating command socket %s", command_sock_path);
508 unlink(command_sock_path);
509 client_socket = lttcomm_create_unix_sock(command_sock_path);
510 if (client_socket < 0) {
511 ERR("Cannot create command socket");
9d26659a 512 goto end;
d4a1283e
JD
513 }
514
515 ret = lttcomm_listen_unix_sock(client_socket);
516 if (ret < 0) {
9d26659a 517 goto end;
d4a1283e
JD
518 }
519
520 DBG("Sending ready command to ltt-sessiond");
521 ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
522 if (ret < 0) {
523 ERR("Error sending ready command to ltt-sessiond");
9d26659a 524 goto end;
d4a1283e
JD
525 }
526
7e8c38c6
JD
527 /* Blocking call, waiting for transmission */
528 sock = lttcomm_accept_unix_sock(client_socket);
529 if (sock <= 0) {
4abc0780 530 WARN("On accept");
9d26659a 531 goto end;
7e8c38c6 532 }
d4a1283e 533 while (1) {
d4a1283e
JD
534 /* We first get the number of fd we are about to receive */
535 ret = lttcomm_recv_unix_sock(sock, &tmp,
536 sizeof(struct lttcomm_kconsumerd_header));
6aea26bc 537 if (ret <= 0) {
9d26659a
JD
538 ERR("Communication interrupted on command socket");
539 goto end;
540 }
541 if (tmp.cmd_type == STOP) {
542 DBG("Received STOP command");
9d26659a 543 goto end;
d4a1283e 544 }
9d26659a 545 /* we received a command to add or update fds */
d4a1283e 546 ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
6aea26bc 547 if (ret <= 0) {
bcd8d9db 548 ERR("Receiving the FD, exiting");
9d26659a 549 goto end;
d4a1283e
JD
550 }
551 }
552
9d26659a
JD
553end:
554 DBG("thread_receive_fds exiting");
d13606b9
DG
555 quit = 1;
556 ret = write(poll_pipe[1], "4", 1);
557 if (ret < 0) {
558 perror("poll pipe write");
559 }
d4a1283e
JD
560 return NULL;
561}
562
563/*
564 * update_poll_array
565 *
566 * Allocate the pollfd structure and the local view of the out fds
567 * to avoid doing a lookup in the linked list and concurrency issues
568 * when writing is needed.
569 * Returns the number of fds in the structures
570 */
571static int update_poll_array(struct pollfd **pollfd,
572 struct ltt_kconsumerd_fd **local_kconsumerd_fd)
573{
574 struct ltt_kconsumerd_fd *iter;
575 int i = 0;
576
d4a1283e
JD
577
578 DBG("Updating poll fd array");
579 pthread_mutex_lock(&kconsumerd_lock_fds);
580
581 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
582 DBG("Inside for each");
583 if (iter->state == ACTIVE_FD) {
584 DBG("Active FD %d", iter->consumerd_fd);
1b686c3f
JD
585 (*pollfd)[i].fd = iter->consumerd_fd;
586 (*pollfd)[i].events = POLLIN | POLLPRI;
d4a1283e
JD
587 local_kconsumerd_fd[i] = iter;
588 i++;
d4a1283e
JD
589 }
590 }
252fd492
JD
591 /*
592 * insert the poll_pipe at the end of the array and don't increment i
593 * so nb_fd is the number of real FD
594 */
595 (*pollfd)[i].fd = poll_pipe[0];
596 (*pollfd)[i].events = POLLIN;
597
d4a1283e
JD
598 update_fd_array = 0;
599 pthread_mutex_unlock(&kconsumerd_lock_fds);
600 return i;
601
d4a1283e
JD
602}
603
604/*
605 * thread_poll_fds
606 *
607 * This thread polls the fds in the ltt_fd_list to consume the data
608 * and write it to tracefile if necessary.
609 */
610static void *thread_poll_fds(void *data)
611{
612 int num_rdy, num_hup, high_prio, ret, i;
613 struct pollfd *pollfd = NULL;
614 /* local view of the fds */
6aea26bc 615 struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
d4a1283e
JD
616 /* local view of fds_count */
617 int nb_fd = 0;
252fd492
JD
618 char tmp;
619 int tmp2;
d4a1283e
JD
620
621 ret = pipe(thread_pipe);
622 if (ret < 0) {
623 perror("Error creating pipe");
624 goto end;
625 }
626
6aea26bc
JD
627 local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
628
d4a1283e
JD
629 while (1) {
630 high_prio = 0;
631 num_hup = 0;
632
633 /*
634 * the ltt_fd_list has been updated, we need to update our
635 * local array as well
636 */
914a571b 637 if (update_fd_array == 1) {
4abc0780
JD
638 if (pollfd != NULL) {
639 free(pollfd);
640 pollfd = NULL;
641 }
642 if (local_kconsumerd_fd != NULL) {
643 free(local_kconsumerd_fd);
644 local_kconsumerd_fd = NULL;
645 }
646 /* allocate for all fds + 1 for the poll_pipe */
647 pollfd = malloc((fds_count + 1) * sizeof(struct pollfd));
648 if (pollfd == NULL) {
649 perror("pollfd malloc");
650 goto end;
651 }
652 /* allocate for all fds + 1 for the poll_pipe */
653 local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd));
654 if (local_kconsumerd_fd == NULL) {
655 perror("local_kconsumerd_fd malloc");
656 goto end;
657 }
658
6aea26bc 659 ret = update_poll_array(&pollfd, local_kconsumerd_fd);
d4a1283e
JD
660 if (ret < 0) {
661 ERR("Error in allocating pollfd or local_outfds");
662 send_error(KCONSUMERD_POLL_ERROR);
663 goto end;
664 }
665 nb_fd = ret;
666 }
667
668 /* poll on the array of fds */
252fd492
JD
669 DBG("polling on %d fd", nb_fd + 1);
670 num_rdy = poll(pollfd, nb_fd + 1, -1);
d4a1283e
JD
671 DBG("poll num_rdy : %d", num_rdy);
672 if (num_rdy == -1) {
673 perror("Poll error");
674 send_error(KCONSUMERD_POLL_ERROR);
675 goto end;
676 }
677
d13606b9
DG
678 /* No FDs and quit, cleanup the thread */
679 if (nb_fd == 0 && quit == 1) {
680 goto end;
681 }
682
252fd492
JD
683 /*
684 * if only the poll_pipe triggered poll to return just return to the
685 * beginning of the loop to update the array
686 */
687 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
688 DBG("poll_pipe wake up");
689 tmp2 = read(poll_pipe[0], &tmp, 1);
690 continue;
691 }
692
d4a1283e
JD
693 /* Take care of high priority channels first. */
694 for (i = 0; i < nb_fd; i++) {
695 switch(pollfd[i].revents) {
914a571b
JD
696 case POLLERR:
697 ERR("Error returned in polling fd %d.", pollfd[i].fd);
9d26659a
JD
698 del_fd(local_kconsumerd_fd[i]);
699 update_fd_array = 1;
914a571b 700 num_hup++;
914a571b
JD
701 break;
702 case POLLHUP:
703 ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
9d26659a
JD
704 del_fd(local_kconsumerd_fd[i]);
705 update_fd_array = 1;
914a571b
JD
706 num_hup++;
707 break;
708 case POLLNVAL:
709 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
9d26659a
JD
710 del_fd(local_kconsumerd_fd[i]);
711 update_fd_array = 1;
914a571b
JD
712 num_hup++;
713 break;
714 case POLLPRI:
715 DBG("Urgent read on fd %d", pollfd[i].fd);
716 high_prio = 1;
717 ret = read_subbuffer(local_kconsumerd_fd[i]);
718 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
719 if (ret == EAGAIN) {
720 ret = 0;
721 }
722 break;
d4a1283e
JD
723 }
724 }
725
726 /* If every buffer FD has hung up, we end the read loop here */
727 if (nb_fd > 0 && num_hup == nb_fd) {
728 DBG("every buffer FD has hung up\n");
9d26659a
JD
729 if (quit == 1) {
730 goto end;
731 }
732 continue;
d4a1283e
JD
733 }
734
735 /* Take care of low priority channels. */
914a571b 736 if (high_prio == 0) {
d4a1283e 737 for (i = 0; i < nb_fd; i++) {
914a571b
JD
738 if (pollfd[i].revents == POLLIN) {
739 DBG("Normal read on fd %d", pollfd[i].fd);
740 ret = read_subbuffer(local_kconsumerd_fd[i]);
741 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
742 if (ret == EAGAIN) {
743 ret = 0;
744 }
d4a1283e
JD
745 }
746 }
747 }
748 }
749end:
9d26659a 750 DBG("polling thread exiting");
d4a1283e
JD
751 if (pollfd != NULL) {
752 free(pollfd);
753 pollfd = NULL;
754 }
755 if (local_kconsumerd_fd != NULL) {
756 free(local_kconsumerd_fd);
757 local_kconsumerd_fd = NULL;
758 }
bcd8d9db 759 cleanup();
d4a1283e
JD
760 return NULL;
761}
762
763/*
764 * usage function on stderr
765 */
766static void usage(void)
767{
768 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
769 fprintf(stderr, " -h, --help "
770 "Display this usage.\n");
771 fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH "
772 "Specify path for the command socket\n");
773 fprintf(stderr, " -e, --kconsumerd-err-sock PATH "
774 "Specify path for the error socket\n");
775 fprintf(stderr, " -d, --daemonize "
776 "Start as a daemon.\n");
777 fprintf(stderr, " -q, --quiet "
778 "No output at all.\n");
779 fprintf(stderr, " -v, --verbose "
780 "Verbose mode. Activate DBG() macro.\n");
781 fprintf(stderr, " -V, --version "
782 "Show version number.\n");
783}
784
785/*
786 * daemon argument parsing
787 */
788static void parse_args(int argc, char **argv)
789{
790 int c;
791
792 static struct option long_options[] = {
793 { "kconsumerd-cmd-sock", 1, 0, 'c' },
794 { "kconsumerd-err-sock", 1, 0, 'e' },
795 { "daemonize", 0, 0, 'd' },
796 { "help", 0, 0, 'h' },
797 { "quiet", 0, 0, 'q' },
798 { "verbose", 0, 0, 'v' },
799 { "version", 0, 0, 'V' },
800 { NULL, 0, 0, 0 }
801 };
802
803 while (1) {
804 int option_index = 0;
805 c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
806 if (c == -1) {
807 break;
808 }
809
810 switch (c) {
914a571b
JD
811 case 0:
812 fprintf(stderr, "option %s", long_options[option_index].name);
813 if (optarg) {
814 fprintf(stderr, " with arg %s\n", optarg);
815 }
816 break;
817 case 'c':
818 snprintf(command_sock_path, PATH_MAX, "%s", optarg);
819 break;
820 case 'e':
821 snprintf(error_sock_path, PATH_MAX, "%s", optarg);
822 break;
823 case 'd':
824 opt_daemon = 1;
825 break;
826 case 'h':
827 usage();
828 exit(EXIT_FAILURE);
829 case 'q':
830 opt_quiet = 1;
831 break;
832 case 'v':
833 opt_verbose = 1;
834 break;
835 case 'V':
836 fprintf(stdout, "%s\n", VERSION);
837 exit(EXIT_SUCCESS);
838 default:
839 usage();
840 exit(EXIT_FAILURE);
d4a1283e
JD
841 }
842 }
843}
844
845
846/*
847 * main
848 */
849int main(int argc, char **argv)
850{
851 int i;
852 int ret = 0;
853 void *status;
854
855 /* Parse arguments */
856 progname = argv[0];
857 parse_args(argc, argv);
858
859 /* Daemonize */
860 if (opt_daemon) {
861 ret = daemon(0, 0);
862 if (ret < 0) {
863 perror("daemon");
864 goto error;
865 }
866 }
867
868 if (strlen(command_sock_path) == 0) {
869 snprintf(command_sock_path, PATH_MAX,
870 KCONSUMERD_CMD_SOCK_PATH);
871 }
872 if (strlen(error_sock_path) == 0) {
873 snprintf(error_sock_path, PATH_MAX,
874 KCONSUMERD_ERR_SOCK_PATH);
875 }
876
877 if (set_signal_handler() < 0) {
878 goto error;
879 }
880
252fd492
JD
881 /* create the pipe to wake to polling thread when needed */
882 ret = pipe(poll_pipe);
883 if (ret < 0) {
884 perror("Error creating poll pipe");
885 goto end;
886 }
887
d4a1283e
JD
888 /* Connect to the socket created by ltt-sessiond to report errors */
889 DBG("Connecting to error socket %s", error_sock_path);
890 error_socket = lttcomm_connect_unix_sock(error_sock_path);
891 /* not a fatal error, but all communication with ltt-sessiond will fail */
892 if (error_socket < 0) {
893 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
894 }
895
896 /* Create the thread to manage the receive of fd */
897 ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL);
898 if (ret != 0) {
899 perror("pthread_create");
900 goto error;
901 }
902
903 /* Create thread to manage the polling/writing of traces */
904 ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL);
905 if (ret != 0) {
906 perror("pthread_create");
907 goto error;
908 }
909
910 for (i = 0; i < 2; i++) {
911 ret = pthread_join(threads[i], &status);
912 if (ret != 0) {
913 perror("pthread_join");
914 goto error;
915 }
916 }
917 ret = EXIT_SUCCESS;
918 send_error(KCONSUMERD_EXIT_SUCCESS);
919 goto end;
920
921error:
922 ret = EXIT_FAILURE;
923 send_error(KCONSUMERD_EXIT_FAILURE);
924
925end:
926 cleanup();
927
928 return ret;
929}
This page took 0.059286 seconds and 4 git commands to generate.