Locking fix.
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30#include <urcu/list.h>
31
32#include "libkernelctl.h"
33#include "liblttkconsumerd.h"
34#include "lttngerr.h"
35
36static
37struct kconsumerd_global_data {
38 /*
39 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
40 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
41 * ensures the count matches the number of items in the fd_list.
42 * It ensures the list updates *always* trigger an fd_array
43 * update (therefore need to make list update vs
44 * kconsumerd_data.need_update flag update atomic, and also flag
45 * read, fd array and flag clear atomic).
46 */
47 pthread_mutex_t lock;
48 /*
49 * Number of element for the list below. Protected by
50 * kconsumerd_data.lock.
51 */
52 unsigned int fds_count;
53 /*
54 * List of FDs. Protected by kconsumerd_data.lock.
55 */
56 struct kconsumerd_fd_list fd_list;
57 /*
58 * Flag specifying if the local array of FDs needs update in the
59 * poll function. Protected by kconsumerd_data.lock.
60 */
61 unsigned int need_update;
62} kconsumerd_data = {
63 .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
64};
65
66/* communication with splice */
67static int kconsumerd_thread_pipe[2];
68
69/* pipe to wake the poll thread when necessary */
70static int kconsumerd_poll_pipe[2];
71
72/* timeout parameter, to control the polling thread grace period */
73static int kconsumerd_poll_timeout = -1;
74
75/* socket to communicate errors with sessiond */
76static int kconsumerd_error_socket;
77
78/* socket to exchange commands with sessiond */
79static char *kconsumerd_command_sock_path;
80
81/* flag to inform the polling thread to kconsumerd_quit when all fd hung up */
82static int kconsumerd_quit = 0;
83
84/*
85 * kconsumerd_set_error_socket
86 *
87 * Set the error socket
88 */
89void kconsumerd_set_error_socket(int sock)
90{
91 kconsumerd_error_socket = sock;
92}
93
94/*
95 * kconsumerd_set_command_socket_path
96 *
97 * Set the command socket path
98 */
99void kconsumerd_set_command_socket_path(char *sock)
100{
101 kconsumerd_command_sock_path = sock;
102}
103
104/*
105 * kconsumerd_find_session_fd
106 *
107 * Find a session fd in the global list.
108 *
109 * Return 1 if found else 0
110 */
111static int kconsumerd_find_session_fd(int fd)
112{
113 struct kconsumerd_fd *iter;
114
115 pthread_mutex_lock(&kconsumerd_data.lock);
116 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
117 if (iter->sessiond_fd == fd) {
118 DBG("Duplicate session fd %d", fd);
119 pthread_mutex_unlock(&kconsumerd_data.lock);
120 return 1;
121 }
122 }
123 pthread_mutex_unlock(&kconsumerd_data.lock);
124
125 return 0;
126}
127
128/*
129 * kconsumerd_del_fd
130 *
131 * Remove a fd from the global list protected by a mutex
132 */
133static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
134{
135 pthread_mutex_lock(&kconsumerd_data.lock);
136 cds_list_del(&lcf->list);
137 if (kconsumerd_data.fds_count > 0) {
138 kconsumerd_data.fds_count--;
139 if (lcf != NULL) {
140 close(lcf->out_fd);
141 close(lcf->consumerd_fd);
142 free(lcf);
143 lcf = NULL;
144 }
145 }
146 kconsumerd_data.need_update = 1;
147 pthread_mutex_unlock(&kconsumerd_data.lock);
148}
149
150/*
151 * kconsumerd_add_fd
152 *
153 * Add a fd to the global list protected by a mutex
154 */
155static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
156{
157 int ret;
158 struct kconsumerd_fd *tmp_fd;
159
160 pthread_mutex_lock(&kconsumerd_data.lock);
161 /* Check if already exist */
162 ret = kconsumerd_find_session_fd(buf->fd);
163 if (ret == 1) {
164 goto end;
165 }
166
167 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
168 tmp_fd->sessiond_fd = buf->fd;
169 tmp_fd->consumerd_fd = consumerd_fd;
170 tmp_fd->state = buf->state;
171 tmp_fd->max_sb_size = buf->max_sb_size;
172 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
173
174 /* Opening the tracefile in write mode */
175 ret = open(tmp_fd->path_name,
176 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
177 if (ret < 0) {
178 ERR("Opening %s", tmp_fd->path_name);
179 perror("open");
180 goto end;
181 }
182 tmp_fd->out_fd = ret;
183 tmp_fd->out_fd_offset = 0;
184
185 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
186 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
187
188 cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
189 kconsumerd_data.fds_count++;
190 kconsumerd_data.need_update = 1;
191end:
192 pthread_mutex_unlock(&kconsumerd_data.lock);
193 return ret;
194}
195
196/*
197 * kconsumerd_change_fd_state
198 *
199 * Update a fd according to what we just received
200 */
201static void kconsumerd_change_fd_state(int sessiond_fd,
202 enum kconsumerd_fd_state state)
203{
204 struct kconsumerd_fd *iter;
205
206 pthread_mutex_lock(&kconsumerd_data.lock);
207 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
208 if (iter->sessiond_fd == sessiond_fd) {
209 iter->state = state;
210 break;
211 }
212 }
213 kconsumerd_data.need_update = 1;
214 pthread_mutex_unlock(&kconsumerd_data.lock);
215}
216
217/*
218 * kconsumerd_update_poll_array
219 *
220 * Allocate the pollfd structure and the local view of the out fds
221 * to avoid doing a lookup in the linked list and concurrency issues
222 * when writing is needed.
223 * Returns the number of fds in the structures
224 * Called with kconsumerd_data.lock held.
225 */
226static int kconsumerd_update_poll_array(struct pollfd **pollfd,
227 struct kconsumerd_fd **local_kconsumerd_fd)
228{
229 struct kconsumerd_fd *iter;
230 int i = 0;
231
232 DBG("Updating poll fd array");
233
234 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
235 DBG("Inside for each");
236 if (iter->state == ACTIVE_FD) {
237 DBG("Active FD %d", iter->consumerd_fd);
238 (*pollfd)[i].fd = iter->consumerd_fd;
239 (*pollfd)[i].events = POLLIN | POLLPRI;
240 local_kconsumerd_fd[i] = iter;
241 i++;
242 }
243 }
244
245 /*
246 * insert the kconsumerd_poll_pipe at the end of the array and don't
247 * increment i so nb_fd is the number of real FD
248 */
249 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
250 (*pollfd)[i].events = POLLIN;
251 return i;
252}
253
254
255/*
256 * kconsumerd_on_read_subbuffer_mmap
257 *
258 * mmap the ring buffer, read it and write the data to the tracefile.
259 * Returns the number of bytes written
260 */
261static int kconsumerd_on_read_subbuffer_mmap(
262 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
263{
264 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
265 char *mmap_base;
266 char *padding = NULL;
267 long ret = 0;
268 off_t orig_offset = kconsumerd_fd->out_fd_offset;
269 int fd = kconsumerd_fd->consumerd_fd;
270 int outfd = kconsumerd_fd->out_fd;
271
272 /* get the padded subbuffer size to know the padding required */
273 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
274 if (ret != 0) {
275 ret = errno;
276 perror("kernctl_get_padded_subbuf_size");
277 goto end;
278 }
279 padding_len = padded_len - len;
280 padding = malloc(padding_len * sizeof(char));
281 memset(padding, '\0', padding_len);
282
283 /* get the len of the mmap region */
284 ret = kernctl_get_mmap_len(fd, &mmap_len);
285 if (ret != 0) {
286 ret = errno;
287 perror("kernctl_get_mmap_len");
288 goto end;
289 }
290
291 /* get the offset inside the fd to mmap */
292 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
293 if (ret != 0) {
294 ret = errno;
295 perror("kernctl_get_mmap_read_offset");
296 goto end;
297 }
298
299 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
300 if (mmap_base == MAP_FAILED) {
301 perror("Error mmaping");
302 ret = -1;
303 goto end;
304 }
305
306 while (len > 0) {
307 ret = write(outfd, mmap_base, len);
308 if (ret >= len) {
309 len = 0;
310 } else if (ret < 0) {
311 ret = errno;
312 perror("Error in file write");
313 goto end;
314 }
315 /* This won't block, but will start writeout asynchronously */
316 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
317 SYNC_FILE_RANGE_WRITE);
318 kconsumerd_fd->out_fd_offset += ret;
319 }
320
321 /* once all the data is written, write the padding to disk */
322 ret = write(outfd, padding, padding_len);
323 if (ret < 0) {
324 ret = errno;
325 perror("Error writing padding to file");
326 goto end;
327 }
328
329 /*
330 * This does a blocking write-and-wait on any page that belongs to the
331 * subbuffer prior to the one we just wrote.
332 * Don't care about error values, as these are just hints and ways to
333 * limit the amount of page cache used.
334 */
335 if (orig_offset >= kconsumerd_fd->max_sb_size) {
336 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
337 kconsumerd_fd->max_sb_size,
338 SYNC_FILE_RANGE_WAIT_BEFORE
339 | SYNC_FILE_RANGE_WRITE
340 | SYNC_FILE_RANGE_WAIT_AFTER);
341
342 /*
343 * Give hints to the kernel about how we access the file:
344 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
345 * we write it.
346 *
347 * We need to call fadvise again after the file grows because the
348 * kernel does not seem to apply fadvise to non-existing parts of the
349 * file.
350 *
351 * Call fadvise _after_ having waited for the page writeback to
352 * complete because the dirty page writeback semantic is not well
353 * defined. So it can be expected to lead to lower throughput in
354 * streaming.
355 */
356 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
357 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
358 }
359 goto end;
360
361end:
362 if (padding != NULL) {
363 free(padding);
364 }
365 return ret;
366}
367
368/*
369 * kconsumerd_on_read_subbuffer
370 *
371 * Splice the data from the ring buffer to the tracefile.
372 * Returns the number of bytes spliced
373 */
374static int kconsumerd_on_read_subbuffer(
375 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
376{
377 long ret = 0;
378 loff_t offset = 0;
379 off_t orig_offset = kconsumerd_fd->out_fd_offset;
380 int fd = kconsumerd_fd->consumerd_fd;
381 int outfd = kconsumerd_fd->out_fd;
382
383 while (len > 0) {
384 DBG("splice chan to pipe offset %lu (fd : %d)",
385 (unsigned long)offset, fd);
386 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
387 SPLICE_F_MOVE | SPLICE_F_MORE);
388 DBG("splice chan to pipe ret %ld", ret);
389 if (ret < 0) {
390 ret = errno;
391 perror("Error in relay splice");
392 goto splice_error;
393 }
394
395 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
396 SPLICE_F_MOVE | SPLICE_F_MORE);
397 DBG("splice pipe to file %ld", ret);
398 if (ret < 0) {
399 ret = errno;
400 perror("Error in file splice");
401 goto splice_error;
402 }
403 if (ret >= len) {
404 len = 0;
405 }
406 /* This won't block, but will start writeout asynchronously */
407 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
408 SYNC_FILE_RANGE_WRITE);
409 kconsumerd_fd->out_fd_offset += ret;
410 }
411
412 /*
413 * This does a blocking write-and-wait on any page that belongs to the
414 * subbuffer prior to the one we just wrote.
415 * Don't care about error values, as these are just hints and ways to
416 * limit the amount of page cache used.
417 */
418 if (orig_offset >= kconsumerd_fd->max_sb_size) {
419 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
420 kconsumerd_fd->max_sb_size,
421 SYNC_FILE_RANGE_WAIT_BEFORE
422 | SYNC_FILE_RANGE_WRITE
423 | SYNC_FILE_RANGE_WAIT_AFTER);
424 /*
425 * Give hints to the kernel about how we access the file:
426 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
427 * we write it.
428 *
429 * We need to call fadvise again after the file grows because the
430 * kernel does not seem to apply fadvise to non-existing parts of the
431 * file.
432 *
433 * Call fadvise _after_ having waited for the page writeback to
434 * complete because the dirty page writeback semantic is not well
435 * defined. So it can be expected to lead to lower throughput in
436 * streaming.
437 */
438 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
439 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
440 }
441 goto end;
442
443splice_error:
444 /* send the appropriate error description to sessiond */
445 switch(ret) {
446 case EBADF:
447 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
448 break;
449 case EINVAL:
450 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
451 break;
452 case ENOMEM:
453 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
454 break;
455 case ESPIPE:
456 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
457 break;
458 }
459
460end:
461 return ret;
462}
463
464/*
465 * kconsumerd_read_subbuffer
466 *
467 * Consume data on a file descriptor and write it on a trace file
468 */
469static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
470{
471 unsigned long len;
472 int err;
473 long ret = 0;
474 int infd = kconsumerd_fd->consumerd_fd;
475
476 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
477 /* Get the next subbuffer */
478 err = kernctl_get_next_subbuf(infd);
479 if (err != 0) {
480 ret = errno;
481 perror("Reserving sub buffer failed (everything is normal, "
482 "it is due to concurrency)");
483 goto end;
484 }
485
486 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
487 case LTTNG_KERNEL_SPLICE:
488 /* read the whole subbuffer */
489 err = kernctl_get_padded_subbuf_size(infd, &len);
490 if (err != 0) {
491 ret = errno;
492 perror("Getting sub-buffer len failed.");
493 goto end;
494 }
495
496 /* splice the subbuffer to the tracefile */
497 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
498 if (ret < 0) {
499 /*
500 * display the error but continue processing to try
501 * to release the subbuffer
502 */
503 ERR("Error splicing to tracefile");
504 }
505 break;
506 case LTTNG_KERNEL_MMAP:
507 /* read the used subbuffer size */
508 err = kernctl_get_subbuf_size(infd, &len);
509 if (err != 0) {
510 ret = errno;
511 perror("Getting sub-buffer len failed.");
512 goto end;
513 }
514 /* write the subbuffer to the tracefile */
515 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
516 if (ret < 0) {
517 /*
518 * display the error but continue processing to try
519 * to release the subbuffer
520 */
521 ERR("Error writing to tracefile");
522 }
523 break;
524 default:
525 ERR("Unknown output method");
526 ret = -1;
527 }
528
529 err = kernctl_put_next_subbuf(infd);
530 if (err != 0) {
531 ret = errno;
532 if (errno == EFAULT) {
533 perror("Error in unreserving sub buffer\n");
534 } else if (errno == EIO) {
535 /* Should never happen with newer LTTng versions */
536 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
537 }
538 goto end;
539 }
540
541end:
542 return ret;
543}
544
545/*
546 * kconsumerd_consumerd_recv_fd
547 *
548 * Receives an array of file descriptors and the associated
549 * structures describing each fd (path name).
550 * Returns the size of received data
551 */
552static int kconsumerd_consumerd_recv_fd(int sfd, int size,
553 enum kconsumerd_command cmd_type)
554{
555 struct msghdr msg;
556 struct iovec iov[1];
557 int ret = 0, i, tmp2;
558 struct cmsghdr *cmsg;
559 int nb_fd;
560 char recv_fd[CMSG_SPACE(sizeof(int))];
561 struct lttcomm_kconsumerd_msg lkm;
562
563 /* the number of fds we are about to receive */
564 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
565
566 for (i = 0; i < nb_fd; i++) {
567 memset(&msg, 0, sizeof(msg));
568
569 /* Prepare to receive the structures */
570 iov[0].iov_base = &lkm;
571 iov[0].iov_len = sizeof(lkm);
572 msg.msg_iov = iov;
573 msg.msg_iovlen = 1;
574
575 msg.msg_control = recv_fd;
576 msg.msg_controllen = sizeof(recv_fd);
577
578 DBG("Waiting to receive fd");
579 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
580 perror("recvmsg");
581 continue;
582 }
583
584 if (ret != (size / nb_fd)) {
585 ERR("Received only %d, expected %d", ret, size);
586 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
587 goto end;
588 }
589
590 cmsg = CMSG_FIRSTHDR(&msg);
591 if (!cmsg) {
592 ERR("Invalid control message header");
593 ret = -1;
594 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
595 goto end;
596 }
597 /* if we received fds */
598 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
599 switch (cmd_type) {
600 case ADD_STREAM:
601 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
602 ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
603 if (ret < 0) {
604 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
605 goto end;
606 }
607 break;
608 case UPDATE_STREAM:
609 kconsumerd_change_fd_state(lkm.fd, lkm.state);
610 break;
611 default:
612 break;
613 }
614 /* signal the poll thread */
615 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
616 } else {
617 ERR("Didn't received any fd");
618 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
619 ret = -1;
620 goto end;
621 }
622 }
623
624end:
625 return ret;
626}
627
628/*
629 * kconsumerd_thread_poll_fds
630 *
631 * This thread polls the fds in the ltt_fd_list to consume the data
632 * and write it to tracefile if necessary.
633 */
634void *kconsumerd_thread_poll_fds(void *data)
635{
636 int num_rdy, num_hup, high_prio, ret, i;
637 struct pollfd *pollfd = NULL;
638 /* local view of the fds */
639 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
640 /* local view of kconsumerd_data.fds_count */
641 int nb_fd = 0;
642 char tmp;
643 int tmp2;
644
645 ret = pipe(kconsumerd_thread_pipe);
646 if (ret < 0) {
647 perror("Error creating pipe");
648 goto end;
649 }
650
651 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
652
653 while (1) {
654 high_prio = 0;
655 num_hup = 0;
656
657 /*
658 * the ltt_fd_list has been updated, we need to update our
659 * local array as well
660 */
661 pthread_mutex_lock(&kconsumerd_data.lock);
662 if (kconsumerd_data.need_update) {
663 if (pollfd != NULL) {
664 free(pollfd);
665 pollfd = NULL;
666 }
667 if (local_kconsumerd_fd != NULL) {
668 free(local_kconsumerd_fd);
669 local_kconsumerd_fd = NULL;
670 }
671
672 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
673 pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
674 if (pollfd == NULL) {
675 perror("pollfd malloc");
676 pthread_mutex_unlock(&kconsumerd_data.lock);
677 goto end;
678 }
679
680 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
681 local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
682 sizeof(struct kconsumerd_fd));
683 if (local_kconsumerd_fd == NULL) {
684 perror("local_kconsumerd_fd malloc");
685 pthread_mutex_unlock(&kconsumerd_data.lock);
686 goto end;
687 }
688 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
689 if (ret < 0) {
690 ERR("Error in allocating pollfd or local_outfds");
691 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
692 pthread_mutex_unlock(&kconsumerd_data.lock);
693 goto end;
694 }
695 nb_fd = ret;
696 kconsumerd_data.need_update = 0;
697 }
698 pthread_mutex_unlock(&kconsumerd_data.lock);
699
700 /* poll on the array of fds */
701 DBG("polling on %d fd", nb_fd + 1);
702 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
703 DBG("poll num_rdy : %d", num_rdy);
704 if (num_rdy == -1) {
705 perror("Poll error");
706 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
707 goto end;
708 } else if (num_rdy == 0) {
709 DBG("Polling thread timed out");
710 goto end;
711 }
712
713 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
714 if (nb_fd == 0 && kconsumerd_quit == 1) {
715 goto end;
716 }
717
718 /*
719 * If the kconsumerd_poll_pipe triggered poll go
720 * directly to the beginning of the loop to update the
721 * array. We want to prioritize array update over
722 * low-priority reads.
723 */
724 if (pollfd[nb_fd].revents == POLLIN) {
725 DBG("kconsumerd_poll_pipe wake up");
726 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
727 continue;
728 }
729
730 /* Take care of high priority channels first. */
731 for (i = 0; i < nb_fd; i++) {
732 switch(pollfd[i].revents) {
733 case POLLERR:
734 ERR("Error returned in polling fd %d.", pollfd[i].fd);
735 kconsumerd_del_fd(local_kconsumerd_fd[i]);
736 num_hup++;
737 break;
738 case POLLHUP:
739 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
740 kconsumerd_del_fd(local_kconsumerd_fd[i]);
741 num_hup++;
742 break;
743 case POLLNVAL:
744 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
745 kconsumerd_del_fd(local_kconsumerd_fd[i]);
746 num_hup++;
747 break;
748 case POLLPRI:
749 DBG("Urgent read on fd %d", pollfd[i].fd);
750 high_prio = 1;
751 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
752 /* it's ok to have an unavailable sub-buffer */
753 if (ret == EAGAIN) {
754 ret = 0;
755 }
756 break;
757 }
758 }
759
760 /* If every buffer FD has hung up, we end the read loop here */
761 if (nb_fd > 0 && num_hup == nb_fd) {
762 DBG("every buffer FD has hung up\n");
763 if (kconsumerd_quit == 1) {
764 goto end;
765 }
766 continue;
767 }
768
769 /* Take care of low priority channels. */
770 if (high_prio == 0) {
771 for (i = 0; i < nb_fd; i++) {
772 if (pollfd[i].revents == POLLIN) {
773 DBG("Normal read on fd %d", pollfd[i].fd);
774 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
775 /* it's ok to have an unavailable subbuffer */
776 if (ret == EAGAIN) {
777 ret = 0;
778 }
779 }
780 }
781 }
782 }
783end:
784 DBG("polling thread exiting");
785 if (pollfd != NULL) {
786 free(pollfd);
787 pollfd = NULL;
788 }
789 if (local_kconsumerd_fd != NULL) {
790 free(local_kconsumerd_fd);
791 local_kconsumerd_fd = NULL;
792 }
793 kconsumerd_cleanup();
794 return NULL;
795}
796
797/*
798 * kconsumerd_create_poll_pipe
799 *
800 * create the pipe to wake to polling thread when needed
801 */
802int kconsumerd_create_poll_pipe()
803{
804 return pipe(kconsumerd_poll_pipe);
805}
806
807/*
808 * kconsumerd_thread_receive_fds
809 *
810 * This thread listens on the consumerd socket and
811 * receives the file descriptors from ltt-sessiond
812 */
813void *kconsumerd_thread_receive_fds(void *data)
814{
815 int sock, client_socket, ret;
816 struct lttcomm_kconsumerd_header tmp;
817
818 DBG("Creating command socket %s", kconsumerd_command_sock_path);
819 unlink(kconsumerd_command_sock_path);
820 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
821 if (client_socket < 0) {
822 ERR("Cannot create command socket");
823 goto end;
824 }
825
826 ret = lttcomm_listen_unix_sock(client_socket);
827 if (ret < 0) {
828 goto end;
829 }
830
831 DBG("Sending ready command to ltt-sessiond");
832 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
833 if (ret < 0) {
834 ERR("Error sending ready command to ltt-sessiond");
835 goto end;
836 }
837
838 /* Blocking call, waiting for transmission */
839 sock = lttcomm_accept_unix_sock(client_socket);
840 if (sock <= 0) {
841 WARN("On accept");
842 goto end;
843 }
844 while (1) {
845 /* We first get the number of fd we are about to receive */
846 ret = lttcomm_recv_unix_sock(sock, &tmp,
847 sizeof(struct lttcomm_kconsumerd_header));
848 if (ret <= 0) {
849 ERR("Communication interrupted on command socket");
850 goto end;
851 }
852 if (tmp.cmd_type == STOP) {
853 DBG("Received STOP command");
854 goto end;
855 }
856 /* we received a command to add or update fds */
857 ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
858 if (ret <= 0) {
859 ERR("Receiving the FD, exiting");
860 goto end;
861 }
862 }
863
864end:
865 DBG("kconsumerd_thread_receive_fds exiting");
866
867 /*
868 * when all fds have hung up, the polling thread
869 * can exit cleanly
870 */
871 kconsumerd_quit = 1;
872
873 /*
874 * 2s of grace period, if no polling events occur during
875 * this period, the polling thread will exit even if there
876 * are still open FDs (should not happen, but safety mechanism).
877 */
878 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
879
880 /* wake up the polling thread */
881 ret = write(kconsumerd_poll_pipe[1], "4", 1);
882 if (ret < 0) {
883 perror("poll pipe write");
884 }
885 return NULL;
886}
887
888/*
889 * kconsumerd_cleanup
890 *
891 * Cleanup the daemon's socket on exit
892 */
893void kconsumerd_cleanup()
894{
895 struct kconsumerd_fd *iter;
896
897 /* remove the socket file */
898 unlink(kconsumerd_command_sock_path);
899
900 /* close all outfd */
901 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
902 kconsumerd_del_fd(iter);
903 }
904}
905
906/*
907 * kconsumerd_send_error
908 *
909 * send return code to ltt-sessiond
910 */
911int kconsumerd_send_error(enum lttcomm_return_code cmd)
912{
913 if (kconsumerd_error_socket > 0) {
914 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
915 sizeof(enum lttcomm_sessiond_command));
916 }
917
918 return 0;
919}
This page took 0.02591 seconds and 4 git commands to generate.