e4f5fe2e48323fa9ba2a8e7429501f65be03bc3d
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <fcntl.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <urcu/list.h>
31
32 #include "libkernelctl.h"
33 #include "liblttkconsumerd.h"
34 #include "lttngerr.h"
35
36 /* Init the list of FDs */
37 static struct kconsumerd_fd_list kconsumerd_fd_list = {
38 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
39 };
40
41 /* Number of element for the list below. */
42 static unsigned int kconsumerd_fds_count;
43
44 /* If the local array of FDs needs update in the poll function */
45 static unsigned int kconsumerd_update_fd_array = 1;
46
47 /* lock the fd array and structures */
48 static pthread_mutex_t kconsumerd_lock_fds;
49
50 /* communication with splice */
51 static int kconsumerd_thread_pipe[2];
52
53 /* pipe to wake the poll thread when necessary */
54 static int kconsumerd_poll_pipe[2];
55
56 /* timeout parameter, to control the polling thread grace period */
57 static int kconsumerd_poll_timeout = -1;
58
59 /* socket to communicate errors with sessiond */
60 static int kconsumerd_error_socket;
61
62 /* socket to exchange commands with sessiond */
63 static char *kconsumerd_command_sock_path;
64
65 /* flag to inform the polling thread to kconsumerd_quit when all fd hung up */
66 static int kconsumerd_quit = 0;
67
68 /*
69 * kconsumerd_set_error_socket
70 *
71 * Set the error socket
72 */
73 void kconsumerd_set_error_socket(int sock)
74 {
75 kconsumerd_error_socket = sock;
76 }
77
78 /*
79 * kconsumerd_set_command_socket_path
80 *
81 * Set the command socket path
82 */
83 void kconsumerd_set_command_socket_path(char *sock)
84 {
85 kconsumerd_command_sock_path = sock;
86 }
87
88 /*
89 * kconsumerd_find_session_fd
90 *
91 * Find a session fd in the global list.
92 *
93 * Return 1 if found else 0
94 */
95 static int kconsumerd_find_session_fd(int fd)
96 {
97 struct kconsumerd_fd *iter;
98
99 pthread_mutex_lock(&kconsumerd_lock_fds);
100 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
101 if (iter->sessiond_fd == fd) {
102 DBG("Duplicate session fd %d", fd);
103 pthread_mutex_unlock(&kconsumerd_lock_fds);
104 return 1;
105 }
106 }
107 pthread_mutex_unlock(&kconsumerd_lock_fds);
108
109 return 0;
110 }
111
112 /*
113 * kconsumerd_del_fd
114 *
115 * Remove a fd from the global list protected by a mutex
116 */
117 static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
118 {
119 pthread_mutex_lock(&kconsumerd_lock_fds);
120 cds_list_del(&lcf->list);
121 if (kconsumerd_fds_count > 0) {
122 kconsumerd_fds_count--;
123 if (lcf != NULL) {
124 close(lcf->out_fd);
125 close(lcf->consumerd_fd);
126 free(lcf);
127 lcf = NULL;
128 }
129 }
130 pthread_mutex_unlock(&kconsumerd_lock_fds);
131 }
132
133 /*
134 * kconsumerd_add_fd
135 *
136 * Add a fd to the global list protected by a mutex
137 */
138 static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
139 {
140 int ret;
141 struct kconsumerd_fd *tmp_fd;
142
143 /* Check if already exist */
144 ret = kconsumerd_find_session_fd(buf->fd);
145 if (ret == 1) {
146 goto end;
147 }
148
149 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
150 tmp_fd->sessiond_fd = buf->fd;
151 tmp_fd->consumerd_fd = consumerd_fd;
152 tmp_fd->state = buf->state;
153 tmp_fd->max_sb_size = buf->max_sb_size;
154 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
155
156 /* Opening the tracefile in write mode */
157 ret = open(tmp_fd->path_name,
158 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
159 if (ret < 0) {
160 ERR("Opening %s", tmp_fd->path_name);
161 perror("open");
162 goto end;
163 }
164 tmp_fd->out_fd = ret;
165 tmp_fd->out_fd_offset = 0;
166
167 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
168 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
169
170 pthread_mutex_lock(&kconsumerd_lock_fds);
171 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
172 kconsumerd_fds_count++;
173 pthread_mutex_unlock(&kconsumerd_lock_fds);
174
175 end:
176 return ret;
177 }
178
179 /*
180 * kconsumerd_change_fd_state
181 *
182 * Update a fd according to what we just received
183 */
184 static void kconsumerd_change_fd_state(int sessiond_fd,
185 enum kconsumerd_fd_state state)
186 {
187 struct kconsumerd_fd *iter;
188 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
189 if (iter->sessiond_fd == sessiond_fd) {
190 iter->state = state;
191 break;
192 }
193 }
194 }
195
196 /*
197 * kconsumerd_update_poll_array
198 *
199 * Allocate the pollfd structure and the local view of the out fds
200 * to avoid doing a lookup in the linked list and concurrency issues
201 * when writing is needed.
202 * Returns the number of fds in the structures
203 */
204 static int kconsumerd_update_poll_array(struct pollfd **pollfd,
205 struct kconsumerd_fd **local_kconsumerd_fd)
206 {
207 struct kconsumerd_fd *iter;
208 int i = 0;
209
210 DBG("Updating poll fd array");
211 pthread_mutex_lock(&kconsumerd_lock_fds);
212
213 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
214 DBG("Inside for each");
215 if (iter->state == ACTIVE_FD) {
216 DBG("Active FD %d", iter->consumerd_fd);
217 (*pollfd)[i].fd = iter->consumerd_fd;
218 (*pollfd)[i].events = POLLIN | POLLPRI;
219 local_kconsumerd_fd[i] = iter;
220 i++;
221 }
222 }
223
224 /*
225 * insert the kconsumerd_poll_pipe at the end of the array and don't
226 * increment i so nb_fd is the number of real FD
227 */
228 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
229 (*pollfd)[i].events = POLLIN;
230
231 kconsumerd_update_fd_array = 0;
232 pthread_mutex_unlock(&kconsumerd_lock_fds);
233 return i;
234 }
235
236
237 /*
238 * kconsumerd_on_read_subbuffer_mmap
239 *
240 * mmap the ring buffer, read it and write the data to the tracefile.
241 * Returns the number of bytes written
242 */
243 static int kconsumerd_on_read_subbuffer_mmap(
244 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
245 {
246 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
247 char *mmap_base;
248 char *padding = NULL;
249 long ret = 0;
250 off_t orig_offset = kconsumerd_fd->out_fd_offset;
251 int fd = kconsumerd_fd->consumerd_fd;
252 int outfd = kconsumerd_fd->out_fd;
253
254 /* get the padded subbuffer size to know the padding required */
255 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
256 if (ret != 0) {
257 ret = errno;
258 perror("kernctl_get_padded_subbuf_size");
259 goto end;
260 }
261 padding_len = padded_len - len;
262 padding = malloc(padding_len * sizeof(char));
263 memset(padding, '\0', padding_len);
264
265 /* get the len of the mmap region */
266 ret = kernctl_get_mmap_len(fd, &mmap_len);
267 if (ret != 0) {
268 ret = errno;
269 perror("kernctl_get_mmap_len");
270 goto end;
271 }
272
273 /* get the offset inside the fd to mmap */
274 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
275 if (ret != 0) {
276 ret = errno;
277 perror("kernctl_get_mmap_read_offset");
278 goto end;
279 }
280
281 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
282 if (mmap_base == MAP_FAILED) {
283 perror("Error mmaping");
284 ret = -1;
285 goto end;
286 }
287
288 while (len > 0) {
289 ret = write(outfd, mmap_base, len);
290 if (ret >= len) {
291 len = 0;
292 } else if (ret < 0) {
293 ret = errno;
294 perror("Error in file write");
295 goto end;
296 }
297 /* This won't block, but will start writeout asynchronously */
298 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
299 SYNC_FILE_RANGE_WRITE);
300 kconsumerd_fd->out_fd_offset += ret;
301 }
302
303 /* once all the data is written, write the padding to disk */
304 ret = write(outfd, padding, padding_len);
305 if (ret < 0) {
306 ret = errno;
307 perror("Error writing padding to file");
308 goto end;
309 }
310
311 /*
312 * This does a blocking write-and-wait on any page that belongs to the
313 * subbuffer prior to the one we just wrote.
314 * Don't care about error values, as these are just hints and ways to
315 * limit the amount of page cache used.
316 */
317 if (orig_offset >= kconsumerd_fd->max_sb_size) {
318 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
319 kconsumerd_fd->max_sb_size,
320 SYNC_FILE_RANGE_WAIT_BEFORE
321 | SYNC_FILE_RANGE_WRITE
322 | SYNC_FILE_RANGE_WAIT_AFTER);
323
324 /*
325 * Give hints to the kernel about how we access the file:
326 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
327 * we write it.
328 *
329 * We need to call fadvise again after the file grows because the
330 * kernel does not seem to apply fadvise to non-existing parts of the
331 * file.
332 *
333 * Call fadvise _after_ having waited for the page writeback to
334 * complete because the dirty page writeback semantic is not well
335 * defined. So it can be expected to lead to lower throughput in
336 * streaming.
337 */
338 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
339 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
340 }
341 goto end;
342
343 end:
344 if (padding != NULL) {
345 free(padding);
346 }
347 return ret;
348 }
349
350 /*
351 * kconsumerd_on_read_subbuffer
352 *
353 * Splice the data from the ring buffer to the tracefile.
354 * Returns the number of bytes spliced
355 */
356 static int kconsumerd_on_read_subbuffer(
357 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
358 {
359 long ret = 0;
360 loff_t offset = 0;
361 off_t orig_offset = kconsumerd_fd->out_fd_offset;
362 int fd = kconsumerd_fd->consumerd_fd;
363 int outfd = kconsumerd_fd->out_fd;
364
365 while (len > 0) {
366 DBG("splice chan to pipe offset %lu (fd : %d)",
367 (unsigned long)offset, fd);
368 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
369 SPLICE_F_MOVE | SPLICE_F_MORE);
370 DBG("splice chan to pipe ret %ld", ret);
371 if (ret < 0) {
372 ret = errno;
373 perror("Error in relay splice");
374 goto splice_error;
375 }
376
377 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
378 SPLICE_F_MOVE | SPLICE_F_MORE);
379 DBG("splice pipe to file %ld", ret);
380 if (ret < 0) {
381 ret = errno;
382 perror("Error in file splice");
383 goto splice_error;
384 }
385 if (ret >= len) {
386 len = 0;
387 }
388 /* This won't block, but will start writeout asynchronously */
389 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
390 SYNC_FILE_RANGE_WRITE);
391 kconsumerd_fd->out_fd_offset += ret;
392 }
393
394 /*
395 * This does a blocking write-and-wait on any page that belongs to the
396 * subbuffer prior to the one we just wrote.
397 * Don't care about error values, as these are just hints and ways to
398 * limit the amount of page cache used.
399 */
400 if (orig_offset >= kconsumerd_fd->max_sb_size) {
401 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
402 kconsumerd_fd->max_sb_size,
403 SYNC_FILE_RANGE_WAIT_BEFORE
404 | SYNC_FILE_RANGE_WRITE
405 | SYNC_FILE_RANGE_WAIT_AFTER);
406 /*
407 * Give hints to the kernel about how we access the file:
408 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
409 * we write it.
410 *
411 * We need to call fadvise again after the file grows because the
412 * kernel does not seem to apply fadvise to non-existing parts of the
413 * file.
414 *
415 * Call fadvise _after_ having waited for the page writeback to
416 * complete because the dirty page writeback semantic is not well
417 * defined. So it can be expected to lead to lower throughput in
418 * streaming.
419 */
420 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
421 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
422 }
423 goto end;
424
425 splice_error:
426 /* send the appropriate error description to sessiond */
427 switch(ret) {
428 case EBADF:
429 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
430 break;
431 case EINVAL:
432 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
433 break;
434 case ENOMEM:
435 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
436 break;
437 case ESPIPE:
438 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
439 break;
440 }
441
442 end:
443 return ret;
444 }
445
446 /*
447 * kconsumerd_read_subbuffer
448 *
449 * Consume data on a file descriptor and write it on a trace file
450 */
451 static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
452 {
453 unsigned long len;
454 int err;
455 long ret = 0;
456 int infd = kconsumerd_fd->consumerd_fd;
457
458 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
459 /* Get the next subbuffer */
460 err = kernctl_get_next_subbuf(infd);
461 if (err != 0) {
462 ret = errno;
463 perror("Reserving sub buffer failed (everything is normal, "
464 "it is due to concurrency)");
465 goto end;
466 }
467
468 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
469 case LTTNG_KERNEL_SPLICE:
470 /* read the whole subbuffer */
471 err = kernctl_get_padded_subbuf_size(infd, &len);
472 if (err != 0) {
473 ret = errno;
474 perror("Getting sub-buffer len failed.");
475 goto end;
476 }
477
478 /* splice the subbuffer to the tracefile */
479 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
480 if (ret < 0) {
481 /*
482 * display the error but continue processing to try
483 * to release the subbuffer
484 */
485 ERR("Error splicing to tracefile");
486 }
487 break;
488 case LTTNG_KERNEL_MMAP:
489 /* read the used subbuffer size */
490 err = kernctl_get_subbuf_size(infd, &len);
491 if (err != 0) {
492 ret = errno;
493 perror("Getting sub-buffer len failed.");
494 goto end;
495 }
496 /* write the subbuffer to the tracefile */
497 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
498 if (ret < 0) {
499 /*
500 * display the error but continue processing to try
501 * to release the subbuffer
502 */
503 ERR("Error writing to tracefile");
504 }
505 break;
506 default:
507 ERR("Unknown output method");
508 ret = -1;
509 }
510
511 err = kernctl_put_next_subbuf(infd);
512 if (err != 0) {
513 ret = errno;
514 if (errno == EFAULT) {
515 perror("Error in unreserving sub buffer\n");
516 } else if (errno == EIO) {
517 /* Should never happen with newer LTTng versions */
518 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
519 }
520 goto end;
521 }
522
523 end:
524 return ret;
525 }
526
527 /*
528 * kconsumerd_consumerd_recv_fd
529 *
530 * Receives an array of file descriptors and the associated
531 * structures describing each fd (path name).
532 * Returns the size of received data
533 */
534 static int kconsumerd_consumerd_recv_fd(int sfd, int size,
535 enum kconsumerd_command cmd_type)
536 {
537 struct msghdr msg;
538 struct iovec iov[1];
539 int ret = 0, i, tmp2;
540 struct cmsghdr *cmsg;
541 int nb_fd;
542 char recv_fd[CMSG_SPACE(sizeof(int))];
543 struct lttcomm_kconsumerd_msg lkm;
544
545 /* the number of fds we are about to receive */
546 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
547
548 for (i = 0; i < nb_fd; i++) {
549 memset(&msg, 0, sizeof(msg));
550
551 /* Prepare to receive the structures */
552 iov[0].iov_base = &lkm;
553 iov[0].iov_len = sizeof(lkm);
554 msg.msg_iov = iov;
555 msg.msg_iovlen = 1;
556
557 msg.msg_control = recv_fd;
558 msg.msg_controllen = sizeof(recv_fd);
559
560 DBG("Waiting to receive fd");
561 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
562 perror("recvmsg");
563 continue;
564 }
565
566 if (ret != (size / nb_fd)) {
567 ERR("Received only %d, expected %d", ret, size);
568 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
569 goto end;
570 }
571
572 cmsg = CMSG_FIRSTHDR(&msg);
573 if (!cmsg) {
574 ERR("Invalid control message header");
575 ret = -1;
576 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
577 goto end;
578 }
579 /* if we received fds */
580 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
581 switch (cmd_type) {
582 case ADD_STREAM:
583 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
584 ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
585 if (ret < 0) {
586 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
587 goto end;
588 }
589 break;
590 case UPDATE_STREAM:
591 kconsumerd_change_fd_state(lkm.fd, lkm.state);
592 break;
593 default:
594 break;
595 }
596 /* flag to tell the polling thread to update its fd array */
597 kconsumerd_update_fd_array = 1;
598 /* signal the poll thread */
599 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
600 } else {
601 ERR("Didn't received any fd");
602 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
603 ret = -1;
604 goto end;
605 }
606 }
607
608 end:
609 return ret;
610 }
611
612 /*
613 * kconsumerd_thread_poll_fds
614 *
615 * This thread polls the fds in the ltt_fd_list to consume the data
616 * and write it to tracefile if necessary.
617 */
618 void *kconsumerd_thread_poll_fds(void *data)
619 {
620 int num_rdy, num_hup, high_prio, ret, i;
621 struct pollfd *pollfd = NULL;
622 /* local view of the fds */
623 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
624 /* local view of kconsumerd_fds_count */
625 int nb_fd = 0;
626 char tmp;
627 int tmp2;
628
629 ret = pipe(kconsumerd_thread_pipe);
630 if (ret < 0) {
631 perror("Error creating pipe");
632 goto end;
633 }
634
635 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
636
637 while (1) {
638 high_prio = 0;
639 num_hup = 0;
640
641 /*
642 * the ltt_fd_list has been updated, we need to update our
643 * local array as well
644 */
645 if (kconsumerd_update_fd_array == 1) {
646 if (pollfd != NULL) {
647 free(pollfd);
648 pollfd = NULL;
649 }
650 if (local_kconsumerd_fd != NULL) {
651 free(local_kconsumerd_fd);
652 local_kconsumerd_fd = NULL;
653 }
654 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
655 pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
656 if (pollfd == NULL) {
657 perror("pollfd malloc");
658 goto end;
659 }
660 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
661 local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
662 sizeof(struct kconsumerd_fd));
663 if (local_kconsumerd_fd == NULL) {
664 perror("local_kconsumerd_fd malloc");
665 goto end;
666 }
667 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
668 if (ret < 0) {
669 ERR("Error in allocating pollfd or local_outfds");
670 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
671 goto end;
672 }
673 nb_fd = ret;
674 }
675
676 /* poll on the array of fds */
677 DBG("polling on %d fd", nb_fd + 1);
678 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
679 DBG("poll num_rdy : %d", num_rdy);
680 if (num_rdy == -1) {
681 perror("Poll error");
682 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
683 goto end;
684 } else if (num_rdy == 0) {
685 DBG("Polling thread timed out");
686 goto end;
687 }
688
689 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
690 if (nb_fd == 0 && kconsumerd_quit == 1) {
691 goto end;
692 }
693
694 /*
695 * if only the kconsumerd_poll_pipe triggered poll to return just
696 * return to the beginning of the loop to update the array
697 */
698 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
699 DBG("kconsumerd_poll_pipe wake up");
700 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
701 continue;
702 }
703
704 /* Take care of high priority channels first. */
705 for (i = 0; i < nb_fd; i++) {
706 switch(pollfd[i].revents) {
707 case POLLERR:
708 ERR("Error returned in polling fd %d.", pollfd[i].fd);
709 kconsumerd_del_fd(local_kconsumerd_fd[i]);
710 kconsumerd_update_fd_array = 1;
711 num_hup++;
712 break;
713 case POLLHUP:
714 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
715 kconsumerd_del_fd(local_kconsumerd_fd[i]);
716 kconsumerd_update_fd_array = 1;
717 num_hup++;
718 break;
719 case POLLNVAL:
720 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
721 kconsumerd_del_fd(local_kconsumerd_fd[i]);
722 kconsumerd_update_fd_array = 1;
723 num_hup++;
724 break;
725 case POLLPRI:
726 DBG("Urgent read on fd %d", pollfd[i].fd);
727 high_prio = 1;
728 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
729 /* it's ok to have an unavailable sub-buffer */
730 if (ret == EAGAIN) {
731 ret = 0;
732 }
733 break;
734 }
735 }
736
737 /* If every buffer FD has hung up, we end the read loop here */
738 if (nb_fd > 0 && num_hup == nb_fd) {
739 DBG("every buffer FD has hung up\n");
740 if (kconsumerd_quit == 1) {
741 goto end;
742 }
743 continue;
744 }
745
746 /* Take care of low priority channels. */
747 if (high_prio == 0) {
748 for (i = 0; i < nb_fd; i++) {
749 if (pollfd[i].revents == POLLIN) {
750 DBG("Normal read on fd %d", pollfd[i].fd);
751 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
752 /* it's ok to have an unavailable subbuffer */
753 if (ret == EAGAIN) {
754 ret = 0;
755 }
756 }
757 }
758 }
759 }
760 end:
761 DBG("polling thread exiting");
762 if (pollfd != NULL) {
763 free(pollfd);
764 pollfd = NULL;
765 }
766 if (local_kconsumerd_fd != NULL) {
767 free(local_kconsumerd_fd);
768 local_kconsumerd_fd = NULL;
769 }
770 kconsumerd_cleanup();
771 return NULL;
772 }
773
774 /*
775 * kconsumerd_create_poll_pipe
776 *
777 * create the pipe to wake to polling thread when needed
778 */
779 int kconsumerd_create_poll_pipe()
780 {
781 return pipe(kconsumerd_poll_pipe);
782 }
783
784 /*
785 * kconsumerd_thread_receive_fds
786 *
787 * This thread listens on the consumerd socket and
788 * receives the file descriptors from ltt-sessiond
789 */
790 void *kconsumerd_thread_receive_fds(void *data)
791 {
792 int sock, client_socket, ret;
793 struct lttcomm_kconsumerd_header tmp;
794
795 DBG("Creating command socket %s", kconsumerd_command_sock_path);
796 unlink(kconsumerd_command_sock_path);
797 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
798 if (client_socket < 0) {
799 ERR("Cannot create command socket");
800 goto end;
801 }
802
803 ret = lttcomm_listen_unix_sock(client_socket);
804 if (ret < 0) {
805 goto end;
806 }
807
808 DBG("Sending ready command to ltt-sessiond");
809 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
810 if (ret < 0) {
811 ERR("Error sending ready command to ltt-sessiond");
812 goto end;
813 }
814
815 /* Blocking call, waiting for transmission */
816 sock = lttcomm_accept_unix_sock(client_socket);
817 if (sock <= 0) {
818 WARN("On accept");
819 goto end;
820 }
821 while (1) {
822 /* We first get the number of fd we are about to receive */
823 ret = lttcomm_recv_unix_sock(sock, &tmp,
824 sizeof(struct lttcomm_kconsumerd_header));
825 if (ret <= 0) {
826 ERR("Communication interrupted on command socket");
827 goto end;
828 }
829 if (tmp.cmd_type == STOP) {
830 DBG("Received STOP command");
831 goto end;
832 }
833 /* we received a command to add or update fds */
834 ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
835 if (ret <= 0) {
836 ERR("Receiving the FD, exiting");
837 goto end;
838 }
839 }
840
841 end:
842 DBG("kconsumerd_thread_receive_fds exiting");
843
844 /*
845 * when all fds have hung up, the polling thread
846 * can exit cleanly
847 */
848 kconsumerd_quit = 1;
849
850 /*
851 * 2s of grace period, if no polling events occur during
852 * this period, the polling thread will exit even if there
853 * are still open FDs (should not happen, but safety mechanism).
854 */
855 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
856
857 /* wake up the polling thread */
858 ret = write(kconsumerd_poll_pipe[1], "4", 1);
859 if (ret < 0) {
860 perror("poll pipe write");
861 }
862 return NULL;
863 }
864
865 /*
866 * kconsumerd_cleanup
867 *
868 * Cleanup the daemon's socket on exit
869 */
870 void kconsumerd_cleanup()
871 {
872 struct kconsumerd_fd *iter;
873
874 /* remove the socket file */
875 unlink(kconsumerd_command_sock_path);
876
877 /* close all outfd */
878 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
879 kconsumerd_del_fd(iter);
880 }
881 }
882
883 /*
884 * kconsumerd_send_error
885 *
886 * send return code to ltt-sessiond
887 */
888 int kconsumerd_send_error(enum lttcomm_return_code cmd)
889 {
890 if (kconsumerd_error_socket > 0) {
891 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
892 sizeof(enum lttcomm_sessiond_command));
893 }
894
895 return 0;
896 }
This page took 0.081433 seconds and 4 git commands to generate.