Create a library for the kernel consumer
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <fcntl.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <urcu/list.h>
31
32 #include "libkernelctl.h"
33 #include "liblttkconsumerd.h"
34 #include "lttngerr.h"
35
36 /* Init the list of FDs */
37 static struct kconsumerd_fd_list kconsumerd_fd_list = {
38 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
39 };
40
41 /* Number of element for the list below. */
42 static unsigned int kconsumerd_fds_count;
43
44 /* If the local array of FDs needs update in the poll function */
45 static unsigned int kconsumerd_update_fd_array = 1;
46
47 /* lock the fd array and structures */
48 static pthread_mutex_t kconsumerd_lock_fds;
49
50 /* communication with splice */
51 static int kconsumerd_thread_pipe[2];
52
53 /* pipe to wake the poll thread when necessary */
54 static int kconsumerd_poll_pipe[2];
55
56 /* timeout parameter, to control the polling thread grace period */
57 static int kconsumerd_poll_timeout = -1;
58
59 /* socket to communicate errors with sessiond */
60 static int kconsumerd_error_socket;
61
62 /* socket to exchange commands with sessiond */
63 static char *kconsumerd_command_sock_path;
64
65 /* flag to inform the polling thread to kconsumerd_quit when all fd hung up */
66 static int kconsumerd_quit = 0;
67
68 /*
69 * kconsumerd_set_error_socket
70 *
71 * Set the error socket
72 */
73 void kconsumerd_set_error_socket(int sock)
74 {
75 kconsumerd_error_socket = sock;
76 }
77
78 /*
79 * kconsumerd_set_command_socket_path
80 *
81 * Set the command socket path
82 */
83 void kconsumerd_set_command_socket_path(char *sock)
84 {
85 kconsumerd_command_sock_path = sock;
86 }
87
88 /*
89 * kconsumerd_del_fd
90 *
91 * Remove a fd from the global list protected by a mutex
92 */
93 static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
94 {
95 pthread_mutex_lock(&kconsumerd_lock_fds);
96 cds_list_del(&lcf->list);
97 if (kconsumerd_fds_count > 0) {
98 kconsumerd_fds_count--;
99 if (lcf != NULL) {
100 close(lcf->out_fd);
101 close(lcf->consumerd_fd);
102 free(lcf);
103 lcf = NULL;
104 }
105 }
106 pthread_mutex_unlock(&kconsumerd_lock_fds);
107 }
108
109 /*
110 * kconsumerd_add_fd
111 *
112 * Add a fd to the global list protected by a mutex
113 */
114 static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
115 {
116 struct kconsumerd_fd *tmp_fd;
117 int ret;
118
119 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
120 tmp_fd->sessiond_fd = buf->fd;
121 tmp_fd->consumerd_fd = consumerd_fd;
122 tmp_fd->state = buf->state;
123 tmp_fd->max_sb_size = buf->max_sb_size;
124 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
125
126 /* Opening the tracefile in write mode */
127 ret = open(tmp_fd->path_name,
128 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
129 if (ret < 0) {
130 ERR("Opening %s", tmp_fd->path_name);
131 perror("open");
132 goto end;
133 }
134 tmp_fd->out_fd = ret;
135 tmp_fd->out_fd_offset = 0;
136
137 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
138 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
139
140 pthread_mutex_lock(&kconsumerd_lock_fds);
141 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
142 kconsumerd_fds_count++;
143 pthread_mutex_unlock(&kconsumerd_lock_fds);
144
145 end:
146 return ret;
147 }
148
149 /*
150 * kconsumerd_change_fd_state
151 *
152 * Update a fd according to what we just received
153 */
154 static void kconsumerd_change_fd_state(int sessiond_fd,
155 enum kconsumerd_fd_state state)
156 {
157 struct kconsumerd_fd *iter;
158 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
159 if (iter->sessiond_fd == sessiond_fd) {
160 iter->state = state;
161 break;
162 }
163 }
164 }
165
166 /*
167 * kconsumerd_update_poll_array
168 *
169 * Allocate the pollfd structure and the local view of the out fds
170 * to avoid doing a lookup in the linked list and concurrency issues
171 * when writing is needed.
172 * Returns the number of fds in the structures
173 */
174 static int kconsumerd_update_poll_array(struct pollfd **pollfd,
175 struct kconsumerd_fd **local_kconsumerd_fd)
176 {
177 struct kconsumerd_fd *iter;
178 int i = 0;
179
180 DBG("Updating poll fd array");
181 pthread_mutex_lock(&kconsumerd_lock_fds);
182
183 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
184 DBG("Inside for each");
185 if (iter->state == ACTIVE_FD) {
186 DBG("Active FD %d", iter->consumerd_fd);
187 (*pollfd)[i].fd = iter->consumerd_fd;
188 (*pollfd)[i].events = POLLIN | POLLPRI;
189 local_kconsumerd_fd[i] = iter;
190 i++;
191 }
192 }
193
194 /*
195 * insert the kconsumerd_poll_pipe at the end of the array and don't
196 * increment i so nb_fd is the number of real FD
197 */
198 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
199 (*pollfd)[i].events = POLLIN;
200
201 kconsumerd_update_fd_array = 0;
202 pthread_mutex_unlock(&kconsumerd_lock_fds);
203 return i;
204 }
205
206
207 /*
208 * kconsumerd_on_read_subbuffer_mmap
209 *
210 * mmap the ring buffer, read it and write the data to the tracefile.
211 * Returns the number of bytes written
212 */
213 static int kconsumerd_on_read_subbuffer_mmap(
214 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
215 {
216 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
217 char *mmap_base;
218 char *padding = NULL;
219 long ret = 0;
220 off_t orig_offset = kconsumerd_fd->out_fd_offset;
221 int fd = kconsumerd_fd->consumerd_fd;
222 int outfd = kconsumerd_fd->out_fd;
223
224 /* get the padded subbuffer size to know the padding required */
225 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
226 if (ret != 0) {
227 ret = errno;
228 perror("kernctl_get_padded_subbuf_size");
229 goto end;
230 }
231 padding_len = padded_len - len;
232 padding = malloc(padding_len * sizeof(char));
233 memset(padding, '\0', padding_len);
234
235 /* get the len of the mmap region */
236 ret = kernctl_get_mmap_len(fd, &mmap_len);
237 if (ret != 0) {
238 ret = errno;
239 perror("kernctl_get_mmap_len");
240 goto end;
241 }
242
243 /* get the offset inside the fd to mmap */
244 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
245 if (ret != 0) {
246 ret = errno;
247 perror("kernctl_get_mmap_read_offset");
248 goto end;
249 }
250
251 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
252 if (mmap_base == MAP_FAILED) {
253 perror("Error mmaping");
254 ret = -1;
255 goto end;
256 }
257
258 while (len > 0) {
259 ret = write(outfd, mmap_base, len);
260 if (ret >= len) {
261 len = 0;
262 } else if (ret < 0) {
263 ret = errno;
264 perror("Error in file write");
265 goto end;
266 }
267 /* This won't block, but will start writeout asynchronously */
268 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
269 SYNC_FILE_RANGE_WRITE);
270 kconsumerd_fd->out_fd_offset += ret;
271 }
272
273 /* once all the data is written, write the padding to disk */
274 ret = write(outfd, padding, padding_len);
275 if (ret < 0) {
276 ret = errno;
277 perror("Error writing padding to file");
278 goto end;
279 }
280
281 /*
282 * This does a blocking write-and-wait on any page that belongs to the
283 * subbuffer prior to the one we just wrote.
284 * Don't care about error values, as these are just hints and ways to
285 * limit the amount of page cache used.
286 */
287 if (orig_offset >= kconsumerd_fd->max_sb_size) {
288 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
289 kconsumerd_fd->max_sb_size,
290 SYNC_FILE_RANGE_WAIT_BEFORE
291 | SYNC_FILE_RANGE_WRITE
292 | SYNC_FILE_RANGE_WAIT_AFTER);
293
294 /*
295 * Give hints to the kernel about how we access the file:
296 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
297 * we write it.
298 *
299 * We need to call fadvise again after the file grows because the
300 * kernel does not seem to apply fadvise to non-existing parts of the
301 * file.
302 *
303 * Call fadvise _after_ having waited for the page writeback to
304 * complete because the dirty page writeback semantic is not well
305 * defined. So it can be expected to lead to lower throughput in
306 * streaming.
307 */
308 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
309 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
310 }
311 goto end;
312
313 end:
314 if (padding != NULL) {
315 free(padding);
316 }
317 return ret;
318 }
319
320 /*
321 * kconsumerd_on_read_subbuffer
322 *
323 * Splice the data from the ring buffer to the tracefile.
324 * Returns the number of bytes spliced
325 */
326 static int kconsumerd_on_read_subbuffer(
327 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
328 {
329 long ret = 0;
330 loff_t offset = 0;
331 off_t orig_offset = kconsumerd_fd->out_fd_offset;
332 int fd = kconsumerd_fd->consumerd_fd;
333 int outfd = kconsumerd_fd->out_fd;
334
335 while (len > 0) {
336 DBG("splice chan to pipe offset %lu (fd : %d)",
337 (unsigned long)offset, fd);
338 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
339 SPLICE_F_MOVE | SPLICE_F_MORE);
340 DBG("splice chan to pipe ret %ld", ret);
341 if (ret < 0) {
342 ret = errno;
343 perror("Error in relay splice");
344 goto splice_error;
345 }
346
347 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
348 SPLICE_F_MOVE | SPLICE_F_MORE);
349 DBG("splice pipe to file %ld", ret);
350 if (ret < 0) {
351 ret = errno;
352 perror("Error in file splice");
353 goto splice_error;
354 }
355 if (ret >= len) {
356 len = 0;
357 }
358 /* This won't block, but will start writeout asynchronously */
359 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
360 SYNC_FILE_RANGE_WRITE);
361 kconsumerd_fd->out_fd_offset += ret;
362 }
363
364 /*
365 * This does a blocking write-and-wait on any page that belongs to the
366 * subbuffer prior to the one we just wrote.
367 * Don't care about error values, as these are just hints and ways to
368 * limit the amount of page cache used.
369 */
370 if (orig_offset >= kconsumerd_fd->max_sb_size) {
371 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
372 kconsumerd_fd->max_sb_size,
373 SYNC_FILE_RANGE_WAIT_BEFORE
374 | SYNC_FILE_RANGE_WRITE
375 | SYNC_FILE_RANGE_WAIT_AFTER);
376 /*
377 * Give hints to the kernel about how we access the file:
378 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
379 * we write it.
380 *
381 * We need to call fadvise again after the file grows because the
382 * kernel does not seem to apply fadvise to non-existing parts of the
383 * file.
384 *
385 * Call fadvise _after_ having waited for the page writeback to
386 * complete because the dirty page writeback semantic is not well
387 * defined. So it can be expected to lead to lower throughput in
388 * streaming.
389 */
390 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
391 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
392 }
393 goto end;
394
395 splice_error:
396 /* send the appropriate error description to sessiond */
397 switch(ret) {
398 case EBADF:
399 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
400 break;
401 case EINVAL:
402 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
403 break;
404 case ENOMEM:
405 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
406 break;
407 case ESPIPE:
408 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
409 break;
410 }
411
412 end:
413 return ret;
414 }
415
416 /*
417 * kconsumerd_read_subbuffer
418 *
419 * Consume data on a file descriptor and write it on a trace file
420 */
421 static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
422 {
423 unsigned long len;
424 int err;
425 long ret = 0;
426 int infd = kconsumerd_fd->consumerd_fd;
427
428 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
429 /* Get the next subbuffer */
430 err = kernctl_get_next_subbuf(infd);
431 if (err != 0) {
432 ret = errno;
433 perror("Reserving sub buffer failed (everything is normal, "
434 "it is due to concurrency)");
435 goto end;
436 }
437
438 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
439 case LTTNG_KERNEL_SPLICE:
440 /* read the whole subbuffer */
441 err = kernctl_get_padded_subbuf_size(infd, &len);
442 if (err != 0) {
443 ret = errno;
444 perror("Getting sub-buffer len failed.");
445 goto end;
446 }
447
448 /* splice the subbuffer to the tracefile */
449 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
450 if (ret < 0) {
451 /*
452 * display the error but continue processing to try
453 * to release the subbuffer
454 */
455 ERR("Error splicing to tracefile");
456 }
457 break;
458 case LTTNG_KERNEL_MMAP:
459 /* read the used subbuffer size */
460 err = kernctl_get_subbuf_size(infd, &len);
461 if (err != 0) {
462 ret = errno;
463 perror("Getting sub-buffer len failed.");
464 goto end;
465 }
466 /* write the subbuffer to the tracefile */
467 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
468 if (ret < 0) {
469 /*
470 * display the error but continue processing to try
471 * to release the subbuffer
472 */
473 ERR("Error writing to tracefile");
474 }
475 break;
476 default:
477 ERR("Unknown output method");
478 ret = -1;
479 }
480
481 err = kernctl_put_next_subbuf(infd);
482 if (err != 0) {
483 ret = errno;
484 if (errno == EFAULT) {
485 perror("Error in unreserving sub buffer\n");
486 } else if (errno == EIO) {
487 /* Should never happen with newer LTTng versions */
488 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
489 }
490 goto end;
491 }
492
493 end:
494 return ret;
495 }
496
497 /*
498 * kconsumerd_consumerd_recv_fd
499 *
500 * Receives an array of file descriptors and the associated
501 * structures describing each fd (path name).
502 * Returns the size of received data
503 */
504 static int kconsumerd_consumerd_recv_fd(int sfd, int size,
505 enum kconsumerd_command cmd_type)
506 {
507 struct msghdr msg;
508 struct iovec iov[1];
509 int ret = 0, i, tmp2;
510 struct cmsghdr *cmsg;
511 int nb_fd;
512 char recv_fd[CMSG_SPACE(sizeof(int))];
513 struct lttcomm_kconsumerd_msg lkm;
514
515 /* the number of fds we are about to receive */
516 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
517
518 for (i = 0; i < nb_fd; i++) {
519 memset(&msg, 0, sizeof(msg));
520
521 /* Prepare to receive the structures */
522 iov[0].iov_base = &lkm;
523 iov[0].iov_len = sizeof(lkm);
524 msg.msg_iov = iov;
525 msg.msg_iovlen = 1;
526
527 msg.msg_control = recv_fd;
528 msg.msg_controllen = sizeof(recv_fd);
529
530 DBG("Waiting to receive fd");
531 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
532 perror("recvmsg");
533 continue;
534 }
535
536 if (ret != (size / nb_fd)) {
537 ERR("Received only %d, expected %d", ret, size);
538 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
539 goto end;
540 }
541
542 cmsg = CMSG_FIRSTHDR(&msg);
543 if (!cmsg) {
544 ERR("Invalid control message header");
545 ret = -1;
546 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
547 goto end;
548 }
549 /* if we received fds */
550 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
551 switch (cmd_type) {
552 case ADD_STREAM:
553 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
554 ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
555 if (ret < 0) {
556 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
557 goto end;
558 }
559 break;
560 case UPDATE_STREAM:
561 kconsumerd_change_fd_state(lkm.fd, lkm.state);
562 break;
563 default:
564 break;
565 }
566 /* flag to tell the polling thread to update its fd array */
567 kconsumerd_update_fd_array = 1;
568 /* signal the poll thread */
569 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
570 } else {
571 ERR("Didn't received any fd");
572 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
573 ret = -1;
574 goto end;
575 }
576 }
577
578 end:
579 DBG("kconsumerd_consumerd_recv_fd thread exiting");
580 return ret;
581 }
582
583 /*
584 * kconsumerd_thread_poll_fds
585 *
586 * This thread polls the fds in the ltt_fd_list to consume the data
587 * and write it to tracefile if necessary.
588 */
589 void *kconsumerd_thread_poll_fds(void *data)
590 {
591 int num_rdy, num_hup, high_prio, ret, i;
592 struct pollfd *pollfd = NULL;
593 /* local view of the fds */
594 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
595 /* local view of kconsumerd_fds_count */
596 int nb_fd = 0;
597 char tmp;
598 int tmp2;
599
600 ret = pipe(kconsumerd_thread_pipe);
601 if (ret < 0) {
602 perror("Error creating pipe");
603 goto end;
604 }
605
606 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
607
608 while (1) {
609 high_prio = 0;
610 num_hup = 0;
611
612 /*
613 * the ltt_fd_list has been updated, we need to update our
614 * local array as well
615 */
616 if (kconsumerd_update_fd_array == 1) {
617 if (pollfd != NULL) {
618 free(pollfd);
619 pollfd = NULL;
620 }
621 if (local_kconsumerd_fd != NULL) {
622 free(local_kconsumerd_fd);
623 local_kconsumerd_fd = NULL;
624 }
625 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
626 pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
627 if (pollfd == NULL) {
628 perror("pollfd malloc");
629 goto end;
630 }
631 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
632 local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
633 sizeof(struct kconsumerd_fd));
634 if (local_kconsumerd_fd == NULL) {
635 perror("local_kconsumerd_fd malloc");
636 goto end;
637 }
638 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
639 if (ret < 0) {
640 ERR("Error in allocating pollfd or local_outfds");
641 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
642 goto end;
643 }
644 nb_fd = ret;
645 }
646
647 /* poll on the array of fds */
648 DBG("polling on %d fd", nb_fd + 1);
649 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
650 DBG("poll num_rdy : %d", num_rdy);
651 if (num_rdy == -1) {
652 perror("Poll error");
653 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
654 goto end;
655 } else if (num_rdy == 0) {
656 DBG("Polling thread timed out");
657 goto end;
658 }
659
660 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
661 if (nb_fd == 0 && kconsumerd_quit == 1) {
662 goto end;
663 }
664
665 /*
666 * if only the kconsumerd_poll_pipe triggered poll to return just
667 * return to the beginning of the loop to update the array
668 */
669 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
670 DBG("kconsumerd_poll_pipe wake up");
671 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
672 continue;
673 }
674
675 /* Take care of high priority channels first. */
676 for (i = 0; i < nb_fd; i++) {
677 switch(pollfd[i].revents) {
678 case POLLERR:
679 ERR("Error returned in polling fd %d.", pollfd[i].fd);
680 kconsumerd_del_fd(local_kconsumerd_fd[i]);
681 kconsumerd_update_fd_array = 1;
682 num_hup++;
683 break;
684 case POLLHUP:
685 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
686 kconsumerd_del_fd(local_kconsumerd_fd[i]);
687 kconsumerd_update_fd_array = 1;
688 num_hup++;
689 break;
690 case POLLNVAL:
691 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
692 kconsumerd_del_fd(local_kconsumerd_fd[i]);
693 kconsumerd_update_fd_array = 1;
694 num_hup++;
695 break;
696 case POLLPRI:
697 DBG("Urgent read on fd %d", pollfd[i].fd);
698 high_prio = 1;
699 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
700 /* it's ok to have an unavailable sub-buffer */
701 if (ret == EAGAIN) {
702 ret = 0;
703 }
704 break;
705 }
706 }
707
708 /* If every buffer FD has hung up, we end the read loop here */
709 if (nb_fd > 0 && num_hup == nb_fd) {
710 DBG("every buffer FD has hung up\n");
711 if (kconsumerd_quit == 1) {
712 goto end;
713 }
714 continue;
715 }
716
717 /* Take care of low priority channels. */
718 if (high_prio == 0) {
719 for (i = 0; i < nb_fd; i++) {
720 if (pollfd[i].revents == POLLIN) {
721 DBG("Normal read on fd %d", pollfd[i].fd);
722 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
723 /* it's ok to have an unavailable subbuffer */
724 if (ret == EAGAIN) {
725 ret = 0;
726 }
727 }
728 }
729 }
730 }
731 end:
732 DBG("polling thread exiting");
733 if (pollfd != NULL) {
734 free(pollfd);
735 pollfd = NULL;
736 }
737 if (local_kconsumerd_fd != NULL) {
738 free(local_kconsumerd_fd);
739 local_kconsumerd_fd = NULL;
740 }
741 kconsumerd_cleanup();
742 return NULL;
743 }
744
745 /*
746 * kconsumerd_create_poll_pipe
747 *
748 * create the pipe to wake to polling thread when needed
749 */
750 int kconsumerd_create_poll_pipe()
751 {
752 return pipe(kconsumerd_poll_pipe);
753 }
754
755 /*
756 * kconsumerd_thread_receive_fds
757 *
758 * This thread listens on the consumerd socket and
759 * receives the file descriptors from ltt-sessiond
760 */
761 void *kconsumerd_thread_receive_fds(void *data)
762 {
763 int sock, client_socket, ret;
764 struct lttcomm_kconsumerd_header tmp;
765
766 DBG("Creating command socket %s", kconsumerd_command_sock_path);
767 unlink(kconsumerd_command_sock_path);
768 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
769 if (client_socket < 0) {
770 ERR("Cannot create command socket");
771 goto end;
772 }
773
774 ret = lttcomm_listen_unix_sock(client_socket);
775 if (ret < 0) {
776 goto end;
777 }
778
779 DBG("Sending ready command to ltt-sessiond");
780 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
781 if (ret < 0) {
782 ERR("Error sending ready command to ltt-sessiond");
783 goto end;
784 }
785
786 /* Blocking call, waiting for transmission */
787 sock = lttcomm_accept_unix_sock(client_socket);
788 if (sock <= 0) {
789 WARN("On accept");
790 goto end;
791 }
792 while (1) {
793 /* We first get the number of fd we are about to receive */
794 ret = lttcomm_recv_unix_sock(sock, &tmp,
795 sizeof(struct lttcomm_kconsumerd_header));
796 if (ret <= 0) {
797 ERR("Communication interrupted on command socket");
798 goto end;
799 }
800 if (tmp.cmd_type == STOP) {
801 DBG("Received STOP command");
802 goto end;
803 }
804 /* we received a command to add or update fds */
805 ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
806 if (ret <= 0) {
807 ERR("Receiving the FD, exiting");
808 goto end;
809 }
810 }
811
812 end:
813 DBG("kconsumerd_thread_receive_fds exiting");
814
815 /*
816 * when all fds have hung up, the polling thread
817 * can exit cleanly
818 */
819 kconsumerd_quit = 1;
820
821 /*
822 * 2s of grace period, if no polling events occur during
823 * this period, the polling thread will exit even if there
824 * are still open FDs (should not happen, but safety mechanism).
825 */
826 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
827
828 /* wake up the polling thread */
829 ret = write(kconsumerd_poll_pipe[1], "4", 1);
830 if (ret < 0) {
831 perror("poll pipe write");
832 }
833 return NULL;
834 }
835
836 /*
837 * kconsumerd_cleanup
838 *
839 * Cleanup the daemon's socket on exit
840 */
841 void kconsumerd_cleanup()
842 {
843 struct kconsumerd_fd *iter;
844
845 /* remove the socket file */
846 unlink(kconsumerd_command_sock_path);
847
848 /* close all outfd */
849 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
850 kconsumerd_del_fd(iter);
851 }
852 }
853
854 /*
855 * kconsumerd_send_error
856 *
857 * send return code to ltt-sessiond
858 */
859 int kconsumerd_send_error(enum lttcomm_return_code cmd)
860 {
861 if (kconsumerd_error_socket > 0) {
862 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
863 sizeof(enum lttcomm_sessiond_command));
864 }
865
866 return 0;
867 }
This page took 0.052715 seconds and 5 git commands to generate.