Consume with mmap if enabled
[lttng-tools.git] / kconsumerd / kconsumerd.c
CommitLineData
d4a1283e
JD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <getopt.h>
23#include <grp.h>
24#include <limits.h>
25#include <pthread.h>
26#include <signal.h>
27#include <stdio.h>
28#include <stdlib.h>
29#include <string.h>
30#include <sys/ipc.h>
31#include <sys/shm.h>
32#include <sys/socket.h>
33#include <sys/stat.h>
34#include <sys/types.h>
35#include <urcu/list.h>
36#include <poll.h>
37#include <unistd.h>
03424a9b 38#include <sys/mman.h>
d4a1283e
JD
39
40#include "lttngerr.h"
41#include "libkernelctl.h"
42#include "liblttsessiondcomm.h"
43#include "kconsumerd.h"
44
45/* Init the list of FDs */
46static struct ltt_kconsumerd_fd_list kconsumerd_fd_list = {
47 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
48};
49
50/* Number of element for the list below. */
51static unsigned int fds_count;
52
53/* If the local array of FDs needs update in the poll function */
54static unsigned int update_fd_array = 1;
55
56/* lock the fd array and structures */
57static pthread_mutex_t kconsumerd_lock_fds;
58
59/* the two threads (receive fd and poll) */
60static pthread_t threads[2];
61
62/* communication with splice */
63static int thread_pipe[2];
64
252fd492
JD
65/* pipe to wake the poll thread when necessary */
66static int poll_pipe[2];
67
d4a1283e
JD
68/* socket to communicate errors with sessiond */
69static int error_socket = -1;
70
13e44745
JD
71/* to count the number of time the user pressed ctrl+c */
72static int sigintcount = 0;
73
9d26659a
JD
74/* flag to inform the polling thread to quit when all fd hung up */
75static int quit = 0;
76
d4a1283e
JD
77/* Argument variables */
78int opt_quiet;
79int opt_verbose;
80static int opt_daemon;
81static const char *progname;
82static char command_sock_path[PATH_MAX]; /* Global command socket path */
83static char error_sock_path[PATH_MAX]; /* Global error path */
84
bcd8d9db
JD
85/*
86 * del_fd
87 *
88 * Remove a fd from the global list protected by a mutex
89 */
90static void del_fd(struct ltt_kconsumerd_fd *lcf)
91{
6aea26bc 92 DBG("Removing %d", lcf->consumerd_fd);
bcd8d9db
JD
93 pthread_mutex_lock(&kconsumerd_lock_fds);
94 cds_list_del(&lcf->list);
95 if (fds_count > 0) {
96 fds_count--;
97 DBG("Removed ltt_kconsumerd_fd");
98 if (lcf != NULL) {
99 close(lcf->out_fd);
100 close(lcf->consumerd_fd);
101 free(lcf);
102 lcf = NULL;
103 }
104 }
105 pthread_mutex_unlock(&kconsumerd_lock_fds);
106}
107
d4a1283e
JD
108/*
109 * cleanup
110 *
111 * Cleanup the daemon's socket on exit
112 */
113static void cleanup()
114{
bcd8d9db
JD
115 struct ltt_kconsumerd_fd *iter;
116
bcd8d9db 117 /* remove the socket file */
d4a1283e 118 unlink(command_sock_path);
bcd8d9db
JD
119
120 /* unblock the threads */
121 WARN("Terminating the threads before exiting");
122 pthread_cancel(threads[0]);
123 pthread_cancel(threads[1]);
124
125 /* close all outfd */
126 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
127 del_fd(iter);
128 }
d4a1283e
JD
129}
130
6aea26bc
JD
131/*
132 * send_error
d4a1283e
JD
133 *
134 * send return code to ltt-sessiond
135 */
136static int send_error(enum lttcomm_return_code cmd)
137{
138 if (error_socket > 0) {
139 return lttcomm_send_unix_sock(error_socket, &cmd,
140 sizeof(enum lttcomm_sessiond_command));
141 } else {
142 return 0;
143 }
144}
145
d4a1283e
JD
146/*
147 * add_fd
148 *
149 * Add a fd to the global list protected by a mutex
150 */
151static int add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
152{
153 struct ltt_kconsumerd_fd *tmp_fd;
154 int ret;
155
156 tmp_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
157 tmp_fd->sessiond_fd = buf->fd;
158 tmp_fd->consumerd_fd = consumerd_fd;
159 tmp_fd->state = buf->state;
160 tmp_fd->max_sb_size = buf->max_sb_size;
161 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
162
163 /* Opening the tracefile in write mode */
164 DBG("Opening %s for writing", tmp_fd->path_name);
165 ret = open(tmp_fd->path_name,
46258765 166 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
d4a1283e
JD
167 if (ret < 0) {
168 ERR("Opening %s", tmp_fd->path_name);
169 perror("open");
170 goto end;
171 }
172 tmp_fd->out_fd = ret;
173 tmp_fd->out_fd_offset = 0;
174
175 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
176 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
177
178 pthread_mutex_lock(&kconsumerd_lock_fds);
179 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
180 fds_count++;
181 pthread_mutex_unlock(&kconsumerd_lock_fds);
182
183end:
184 return ret;
185}
186
d4a1283e
JD
187
188/*
189 * sighandler
190 *
191 * Signal handler for the daemon
192 */
193static void sighandler(int sig)
194{
13e44745
JD
195 if (sig == SIGINT && sigintcount++ == 0) {
196 DBG("ignoring first SIGINT");
197 return;
198 }
199
d4a1283e
JD
200 cleanup();
201
202 return;
203}
204
205/*
206 * set_signal_handler
207 *
208 * Setup signal handler for :
209 * SIGINT, SIGTERM, SIGPIPE
210 */
211static int set_signal_handler(void)
212{
213 int ret = 0;
214 struct sigaction sa;
215 sigset_t sigset;
216
217 if ((ret = sigemptyset(&sigset)) < 0) {
218 perror("sigemptyset");
219 return ret;
220 }
221
222 sa.sa_handler = sighandler;
223 sa.sa_mask = sigset;
224 sa.sa_flags = 0;
225 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
226 perror("sigaction");
227 return ret;
228 }
229
230 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
231 perror("sigaction");
232 return ret;
233 }
234
235 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
236 perror("sigaction");
237 return ret;
238 }
239
240 return ret;
241}
242
03424a9b
JD
243/*
244 * on_read_subbuffer_mmap
245 *
246 * mmap the ring buffer, read it and write the data to the tracefile.
247 * Returns the number of bytes written
248 */
249static int on_read_subbuffer_mmap(struct ltt_kconsumerd_fd *kconsumerd_fd,
250 unsigned long len)
251{
252 unsigned long mmap_len;
253 unsigned long mmap_offset;
254 unsigned long padded_len;
255 unsigned long padding_len;
256 char *mmap_base;
257 char *padding = NULL;
258 long ret = 0;
259 off_t orig_offset = kconsumerd_fd->out_fd_offset;
260 int fd = kconsumerd_fd->consumerd_fd;
261 int outfd = kconsumerd_fd->out_fd;
262
263 /* get the padded subbuffer size to know the padding required */
264 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
265 if (ret != 0) {
266 ret = errno;
267 perror("kernctl_get_padded_subbuf_size");
268 goto end;
269 }
270 padding_len = padded_len - len;
271 padding = malloc(padding_len * sizeof(char));
272 memset(padding, '\0', padding_len);
273
274 /* get the len of the mmap region */
275 ret = kernctl_get_mmap_len(fd, &mmap_len);
276 if (ret != 0) {
277 ret = errno;
278 perror("kernctl_get_mmap_len");
279 goto end;
280 }
281
282 /* get the offset inside the fd to mmap */
283 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
284 if (ret != 0) {
285 ret = errno;
286 perror("kernctl_get_mmap_read_offset");
287 goto end;
288 }
289
290 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
291 if (mmap_base == MAP_FAILED) {
292 perror("Error mmaping");
293 ret = -1;
294 goto end;
295 }
296
297 while (len > 0) {
298 ret = write(outfd, mmap_base, len);
299 if (ret >= len) {
300 len = 0;
301 } else if (ret < 0) {
302 ret = errno;
303 perror("Error in file write");
304 goto end;
305 }
306 /* This won't block, but will start writeout asynchronously */
307 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
308 SYNC_FILE_RANGE_WRITE);
309 kconsumerd_fd->out_fd_offset += ret;
310 }
311
312 /* once all the data is written, write the padding to disk */
313 ret = write(outfd, padding, padding_len);
314 if (ret < 0) {
315 ret = errno;
316 perror("Error writing padding to file");
317 goto end;
318 }
319
320 /*
321 * This does a blocking write-and-wait on any page that belongs to the
322 * subbuffer prior to the one we just wrote.
323 * Don't care about error values, as these are just hints and ways to
324 * limit the amount of page cache used.
325 */
326 if (orig_offset >= kconsumerd_fd->max_sb_size) {
327 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
328 kconsumerd_fd->max_sb_size,
329 SYNC_FILE_RANGE_WAIT_BEFORE
330 | SYNC_FILE_RANGE_WRITE
331 | SYNC_FILE_RANGE_WAIT_AFTER);
332 /*
333 * Give hints to the kernel about how we access the file:
334 * POSIX_FADV_DONTNEED : we won't re-access data in a near
335 * future after we write it.
336 * We need to call fadvise again after the file grows because
337 * the kernel does not seem to apply fadvise to non-existing
338 * parts of the file.
339 * Call fadvise _after_ having waited for the page writeback to
340 * complete because the dirty page writeback semantic is not
341 * well defined. So it can be expected to lead to lower
342 * throughput in streaming.
343 */
344 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
345 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
346 }
347 goto end;
348
349end:
350 if (padding != NULL) {
351 free(padding);
352 }
353 return ret;
354}
355
d4a1283e
JD
356/*
357 * on_read_subbuffer
358 *
359 * Splice the data from the ring buffer to the tracefile.
360 * Returns the number of bytes spliced
361 */
362static int on_read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd,
363 unsigned long len)
364{
365 long ret = 0;
366 loff_t offset = 0;
367 off_t orig_offset = kconsumerd_fd->out_fd_offset;
368 int fd = kconsumerd_fd->consumerd_fd;
369 int outfd = kconsumerd_fd->out_fd;
370
371 while (len > 0) {
372 DBG("splice chan to pipe offset %lu (fd : %d)",
373 (unsigned long)offset, fd);
374 ret = splice(fd, &offset, thread_pipe[1], NULL, len,
375 SPLICE_F_MOVE | SPLICE_F_MORE);
376 DBG("splice chan to pipe ret %ld", ret);
377 if (ret < 0) {
0632499a 378 ret = errno;
d4a1283e 379 perror("Error in relay splice");
0632499a 380 goto splice_error;
d4a1283e
JD
381 }
382
383 ret = splice(thread_pipe[0], NULL, outfd, NULL, ret,
384 SPLICE_F_MOVE | SPLICE_F_MORE);
385 DBG("splice pipe to file %ld", ret);
386 if (ret < 0) {
0632499a 387 ret = errno;
d4a1283e 388 perror("Error in file splice");
0632499a 389 goto splice_error;
d4a1283e
JD
390 }
391 if (ret >= len) {
392 len = 0;
393 }
394 /* This won't block, but will start writeout asynchronously */
395 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
396 SYNC_FILE_RANGE_WRITE);
397 kconsumerd_fd->out_fd_offset += ret;
398 }
0632499a 399
d4a1283e
JD
400 /*
401 * This does a blocking write-and-wait on any page that belongs to the
402 * subbuffer prior to the one we just wrote.
403 * Don't care about error values, as these are just hints and ways to
404 * limit the amount of page cache used.
405 */
406 if (orig_offset >= kconsumerd_fd->max_sb_size) {
407 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
408 kconsumerd_fd->max_sb_size,
409 SYNC_FILE_RANGE_WAIT_BEFORE
410 | SYNC_FILE_RANGE_WRITE
411 | SYNC_FILE_RANGE_WAIT_AFTER);
412 /*
413 * Give hints to the kernel about how we access the file:
414 * POSIX_FADV_DONTNEED : we won't re-access data in a near
415 * future after we write it.
416 * We need to call fadvise again after the file grows because
417 * the kernel does not seem to apply fadvise to non-existing
418 * parts of the file.
419 * Call fadvise _after_ having waited for the page writeback to
420 * complete because the dirty page writeback semantic is not
421 * well defined. So it can be expected to lead to lower
422 * throughput in streaming.
423 */
424 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
425 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
426 }
0632499a
JD
427 goto end;
428
429splice_error:
430 /* send the appropriate error description to sessiond */
431 switch(ret) {
914a571b
JD
432 case EBADF:
433 send_error(KCONSUMERD_SPLICE_EBADF);
434 break;
435 case EINVAL:
436 send_error(KCONSUMERD_SPLICE_EINVAL);
437 break;
438 case ENOMEM:
439 send_error(KCONSUMERD_SPLICE_ENOMEM);
440 break;
441 case ESPIPE:
442 send_error(KCONSUMERD_SPLICE_ESPIPE);
443 break;
0632499a
JD
444 }
445
446end:
d4a1283e
JD
447 return ret;
448}
449
450/*
451 * read_subbuffer
452 *
453 * Consume data on a file descriptor and write it on a trace file
454 */
455static int read_subbuffer(struct ltt_kconsumerd_fd *kconsumerd_fd)
456{
457 unsigned long len;
458 int err;
459 long ret = 0;
460 int infd = kconsumerd_fd->consumerd_fd;
461
6aea26bc 462 DBG("In read_subbuffer (infd : %d)", infd);
d4a1283e
JD
463 /* Get the next subbuffer */
464 err = kernctl_get_next_subbuf(infd);
465 if (err != 0) {
466 ret = errno;
467 perror("Reserving sub buffer failed (everything is normal, "
468 "it is due to concurrency)");
469 goto end;
470 }
471
03424a9b
JD
472 if (DEFAULT_CHANNEL_OUTPUT == LTTNG_KERNEL_SPLICE) {
473 /* read the whole subbuffer */
474 err = kernctl_get_padded_subbuf_size(infd, &len);
475 if (err != 0) {
476 ret = errno;
477 perror("Getting sub-buffer len failed.");
478 goto end;
479 }
d4a1283e 480
03424a9b
JD
481 /* splice the subbuffer to the tracefile */
482 ret = on_read_subbuffer(kconsumerd_fd, len);
483 if (ret < 0) {
484 /*
485 * display the error but continue processing to try
486 * to release the subbuffer
487 */
488 ERR("Error splicing to tracefile");
489 }
490 } else if (DEFAULT_CHANNEL_OUTPUT == LTTNG_KERNEL_MMAP) {
491 /* read the used subbuffer size */
492 err = kernctl_get_subbuf_size(infd, &len);
493 if (err != 0) {
494 ret = errno;
495 perror("Getting sub-buffer len failed.");
496 goto end;
497 }
498
499 /* write the subbuffer to the tracefile */
500 ret = on_read_subbuffer_mmap(kconsumerd_fd, len);
501 if (ret < 0) {
502 /*
503 * display the error but continue processing to try
504 * to release the subbuffer
505 */
506 ERR("Error writing to tracefile");
507 }
508 } else {
509 ERR("Unknown output method");
510 ret = -1;
511 goto end;
d4a1283e
JD
512 }
513
514 err = kernctl_put_next_subbuf(infd);
515 if (err != 0) {
516 ret = errno;
517 if (errno == EFAULT) {
518 perror("Error in unreserving sub buffer\n");
519 } else if (errno == EIO) {
520 /* Should never happen with newer LTTng versions */
521 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
522 }
523 goto end;
524 }
525
526end:
527 return ret;
528}
529
530/*
531 * change_fd_state
532 *
533 * Update a fd according to what we just received
534 */
535static void change_fd_state(int sessiond_fd,
5dc18550 536 enum kconsumerd_fd_state state)
d4a1283e
JD
537{
538 struct ltt_kconsumerd_fd *iter;
539 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
540 if (iter->sessiond_fd == sessiond_fd) {
541 iter->state = state;
542 break;
543 }
544 }
545}
546
547/*
548 * consumerd_recv_fd
549 *
550 * Receives an array of file descriptors and the associated
551 * structures describing each fd (path name).
552 * Returns the size of received data
553 */
554static int consumerd_recv_fd(int sfd, int size,
5dc18550 555 enum kconsumerd_command cmd_type)
d4a1283e
JD
556{
557 struct msghdr msg;
558 struct iovec iov[1];
33a2b854 559 int ret = 0, i, tmp2;
d4a1283e
JD
560 struct cmsghdr *cmsg;
561 int nb_fd;
33a2b854
DG
562 char recv_fd[CMSG_SPACE(sizeof(int))];
563 struct lttcomm_kconsumerd_msg lkm;
564
d4a1283e 565 /* the number of fds we are about to receive */
33a2b854 566 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
d4a1283e 567
33a2b854
DG
568 for (i = 0; i < nb_fd; i++) {
569 memset(&msg, 0, sizeof(msg));
d4a1283e 570
33a2b854
DG
571 /* Prepare to receive the structures */
572 iov[0].iov_base = &lkm;
573 iov[0].iov_len = sizeof(lkm);
574 msg.msg_iov = iov;
575 msg.msg_iovlen = 1;
d4a1283e 576
33a2b854
DG
577 msg.msg_control = recv_fd;
578 msg.msg_controllen = sizeof(recv_fd);
d4a1283e 579
33a2b854
DG
580 DBG("Waiting to receive fd");
581 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
582 perror("recvmsg");
583 continue;
584 }
d4a1283e 585
33a2b854
DG
586 if (ret != (size / nb_fd)) {
587 ERR("Received only %d, expected %d", ret, size);
588 send_error(KCONSUMERD_ERROR_RECV_FD);
589 goto end;
590 }
d4a1283e 591
33a2b854
DG
592 cmsg = CMSG_FIRSTHDR(&msg);
593 if (!cmsg) {
594 ERR("Invalid control message header");
595 ret = -1;
596 send_error(KCONSUMERD_ERROR_RECV_FD);
597 goto end;
598 }
d4a1283e 599
33a2b854
DG
600 /* if we received fds */
601 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
d4a1283e 602 switch (cmd_type) {
5dc18550 603 case ADD_STREAM:
33a2b854
DG
604 DBG("add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
605 ret = add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
914a571b
JD
606 if (ret < 0) {
607 send_error(KCONSUMERD_OUTFD_ERROR);
608 goto end;
609 }
610 break;
5dc18550 611 case UPDATE_STREAM:
33a2b854 612 change_fd_state(lkm.fd, lkm.state);
914a571b
JD
613 break;
614 default:
615 break;
d4a1283e 616 }
33a2b854
DG
617 /* flag to tell the polling thread to update its fd array */
618 update_fd_array = 1;
619 /* signal the poll thread */
620 tmp2 = write(poll_pipe[1], "4", 1);
621 } else {
622 ERR("Didn't received any fd");
623 send_error(KCONSUMERD_ERROR_RECV_FD);
624 ret = -1;
625 goto end;
d4a1283e 626 }
d4a1283e
JD
627 }
628
629end:
9d26659a 630 DBG("consumerd_recv_fd thread exiting");
d4a1283e
JD
631 return ret;
632}
633
634/*
635 * thread_receive_fds
636 *
637 * This thread listens on the consumerd socket and
638 * receives the file descriptors from ltt-sessiond
639 */
640static void *thread_receive_fds(void *data)
641{
642 int sock, client_socket, ret;
643 struct lttcomm_kconsumerd_header tmp;
644
645 DBG("Creating command socket %s", command_sock_path);
646 unlink(command_sock_path);
647 client_socket = lttcomm_create_unix_sock(command_sock_path);
648 if (client_socket < 0) {
649 ERR("Cannot create command socket");
9d26659a 650 goto end;
d4a1283e
JD
651 }
652
653 ret = lttcomm_listen_unix_sock(client_socket);
654 if (ret < 0) {
9d26659a 655 goto end;
d4a1283e
JD
656 }
657
658 DBG("Sending ready command to ltt-sessiond");
659 ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
660 if (ret < 0) {
661 ERR("Error sending ready command to ltt-sessiond");
9d26659a 662 goto end;
d4a1283e
JD
663 }
664
7e8c38c6
JD
665 /* Blocking call, waiting for transmission */
666 sock = lttcomm_accept_unix_sock(client_socket);
667 if (sock <= 0) {
4abc0780 668 WARN("On accept");
9d26659a 669 goto end;
7e8c38c6 670 }
d4a1283e 671 while (1) {
d4a1283e
JD
672 /* We first get the number of fd we are about to receive */
673 ret = lttcomm_recv_unix_sock(sock, &tmp,
674 sizeof(struct lttcomm_kconsumerd_header));
6aea26bc 675 if (ret <= 0) {
9d26659a
JD
676 ERR("Communication interrupted on command socket");
677 goto end;
678 }
679 if (tmp.cmd_type == STOP) {
680 DBG("Received STOP command");
9d26659a 681 goto end;
d4a1283e 682 }
9d26659a 683 /* we received a command to add or update fds */
d4a1283e 684 ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
6aea26bc 685 if (ret <= 0) {
bcd8d9db 686 ERR("Receiving the FD, exiting");
9d26659a 687 goto end;
d4a1283e
JD
688 }
689 }
690
9d26659a
JD
691end:
692 DBG("thread_receive_fds exiting");
d13606b9
DG
693 quit = 1;
694 ret = write(poll_pipe[1], "4", 1);
695 if (ret < 0) {
696 perror("poll pipe write");
697 }
d4a1283e
JD
698 return NULL;
699}
700
701/*
702 * update_poll_array
703 *
704 * Allocate the pollfd structure and the local view of the out fds
705 * to avoid doing a lookup in the linked list and concurrency issues
706 * when writing is needed.
707 * Returns the number of fds in the structures
708 */
709static int update_poll_array(struct pollfd **pollfd,
710 struct ltt_kconsumerd_fd **local_kconsumerd_fd)
711{
712 struct ltt_kconsumerd_fd *iter;
713 int i = 0;
714
d4a1283e
JD
715
716 DBG("Updating poll fd array");
717 pthread_mutex_lock(&kconsumerd_lock_fds);
718
719 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
720 DBG("Inside for each");
721 if (iter->state == ACTIVE_FD) {
722 DBG("Active FD %d", iter->consumerd_fd);
1b686c3f
JD
723 (*pollfd)[i].fd = iter->consumerd_fd;
724 (*pollfd)[i].events = POLLIN | POLLPRI;
d4a1283e
JD
725 local_kconsumerd_fd[i] = iter;
726 i++;
d4a1283e
JD
727 }
728 }
252fd492
JD
729 /*
730 * insert the poll_pipe at the end of the array and don't increment i
731 * so nb_fd is the number of real FD
732 */
733 (*pollfd)[i].fd = poll_pipe[0];
734 (*pollfd)[i].events = POLLIN;
735
d4a1283e
JD
736 update_fd_array = 0;
737 pthread_mutex_unlock(&kconsumerd_lock_fds);
738 return i;
739
d4a1283e
JD
740}
741
742/*
743 * thread_poll_fds
744 *
745 * This thread polls the fds in the ltt_fd_list to consume the data
746 * and write it to tracefile if necessary.
747 */
748static void *thread_poll_fds(void *data)
749{
750 int num_rdy, num_hup, high_prio, ret, i;
751 struct pollfd *pollfd = NULL;
752 /* local view of the fds */
6aea26bc 753 struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
d4a1283e
JD
754 /* local view of fds_count */
755 int nb_fd = 0;
252fd492
JD
756 char tmp;
757 int tmp2;
d4a1283e
JD
758
759 ret = pipe(thread_pipe);
760 if (ret < 0) {
761 perror("Error creating pipe");
762 goto end;
763 }
764
6aea26bc
JD
765 local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
766
d4a1283e
JD
767 while (1) {
768 high_prio = 0;
769 num_hup = 0;
770
771 /*
772 * the ltt_fd_list has been updated, we need to update our
773 * local array as well
774 */
914a571b 775 if (update_fd_array == 1) {
4abc0780
JD
776 if (pollfd != NULL) {
777 free(pollfd);
778 pollfd = NULL;
779 }
780 if (local_kconsumerd_fd != NULL) {
781 free(local_kconsumerd_fd);
782 local_kconsumerd_fd = NULL;
783 }
784 /* allocate for all fds + 1 for the poll_pipe */
785 pollfd = malloc((fds_count + 1) * sizeof(struct pollfd));
786 if (pollfd == NULL) {
787 perror("pollfd malloc");
788 goto end;
789 }
790 /* allocate for all fds + 1 for the poll_pipe */
791 local_kconsumerd_fd = malloc((fds_count + 1) * sizeof(struct ltt_kconsumerd_fd));
792 if (local_kconsumerd_fd == NULL) {
793 perror("local_kconsumerd_fd malloc");
794 goto end;
795 }
796
6aea26bc 797 ret = update_poll_array(&pollfd, local_kconsumerd_fd);
d4a1283e
JD
798 if (ret < 0) {
799 ERR("Error in allocating pollfd or local_outfds");
800 send_error(KCONSUMERD_POLL_ERROR);
801 goto end;
802 }
803 nb_fd = ret;
804 }
805
806 /* poll on the array of fds */
252fd492
JD
807 DBG("polling on %d fd", nb_fd + 1);
808 num_rdy = poll(pollfd, nb_fd + 1, -1);
d4a1283e
JD
809 DBG("poll num_rdy : %d", num_rdy);
810 if (num_rdy == -1) {
811 perror("Poll error");
812 send_error(KCONSUMERD_POLL_ERROR);
813 goto end;
814 }
815
d13606b9
DG
816 /* No FDs and quit, cleanup the thread */
817 if (nb_fd == 0 && quit == 1) {
818 goto end;
819 }
820
252fd492
JD
821 /*
822 * if only the poll_pipe triggered poll to return just return to the
823 * beginning of the loop to update the array
824 */
825 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
826 DBG("poll_pipe wake up");
827 tmp2 = read(poll_pipe[0], &tmp, 1);
828 continue;
829 }
830
d4a1283e
JD
831 /* Take care of high priority channels first. */
832 for (i = 0; i < nb_fd; i++) {
833 switch(pollfd[i].revents) {
914a571b
JD
834 case POLLERR:
835 ERR("Error returned in polling fd %d.", pollfd[i].fd);
9d26659a
JD
836 del_fd(local_kconsumerd_fd[i]);
837 update_fd_array = 1;
914a571b 838 num_hup++;
914a571b
JD
839 break;
840 case POLLHUP:
841 ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
9d26659a
JD
842 del_fd(local_kconsumerd_fd[i]);
843 update_fd_array = 1;
914a571b
JD
844 num_hup++;
845 break;
846 case POLLNVAL:
847 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
9d26659a
JD
848 del_fd(local_kconsumerd_fd[i]);
849 update_fd_array = 1;
914a571b
JD
850 num_hup++;
851 break;
852 case POLLPRI:
853 DBG("Urgent read on fd %d", pollfd[i].fd);
854 high_prio = 1;
855 ret = read_subbuffer(local_kconsumerd_fd[i]);
856 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
857 if (ret == EAGAIN) {
858 ret = 0;
859 }
860 break;
d4a1283e
JD
861 }
862 }
863
864 /* If every buffer FD has hung up, we end the read loop here */
865 if (nb_fd > 0 && num_hup == nb_fd) {
866 DBG("every buffer FD has hung up\n");
9d26659a
JD
867 if (quit == 1) {
868 goto end;
869 }
870 continue;
d4a1283e
JD
871 }
872
873 /* Take care of low priority channels. */
914a571b 874 if (high_prio == 0) {
d4a1283e 875 for (i = 0; i < nb_fd; i++) {
914a571b
JD
876 if (pollfd[i].revents == POLLIN) {
877 DBG("Normal read on fd %d", pollfd[i].fd);
878 ret = read_subbuffer(local_kconsumerd_fd[i]);
879 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
880 if (ret == EAGAIN) {
881 ret = 0;
882 }
d4a1283e
JD
883 }
884 }
885 }
886 }
887end:
9d26659a 888 DBG("polling thread exiting");
d4a1283e
JD
889 if (pollfd != NULL) {
890 free(pollfd);
891 pollfd = NULL;
892 }
893 if (local_kconsumerd_fd != NULL) {
894 free(local_kconsumerd_fd);
895 local_kconsumerd_fd = NULL;
896 }
bcd8d9db 897 cleanup();
d4a1283e
JD
898 return NULL;
899}
900
901/*
902 * usage function on stderr
903 */
904static void usage(void)
905{
906 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
907 fprintf(stderr, " -h, --help "
908 "Display this usage.\n");
909 fprintf(stderr, " -c, --kconsumerd-cmd-sock PATH "
910 "Specify path for the command socket\n");
911 fprintf(stderr, " -e, --kconsumerd-err-sock PATH "
912 "Specify path for the error socket\n");
913 fprintf(stderr, " -d, --daemonize "
914 "Start as a daemon.\n");
915 fprintf(stderr, " -q, --quiet "
916 "No output at all.\n");
917 fprintf(stderr, " -v, --verbose "
918 "Verbose mode. Activate DBG() macro.\n");
919 fprintf(stderr, " -V, --version "
920 "Show version number.\n");
921}
922
923/*
924 * daemon argument parsing
925 */
926static void parse_args(int argc, char **argv)
927{
928 int c;
929
930 static struct option long_options[] = {
931 { "kconsumerd-cmd-sock", 1, 0, 'c' },
932 { "kconsumerd-err-sock", 1, 0, 'e' },
933 { "daemonize", 0, 0, 'd' },
934 { "help", 0, 0, 'h' },
935 { "quiet", 0, 0, 'q' },
936 { "verbose", 0, 0, 'v' },
937 { "version", 0, 0, 'V' },
938 { NULL, 0, 0, 0 }
939 };
940
941 while (1) {
942 int option_index = 0;
943 c = getopt_long(argc, argv, "dhqvV" "c:e:", long_options, &option_index);
944 if (c == -1) {
945 break;
946 }
947
948 switch (c) {
914a571b
JD
949 case 0:
950 fprintf(stderr, "option %s", long_options[option_index].name);
951 if (optarg) {
952 fprintf(stderr, " with arg %s\n", optarg);
953 }
954 break;
955 case 'c':
956 snprintf(command_sock_path, PATH_MAX, "%s", optarg);
957 break;
958 case 'e':
959 snprintf(error_sock_path, PATH_MAX, "%s", optarg);
960 break;
961 case 'd':
962 opt_daemon = 1;
963 break;
964 case 'h':
965 usage();
966 exit(EXIT_FAILURE);
967 case 'q':
968 opt_quiet = 1;
969 break;
970 case 'v':
971 opt_verbose = 1;
972 break;
973 case 'V':
974 fprintf(stdout, "%s\n", VERSION);
975 exit(EXIT_SUCCESS);
976 default:
977 usage();
978 exit(EXIT_FAILURE);
d4a1283e
JD
979 }
980 }
981}
982
983
984/*
985 * main
986 */
987int main(int argc, char **argv)
988{
989 int i;
990 int ret = 0;
991 void *status;
992
993 /* Parse arguments */
994 progname = argv[0];
995 parse_args(argc, argv);
996
997 /* Daemonize */
998 if (opt_daemon) {
999 ret = daemon(0, 0);
1000 if (ret < 0) {
1001 perror("daemon");
1002 goto error;
1003 }
1004 }
1005
1006 if (strlen(command_sock_path) == 0) {
1007 snprintf(command_sock_path, PATH_MAX,
1008 KCONSUMERD_CMD_SOCK_PATH);
1009 }
1010 if (strlen(error_sock_path) == 0) {
1011 snprintf(error_sock_path, PATH_MAX,
1012 KCONSUMERD_ERR_SOCK_PATH);
1013 }
1014
1015 if (set_signal_handler() < 0) {
1016 goto error;
1017 }
1018
252fd492
JD
1019 /* create the pipe to wake to polling thread when needed */
1020 ret = pipe(poll_pipe);
1021 if (ret < 0) {
1022 perror("Error creating poll pipe");
1023 goto end;
1024 }
1025
d4a1283e
JD
1026 /* Connect to the socket created by ltt-sessiond to report errors */
1027 DBG("Connecting to error socket %s", error_sock_path);
1028 error_socket = lttcomm_connect_unix_sock(error_sock_path);
1029 /* not a fatal error, but all communication with ltt-sessiond will fail */
1030 if (error_socket < 0) {
1031 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
1032 }
1033
1034 /* Create the thread to manage the receive of fd */
1035 ret = pthread_create(&threads[0], NULL, thread_receive_fds, (void *) NULL);
1036 if (ret != 0) {
1037 perror("pthread_create");
1038 goto error;
1039 }
1040
1041 /* Create thread to manage the polling/writing of traces */
1042 ret = pthread_create(&threads[1], NULL, thread_poll_fds, (void *) NULL);
1043 if (ret != 0) {
1044 perror("pthread_create");
1045 goto error;
1046 }
1047
1048 for (i = 0; i < 2; i++) {
1049 ret = pthread_join(threads[i], &status);
1050 if (ret != 0) {
1051 perror("pthread_join");
1052 goto error;
1053 }
1054 }
1055 ret = EXIT_SUCCESS;
1056 send_error(KCONSUMERD_EXIT_SUCCESS);
1057 goto end;
1058
1059error:
1060 ret = EXIT_FAILURE;
1061 send_error(KCONSUMERD_EXIT_FAILURE);
1062
1063end:
1064 cleanup();
1065
1066 return ret;
1067}
This page took 0.06575 seconds and 4 git commands to generate.