New public API for lttng control
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>

#include "libkernelctl.h"
#include "liblttkconsumerd.h"
#include "lttngerr.h"

static
struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It also ensures that a list update *always* triggers an fd_array
	 * update: the list update and the need_update flag update must be
	 * atomic with respect to each other, and so must the flag read,
	 * fd array update and flag clear.
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/*
	 * List of FDs. Protected by kconsumerd_data.lock.
	 */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag specifying if the local array of FDs needs update in the
	 * poll function. Protected by kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
	.need_update = 1,
};
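
/*
 * How updates propagate: writers take kconsumerd_data.lock, modify
 * fd_list/fds_count and raise need_update; the polling thread then rebuilds
 * its private pollfd / local_kconsumerd_fd snapshot from the list (see
 * kconsumerd_thread_poll_fds) and clears the flag, so poll() itself never
 * walks the shared list.
 */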

/* communication with splice */
static int kconsumerd_thread_pipe[2];

/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];

/*
 * TODO: create a should_quit pipe to let the signal handler wake up the
 * fd receiver thread. It should be initialized before any signal can be
 * received by the library.
 */


/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;

/* socket to communicate errors with sessiond */
static int kconsumerd_error_socket;

/* path of the unix socket used to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all fds
 * have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;

/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket
 */
void kconsumerd_set_error_socket(int sock)
{
	kconsumerd_error_socket = sock;
}

/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path
 */
void kconsumerd_set_command_socket_path(char *sock)
{
	kconsumerd_command_sock_path = sock;
}

/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list.
 * The kconsumerd_data.lock must be held by the caller during this call.
 *
 * Return 1 if found else 0
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			/*
			 * Do not unlock kconsumerd_data.lock here: the caller
			 * holds it and releases it on its own exit path.
			 */
			return 1;
		}
	}

	return 0;
}

/*
 * kconsumerd_del_fd
 *
 * Remove a fd from the global list protected by a mutex
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
		if (lcf != NULL) {
			close(lcf->out_fd);
			close(lcf->consumerd_fd);
			free(lcf);
			lcf = NULL;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_add_fd
 *
 * Add a fd to the global list protected by a mutex.
 * Returns the tracefile fd on success, 1 if the fd was already in the list,
 * or a negative value on error.
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if the fd already exists */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	if (tmp_fd == NULL) {
		perror("malloc struct kconsumerd_fd");
		ret = -1;
		goto end;
	}
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);

	/* Opening the tracefile in write mode */
	ret = open(tmp_fd->path_name,
			O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
	if (ret < 0) {
		ERR("Opening %s", tmp_fd->path_name);
		perror("open");
		free(tmp_fd);
		goto end;
	}
	tmp_fd->out_fd = ret;
	tmp_fd->out_fd_offset = 0;

	DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
			tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;
end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}

/*
 * kconsumerd_change_fd_state
 *
 * Update a fd according to what we just received
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and concurrency issues
 * when writing is needed.
 * Returns the number of fds in the structures
 * Called with kconsumerd_data.lock held.
 */
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
		struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		DBG("Inside for each");
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD
	 */
	(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}


/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written
 */
static int kconsumerd_on_read_subbuffer_mmap(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	unsigned long write_pos = 0;
	char *mmap_base = NULL;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmaping");
		ret = -1;
		goto end;
	}

	while (len > 0) {
		ret = write(outfd, mmap_base + write_pos, len);
		if (ret < 0) {
			ret = errno;
			perror("Error in file write");
			goto end;
		}
		/* handle partial writes by advancing within the mapping */
		write_pos += ret;
		len -= ret;
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		ret = errno;
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

end:
	/* unmap the subbuffer now that its content has been copied out */
	if (mmap_base != NULL && mmap_base != MAP_FAILED) {
		if (munmap(mmap_base, mmap_len) < 0) {
			perror("Error unmapping the buffer");
		}
	}
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}

/*
 * kconsumerd_on_read_subbuffer
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced
 */
static int kconsumerd_on_read_subbuffer(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

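	/*
	 * splice(2) requires that one end of the transfer be a pipe, so the
	 * data is moved in two steps, entirely in kernel space: from the
	 * channel fd into kconsumerd_thread_pipe, then from that pipe into
	 * the tracefile fd.
	 */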
	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in file splice");
			goto splice_error;
		}
		/* account for partial splices so only the remainder is re-requested */
		len -= ret;
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);
		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch(ret) {
	case EBADF:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	return ret;
}

/*
 * kconsumerd_read_subbuffer
 *
 * Consume data on a file descriptor and write it on a trace file
 */
static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
{
	unsigned long len;
	int err;
	long ret = 0;
	int infd = kconsumerd_fd->consumerd_fd;

	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
	/* Get the next subbuffer */
	err = kernctl_get_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		perror("Reserving sub buffer failed (everything is normal, "
				"it is due to concurrency)");
		goto end;
	}

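	/*
	 * DEFAULT_KERNEL_CHANNEL_OUTPUT is a compile-time constant, so a
	 * given kconsumerd build consumes every channel through a single
	 * path; the switch below picks the splice or mmap read accordingly.
	 */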
	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
	case LTTNG_EVENT_SPLICE:
		/* read the whole subbuffer */
		err = kernctl_get_padded_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}

		/* splice the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error splicing to tracefile");
		}
		break;
	case LTTNG_EVENT_MMAP:
		/* read the used subbuffer size */
		err = kernctl_get_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}
		/* write the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error writing to tracefile");
		}
		break;
	default:
		ERR("Unknown output method");
		ret = -1;
	}

	err = kernctl_put_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		if (errno == EFAULT) {
			perror("Error in unreserving sub buffer\n");
		} else if (errno == EIO) {
			/* Should never happen with newer LTTng versions */
			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
		}
		goto end;
	}

end:
	return ret;
}

/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of received data
 */
static int kconsumerd_consumerd_recv_fd(int sfd, int size,
		enum kconsumerd_command cmd_type)
{
	struct msghdr msg;
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

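	/*
	 * Each fd is received with its own recvmsg(): the
	 * struct lttcomm_kconsumerd_msg arrives in the iovec payload while
	 * the file descriptor itself is passed as SCM_RIGHTS ancillary data,
	 * so the loop below performs nb_fd blocking receives.
	 */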
	for (i = 0; i < nb_fd; i++) {
		memset(&msg, 0, sizeof(msg));

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

		DBG("Waiting to receive fd");
		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size / nb_fd);
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}
		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			/*
			 * The fd is the first int of the SCM_RIGHTS payload;
			 * read it as an int, not as a single byte.
			 */
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
						((int *) CMSG_DATA(cmsg))[0]);
				ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
				if (ret < 0) {
					kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}
			/* signal the poll thread */
			tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}

/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the fd_list to consume the data
 * and write it to the tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;

	ret = pipe(kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating pipe");
		goto end;
	}

	/* array of pointers; reallocated below whenever the fd list changes */
	local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd *));

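	/*
	 * Main loop: rebuild the local pollfd / local_kconsumerd_fd snapshot
	 * whenever need_update is set, poll on every active fd plus the
	 * wake-up pipe, service urgent (POLLPRI) buffers first, then the
	 * normal (POLLIN) ones, and drop any fd that reports ERR/HUP/NVAL.
	 */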
	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fd_list has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd *));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs left and kconsumerd_quit is set: clean up and exit the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array update over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch(pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
			continue;
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}
end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}

/*
 * kconsumerd_create_poll_pipe
 *
 * Create the pipe used to wake the polling thread when needed
 */
int kconsumerd_create_poll_pipe()
{
	return pipe(kconsumerd_poll_pipe);
}
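
/*
 * Note: the embedding daemon is expected to call kconsumerd_create_poll_pipe()
 * before spawning kconsumerd_thread_poll_fds() and
 * kconsumerd_thread_receive_fds(), since both threads rely on this pipe for
 * wake-ups.
 */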

/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and
 * receives the file descriptors from ltt-sessiond
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;

	DBG("Creating command socket %s", kconsumerd_command_sock_path);
	unlink(kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	/* TODO: poll on socket and "should_quit" fd pipe */
	/* TODO: change blocking call into non-blocking call */
	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock <= 0) {
		WARN("On accept");
		goto end;
	}
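
	/*
	 * Command protocol: each message begins with a
	 * struct lttcomm_kconsumerd_header carrying cmd_type and payload_size;
	 * for ADD_STREAM/UPDATE_STREAM the payload (per-fd structs plus the
	 * fds themselves, passed as SCM_RIGHTS) is then read by
	 * kconsumerd_consumerd_recv_fd().
	 */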
	while (1) {
		/* We first read the command header, which gives the payload size */
		/* TODO: poll on sock and "should_quit" fd pipe */
		/* TODO: change recv into a non-blocking call */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}
		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Error receiving the FD, exiting");
			goto end;
		}
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period, if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}

/*
 * kconsumerd_cleanup
 *
 * Cleanup the daemon's socket on exit
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *tmp;

	/* remove the socket file */
	unlink(kconsumerd_command_sock_path);

	/*
	 * Close all outfd. Called when there are no more threads
	 * running (after joining on the threads), no need to protect
	 * list iteration with mutex. Use the safe iterator because
	 * kconsumerd_del_fd() unlinks and frees the current node.
	 */
	cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
		kconsumerd_del_fd(iter);
	}
}

/*
 * Called from signal handler.
 */
void kconsumerd_should_exit(void)
{
	kconsumerd_quit = 1;
	/*
	 * TODO: write into a should_quit pipe to wake up the fd
	 * receiver thread.
	 */
}

/*
 * kconsumerd_send_error
 *
 * send return code to ltt-sessiond
 */
int kconsumerd_send_error(enum lttcomm_return_code cmd)
{
	if (kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
				sizeof(enum lttcomm_return_code));
	}

	return 0;
}
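
/*
 * Minimal usage sketch (illustrative only, not part of the library): an
 * embedding daemon would typically wire the pieces together roughly as
 * follows, with error handling elided and error_sock standing in for a
 * socket already connected to the sessiond error channel:
 *
 *	pthread_t fd_thread, poll_thread;
 *
 *	kconsumerd_set_command_socket_path("/path/to/command.sock");
 *	kconsumerd_set_error_socket(error_sock);
 *	if (kconsumerd_create_poll_pipe() < 0)
 *		return -1;
 *	pthread_create(&fd_thread, NULL, kconsumerd_thread_receive_fds, NULL);
 *	pthread_create(&poll_thread, NULL, kconsumerd_thread_poll_fds, NULL);
 *	...
 *	pthread_join(fd_thread, NULL);
 *	pthread_join(poll_thread, NULL);
 *	kconsumerd_cleanup();
 */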