Fix locking dependency problem
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30#include <urcu/list.h>
31
32#include "libkernelctl.h"
33#include "liblttkconsumerd.h"
34#include "lttngerr.h"
35
36static
37struct kconsumerd_global_data {
38 /*
39 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
40 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
41 * ensures the count matches the number of items in the fd_list.
 42 * It also ensures that list updates *always* trigger an fd_array
 43 * update: the list modification and the need_update flag update
 44 * must be atomic with respect to each other, as must the flag
 45 * read, fd array rebuild and flag clear.
46 */
47 pthread_mutex_t lock;
48 /*
 49 * Number of elements in the list below. Protected by
50 * kconsumerd_data.lock.
51 */
52 unsigned int fds_count;
53 /*
54 * List of FDs. Protected by kconsumerd_data.lock.
55 */
56 struct kconsumerd_fd_list fd_list;
57 /*
58 * Flag specifying if the local array of FDs needs update in the
59 * poll function. Protected by kconsumerd_data.lock.
60 */
61 unsigned int need_update;
62} kconsumerd_data = {
63 .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
64};
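/*
 * Summary of the protocol above: every writer (kconsumerd_add_fd,
 * kconsumerd_del_fd, kconsumerd_change_fd_state) takes kconsumerd_data.lock,
 * mutates fd_list/fds_count, sets need_update and unlocks; the poll thread
 * takes the same lock, rebuilds its local pollfd array when need_update is
 * set, clears the flag and unlocks, so a list update can never be missed.
 */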
65
66/* communication with splice */
67static int kconsumerd_thread_pipe[2];
68
69/* pipe to wake the poll thread when necessary */
70static int kconsumerd_poll_pipe[2];
71
72/*
73 * TODO: create a should_quit pipe to let the signal handler wake up the
74 * fd receiver thread. It should be initialized before any signal can be
75 * received by the library.
76 */
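/*
 * One possible shape for that pipe (not implemented here, sketch only):
 * kconsumerd_should_exit() would write() a single byte to the write end
 * from the signal handler (write is async-signal-safe), and
 * kconsumerd_thread_receive_fds would poll() on both the command socket
 * and the read end instead of blocking in accept()/recv().
 */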
77
 78
79/* timeout parameter, to control the polling thread grace period */
80static int kconsumerd_poll_timeout = -1;
81
82/* socket to communicate errors with sessiond */
83static int kconsumerd_error_socket;
84
85/* socket to exchange commands with sessiond */
86static char *kconsumerd_command_sock_path;
87
88/*
 89 * Flag to inform the polling thread to quit when all fds have hung up.
 90 * Updated by kconsumerd_thread_receive_fds when it notices that all
 91 * fds have hung up. Also updated by the signal handler
92 * (kconsumerd_should_exit()). Read by the polling threads.
93 */
94static volatile int kconsumerd_quit = 0;
95
96/*
97 * kconsumerd_set_error_socket
98 *
99 * Set the error socket
100 */
101void kconsumerd_set_error_socket(int sock)
102{
103 kconsumerd_error_socket = sock;
104}
105
106/*
107 * kconsumerd_set_command_socket_path
108 *
109 * Set the command socket path
110 */
111void kconsumerd_set_command_socket_path(char *sock)
112{
113 kconsumerd_command_sock_path = sock;
114}
115
116/*
117 * kconsumerd_find_session_fd
118 *
119 * Find a session fd in the global list.
 120 * The kconsumerd_data.lock must be locked during this call
121 *
122 * Return 1 if found else 0
123 */
124static int kconsumerd_find_session_fd(int fd)
125{
126 struct kconsumerd_fd *iter;
127
 128 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
129 if (iter->sessiond_fd == fd) {
130 DBG("Duplicate session fd %d", fd);
 131 /* no unlock here: the caller holds kconsumerd_data.lock and releases it */
132 return 1;
133 }
134 }
135
136 return 0;
137}
138
139/*
140 * kconsumerd_del_fd
141 *
142 * Remove a fd from the global list protected by a mutex
143 */
144static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
145{
 146 pthread_mutex_lock(&kconsumerd_data.lock);
 147 cds_list_del(&lcf->list);
148 if (kconsumerd_data.fds_count > 0) {
149 kconsumerd_data.fds_count--;
150 if (lcf != NULL) {
151 close(lcf->out_fd);
152 close(lcf->consumerd_fd);
153 free(lcf);
154 lcf = NULL;
155 }
156 }
157 kconsumerd_data.need_update = 1;
158 pthread_mutex_unlock(&kconsumerd_data.lock);
159}
160
161/*
162 * kconsumerd_add_fd
163 *
164 * Add a fd to the global list protected by a mutex
165 */
166static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
167{
 168 int ret;
169 struct kconsumerd_fd *tmp_fd;
170
 171 pthread_mutex_lock(&kconsumerd_data.lock);
 172 /* Check if it already exists */
173 ret = kconsumerd_find_session_fd(buf->fd);
174 if (ret == 1) {
175 goto end;
176 }
177
178 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
179 tmp_fd->sessiond_fd = buf->fd;
180 tmp_fd->consumerd_fd = consumerd_fd;
181 tmp_fd->state = buf->state;
182 tmp_fd->max_sb_size = buf->max_sb_size;
183 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
184
185 /* Opening the tracefile in write mode */
186 ret = open(tmp_fd->path_name,
187 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
188 if (ret < 0) {
189 ERR("Opening %s", tmp_fd->path_name);
190 perror("open");
191 goto end;
192 }
193 tmp_fd->out_fd = ret;
194 tmp_fd->out_fd_offset = 0;
195
196 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
197 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
198
199 cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
200 kconsumerd_data.fds_count++;
201 kconsumerd_data.need_update = 1;
 202end:
 203 pthread_mutex_unlock(&kconsumerd_data.lock);
204 return ret;
205}
206
207/*
208 * kconsumerd_change_fd_state
209 *
210 * Update a fd according to what we just received
211 */
212static void kconsumerd_change_fd_state(int sessiond_fd,
213 enum kconsumerd_fd_state state)
214{
215 struct kconsumerd_fd *iter;
 216
217 pthread_mutex_lock(&kconsumerd_data.lock);
218 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
219 if (iter->sessiond_fd == sessiond_fd) {
220 iter->state = state;
221 break;
222 }
223 }
224 kconsumerd_data.need_update = 1;
225 pthread_mutex_unlock(&kconsumerd_data.lock);
226}
227
228/*
229 * kconsumerd_update_poll_array
230 *
231 * Allocate the pollfd structure and the local view of the out fds
232 * to avoid doing a lookup in the linked list and concurrency issues
233 * when writing is needed.
234 * Returns the number of fds in the structures
 235 * Called with kconsumerd_data.lock held.
236 */
237static int kconsumerd_update_poll_array(struct pollfd **pollfd,
238 struct kconsumerd_fd **local_kconsumerd_fd)
239{
240 struct kconsumerd_fd *iter;
241 int i = 0;
242
243 DBG("Updating poll fd array");
 244
 245 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
246 DBG("Inside for each");
247 if (iter->state == ACTIVE_FD) {
248 DBG("Active FD %d", iter->consumerd_fd);
249 (*pollfd)[i].fd = iter->consumerd_fd;
250 (*pollfd)[i].events = POLLIN | POLLPRI;
251 local_kconsumerd_fd[i] = iter;
252 i++;
253 }
254 }
255
256 /*
257 * insert the kconsumerd_poll_pipe at the end of the array and don't
 258 * increment i so nb_fd is the number of real FDs
259 */
260 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
261 (*pollfd)[i].events = POLLIN;
262 return i;
263}
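/*
 * Note: the value returned above excludes the kconsumerd_poll_pipe entry
 * appended at index i, so callers poll on nb_fd + 1 entries and check
 * pollfd[nb_fd] to detect wake-ups (see kconsumerd_thread_poll_fds).
 */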
264
265
266/*
267 * kconsumerd_on_read_subbuffer_mmap
268 *
269 * mmap the ring buffer, read it and write the data to the tracefile.
270 * Returns the number of bytes written
271 */
272static int kconsumerd_on_read_subbuffer_mmap(
273 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
274{
275 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
276 char *mmap_base;
277 char *padding = NULL;
278 long ret = 0;
279 off_t orig_offset = kconsumerd_fd->out_fd_offset;
280 int fd = kconsumerd_fd->consumerd_fd;
281 int outfd = kconsumerd_fd->out_fd;
282
283 /* get the padded subbuffer size to know the padding required */
284 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
285 if (ret != 0) {
286 ret = errno;
287 perror("kernctl_get_padded_subbuf_size");
288 goto end;
289 }
290 padding_len = padded_len - len;
291 padding = malloc(padding_len * sizeof(char));
292 memset(padding, '\0', padding_len);
293
294 /* get the len of the mmap region */
295 ret = kernctl_get_mmap_len(fd, &mmap_len);
296 if (ret != 0) {
297 ret = errno;
298 perror("kernctl_get_mmap_len");
299 goto end;
300 }
301
302 /* get the offset inside the fd to mmap */
303 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
304 if (ret != 0) {
305 ret = errno;
306 perror("kernctl_get_mmap_read_offset");
307 goto end;
308 }
309
310 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
311 if (mmap_base == MAP_FAILED) {
 312 perror("Error mmapping");
313 ret = -1;
314 goto end;
315 }
316
317 while (len > 0) {
318 ret = write(outfd, mmap_base, len);
 319 if (ret < 0) {
 320 ret = errno;
 321 perror("Error in file write");
 322 goto end;
 323 }
 324 mmap_base += ret;	/* handle a possible partial write */
 325 len -= ret;
326 /* This won't block, but will start writeout asynchronously */
327 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
328 SYNC_FILE_RANGE_WRITE);
329 kconsumerd_fd->out_fd_offset += ret;
330 }
331
332 /* once all the data is written, write the padding to disk */
333 ret = write(outfd, padding, padding_len);
334 if (ret < 0) {
335 ret = errno;
336 perror("Error writing padding to file");
337 goto end;
338 }
339
340 /*
341 * This does a blocking write-and-wait on any page that belongs to the
342 * subbuffer prior to the one we just wrote.
343 * Don't care about error values, as these are just hints and ways to
344 * limit the amount of page cache used.
345 */
346 if (orig_offset >= kconsumerd_fd->max_sb_size) {
347 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
348 kconsumerd_fd->max_sb_size,
349 SYNC_FILE_RANGE_WAIT_BEFORE
350 | SYNC_FILE_RANGE_WRITE
351 | SYNC_FILE_RANGE_WAIT_AFTER);
352
353 /*
354 * Give hints to the kernel about how we access the file:
 355 * POSIX_FADV_DONTNEED : we won't re-access data in the near future after
356 * we write it.
357 *
358 * We need to call fadvise again after the file grows because the
359 * kernel does not seem to apply fadvise to non-existing parts of the
360 * file.
361 *
362 * Call fadvise _after_ having waited for the page writeback to
363 * complete because the dirty page writeback semantic is not well
364 * defined. So it can be expected to lead to lower throughput in
365 * streaming.
366 */
367 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
368 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
369 }
370 goto end;
371
372end:
373 if (padding != NULL) {
374 free(padding);
375 }
376 return ret;
377}
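/*
 * Note: the mapping created in this function is left in place when it
 * returns; unmapping with munmap(mmap_base, mmap_len) after the padding
 * write would bound the number of live mappings (sketch of a possible
 * cleanup, not done here).
 */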
378
379/*
380 * kconsumerd_on_read_subbuffer
381 *
382 * Splice the data from the ring buffer to the tracefile.
383 * Returns the number of bytes spliced
384 */
385static int kconsumerd_on_read_subbuffer(
386 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
387{
388 long ret = 0;
389 loff_t offset = 0;
390 off_t orig_offset = kconsumerd_fd->out_fd_offset;
391 int fd = kconsumerd_fd->consumerd_fd;
392 int outfd = kconsumerd_fd->out_fd;
393
394 while (len > 0) {
395 DBG("splice chan to pipe offset %lu (fd : %d)",
396 (unsigned long)offset, fd);
397 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
398 SPLICE_F_MOVE | SPLICE_F_MORE);
399 DBG("splice chan to pipe ret %ld", ret);
400 if (ret < 0) {
401 ret = errno;
402 perror("Error in relay splice");
403 goto splice_error;
404 }
405
406 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
407 SPLICE_F_MOVE | SPLICE_F_MORE);
408 DBG("splice pipe to file %ld", ret);
409 if (ret < 0) {
410 ret = errno;
411 perror("Error in file splice");
412 goto splice_error;
413 }
 414 /* account for the bytes actually moved to the tracefile */
 415 len -= ret;
 416
417 /* This won't block, but will start writeout asynchronously */
418 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
419 SYNC_FILE_RANGE_WRITE);
420 kconsumerd_fd->out_fd_offset += ret;
421 }
422
423 /*
424 * This does a blocking write-and-wait on any page that belongs to the
425 * subbuffer prior to the one we just wrote.
426 * Don't care about error values, as these are just hints and ways to
427 * limit the amount of page cache used.
428 */
429 if (orig_offset >= kconsumerd_fd->max_sb_size) {
430 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
431 kconsumerd_fd->max_sb_size,
432 SYNC_FILE_RANGE_WAIT_BEFORE
433 | SYNC_FILE_RANGE_WRITE
434 | SYNC_FILE_RANGE_WAIT_AFTER);
435 /*
436 * Give hints to the kernel about how we access the file:
 437 * POSIX_FADV_DONTNEED : we won't re-access data in the near future after
438 * we write it.
439 *
440 * We need to call fadvise again after the file grows because the
441 * kernel does not seem to apply fadvise to non-existing parts of the
442 * file.
443 *
444 * Call fadvise _after_ having waited for the page writeback to
445 * complete because the dirty page writeback semantic is not well
446 * defined. So it can be expected to lead to lower throughput in
447 * streaming.
448 */
449 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
450 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
451 }
452 goto end;
453
454splice_error:
455 /* send the appropriate error description to sessiond */
456 switch(ret) {
457 case EBADF:
458 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
459 break;
460 case EINVAL:
461 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
462 break;
463 case ENOMEM:
464 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
465 break;
466 case ESPIPE:
467 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
468 break;
469 }
470
471end:
472 return ret;
473}
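/*
 * The splice path above moves sub-buffer pages to the tracefile entirely in
 * kernel space, staging them through kconsumerd_thread_pipe, so no copy to
 * user space is made; the mmap path in kconsumerd_on_read_subbuffer_mmap is
 * the alternative used when the channel output is LTTNG_KERNEL_MMAP. Both
 * paths apply the same sync_file_range()/posix_fadvise() write-behind policy
 * to bound page cache usage while streaming.
 */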
474
475/*
476 * kconsumerd_read_subbuffer
477 *
478 * Consume data on a file descriptor and write it on a trace file
479 */
480static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
481{
482 unsigned long len;
483 int err;
484 long ret = 0;
485 int infd = kconsumerd_fd->consumerd_fd;
486
487 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
488 /* Get the next subbuffer */
489 err = kernctl_get_next_subbuf(infd);
490 if (err != 0) {
491 ret = errno;
492 perror("Reserving sub buffer failed (everything is normal, "
493 "it is due to concurrency)");
494 goto end;
495 }
496
497 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
498 case LTTNG_KERNEL_SPLICE:
499 /* read the whole subbuffer */
500 err = kernctl_get_padded_subbuf_size(infd, &len);
501 if (err != 0) {
502 ret = errno;
503 perror("Getting sub-buffer len failed.");
504 goto end;
505 }
506
507 /* splice the subbuffer to the tracefile */
508 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
509 if (ret < 0) {
510 /*
511 * display the error but continue processing to try
512 * to release the subbuffer
513 */
514 ERR("Error splicing to tracefile");
515 }
516 break;
517 case LTTNG_KERNEL_MMAP:
518 /* read the used subbuffer size */
519 err = kernctl_get_subbuf_size(infd, &len);
520 if (err != 0) {
521 ret = errno;
522 perror("Getting sub-buffer len failed.");
523 goto end;
524 }
525 /* write the subbuffer to the tracefile */
526 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
527 if (ret < 0) {
528 /*
529 * display the error but continue processing to try
530 * to release the subbuffer
531 */
532 ERR("Error writing to tracefile");
533 }
534 break;
535 default:
536 ERR("Unknown output method");
537 ret = -1;
538 }
539
540 err = kernctl_put_next_subbuf(infd);
541 if (err != 0) {
542 ret = errno;
543 if (errno == EFAULT) {
544 perror("Error in unreserving sub buffer\n");
545 } else if (errno == EIO) {
546 /* Should never happen with newer LTTng versions */
547 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
548 }
549 goto end;
550 }
551
552end:
553 return ret;
554}
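/*
 * Per-sub-buffer protocol used above: kernctl_get_next_subbuf() reserves the
 * next full sub-buffer (EAGAIN simply means none is available yet), the data
 * is consumed with either the splice or the mmap path depending on
 * DEFAULT_KERNEL_CHANNEL_OUTPUT, and kernctl_put_next_subbuf() releases the
 * sub-buffer back to the tracer even if the write to the tracefile failed.
 */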
555
556/*
557 * kconsumerd_consumerd_recv_fd
558 *
559 * Receives an array of file descriptors and the associated
560 * structures describing each fd (path name).
561 * Returns the size of received data
562 */
563static int kconsumerd_consumerd_recv_fd(int sfd, int size,
564 enum kconsumerd_command cmd_type)
565{
566 struct msghdr msg;
567 struct iovec iov[1];
568 int ret = 0, i, tmp2;
569 struct cmsghdr *cmsg;
570 int nb_fd;
571 char recv_fd[CMSG_SPACE(sizeof(int))];
572 struct lttcomm_kconsumerd_msg lkm;
573
574 /* the number of fds we are about to receive */
575 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
576
577 for (i = 0; i < nb_fd; i++) {
578 memset(&msg, 0, sizeof(msg));
579
580 /* Prepare to receive the structures */
581 iov[0].iov_base = &lkm;
582 iov[0].iov_len = sizeof(lkm);
583 msg.msg_iov = iov;
584 msg.msg_iovlen = 1;
585
586 msg.msg_control = recv_fd;
587 msg.msg_controllen = sizeof(recv_fd);
588
589 DBG("Waiting to receive fd");
590 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
591 perror("recvmsg");
592 continue;
593 }
594
595 if (ret != (size / nb_fd)) {
 596 ERR("Received only %d, expected %d", ret, size / nb_fd);
597 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
598 goto end;
599 }
600
601 cmsg = CMSG_FIRSTHDR(&msg);
602 if (!cmsg) {
603 ERR("Invalid control message header");
604 ret = -1;
605 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
606 goto end;
607 }
608 /* if we received fds */
609 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
610 switch (cmd_type) {
611 case ADD_STREAM:
612 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
613 ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
614 if (ret < 0) {
615 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
616 goto end;
617 }
618 break;
619 case UPDATE_STREAM:
620 kconsumerd_change_fd_state(lkm.fd, lkm.state);
621 break;
622 default:
623 break;
624 }
625 /* signal the poll thread */
626 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
627 } else {
 628 ERR("Didn't receive any fd");
629 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
630 ret = -1;
631 goto end;
632 }
633 }
634
635end:
636 return ret;
637}
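/*
 * For reference, the sending side (in ltt-sessiond, outside this file) is
 * expected to pass each fd in a matching SCM_RIGHTS control message, along
 * the lines of the sketch below (illustrative only, the actual sessiond
 * code may differ):
 *
 *	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
 *	cmsg->cmsg_level = SOL_SOCKET;
 *	cmsg->cmsg_type = SCM_RIGHTS;
 *	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
 *	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
 *	msg.msg_controllen = cmsg->cmsg_len;
 */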
638
639/*
640 * kconsumerd_thread_poll_fds
641 *
 642 * This thread polls the fds in kconsumerd_data.fd_list to consume the data
 643 * and write it to the tracefile if necessary.
644 */
645void *kconsumerd_thread_poll_fds(void *data)
646{
647 int num_rdy, num_hup, high_prio, ret, i;
648 struct pollfd *pollfd = NULL;
649 /* local view of the fds */
650 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
 651 /* local view of kconsumerd_data.fds_count */
652 int nb_fd = 0;
653 char tmp;
654 int tmp2;
655
656 ret = pipe(kconsumerd_thread_pipe);
657 if (ret < 0) {
658 perror("Error creating pipe");
659 goto end;
660 }
661
662 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
663
664 while (1) {
665 high_prio = 0;
666 num_hup = 0;
667
668 /*
 669 * kconsumerd_data.fd_list has been updated, we need to update our
 670 * local array as well
671 */
672 pthread_mutex_lock(&kconsumerd_data.lock);
673 if (kconsumerd_data.need_update) {
674 if (pollfd != NULL) {
675 free(pollfd);
676 pollfd = NULL;
677 }
678 if (local_kconsumerd_fd != NULL) {
679 free(local_kconsumerd_fd);
680 local_kconsumerd_fd = NULL;
681 }
 682
 683 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
 684 pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
685 if (pollfd == NULL) {
686 perror("pollfd malloc");
 687 pthread_mutex_unlock(&kconsumerd_data.lock);
688 goto end;
689 }
 690
 691 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
 692 local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
693 sizeof(struct kconsumerd_fd));
694 if (local_kconsumerd_fd == NULL) {
695 perror("local_kconsumerd_fd malloc");
 696 pthread_mutex_unlock(&kconsumerd_data.lock);
697 goto end;
698 }
699 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
700 if (ret < 0) {
701 ERR("Error in allocating pollfd or local_outfds");
702 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
 703 pthread_mutex_unlock(&kconsumerd_data.lock);
704 goto end;
705 }
706 nb_fd = ret;
 707 kconsumerd_data.need_update = 0;
 708 }
 709 pthread_mutex_unlock(&kconsumerd_data.lock);
710
711 /* poll on the array of fds */
712 DBG("polling on %d fd", nb_fd + 1);
713 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
714 DBG("poll num_rdy : %d", num_rdy);
715 if (num_rdy == -1) {
716 perror("Poll error");
717 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
718 goto end;
719 } else if (num_rdy == 0) {
720 DBG("Polling thread timed out");
721 goto end;
722 }
723
 724 /* No FDs left and kconsumerd_quit is set: end the thread */
725 if (nb_fd == 0 && kconsumerd_quit == 1) {
726 goto end;
727 }
728
729 /*
 730 * If the kconsumerd_poll_pipe triggered the poll, go
 731 * directly to the beginning of the loop to update the
 732 * array. We want to prioritize array updates over
 733 * low-priority reads.
 734 */
 735 if (pollfd[nb_fd].revents == POLLIN) {
736 DBG("kconsumerd_poll_pipe wake up");
737 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
738 continue;
739 }
740
741 /* Take care of high priority channels first. */
742 for (i = 0; i < nb_fd; i++) {
743 switch(pollfd[i].revents) {
744 case POLLERR:
745 ERR("Error returned in polling fd %d.", pollfd[i].fd);
746 kconsumerd_del_fd(local_kconsumerd_fd[i]);
747 num_hup++;
748 break;
749 case POLLHUP:
750 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
751 kconsumerd_del_fd(local_kconsumerd_fd[i]);
752 num_hup++;
753 break;
754 case POLLNVAL:
755 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
756 kconsumerd_del_fd(local_kconsumerd_fd[i]);
757 num_hup++;
758 break;
759 case POLLPRI:
760 DBG("Urgent read on fd %d", pollfd[i].fd);
761 high_prio = 1;
762 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
763 /* it's ok to have an unavailable sub-buffer */
764 if (ret == EAGAIN) {
765 ret = 0;
766 }
767 break;
768 }
769 }
770
771 /* If every buffer FD has hung up, we end the read loop here */
772 if (nb_fd > 0 && num_hup == nb_fd) {
773 DBG("every buffer FD has hung up\n");
774 if (kconsumerd_quit == 1) {
775 goto end;
776 }
777 continue;
778 }
779
780 /* Take care of low priority channels. */
781 if (high_prio == 0) {
782 for (i = 0; i < nb_fd; i++) {
783 if (pollfd[i].revents == POLLIN) {
784 DBG("Normal read on fd %d", pollfd[i].fd);
785 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
786 /* it's ok to have an unavailable subbuffer */
787 if (ret == EAGAIN) {
788 ret = 0;
789 }
790 }
791 }
792 }
793 }
794end:
795 DBG("polling thread exiting");
796 if (pollfd != NULL) {
797 free(pollfd);
798 pollfd = NULL;
799 }
800 if (local_kconsumerd_fd != NULL) {
801 free(local_kconsumerd_fd);
802 local_kconsumerd_fd = NULL;
803 }
804 return NULL;
805}
806
807/*
808 * kconsumerd_create_poll_pipe
809 *
 810 * create the pipe used to wake the polling thread when needed
811 */
812int kconsumerd_create_poll_pipe()
813{
814 return pipe(kconsumerd_poll_pipe);
815}
816
817/*
818 * kconsumerd_thread_receive_fds
819 *
820 * This thread listens on the consumerd socket and
821 * receives the file descriptors from ltt-sessiond
822 */
823void *kconsumerd_thread_receive_fds(void *data)
824{
825 int sock, client_socket, ret;
826 struct lttcomm_kconsumerd_header tmp;
827
828 DBG("Creating command socket %s", kconsumerd_command_sock_path);
829 unlink(kconsumerd_command_sock_path);
830 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
831 if (client_socket < 0) {
832 ERR("Cannot create command socket");
833 goto end;
834 }
835
836 ret = lttcomm_listen_unix_sock(client_socket);
837 if (ret < 0) {
838 goto end;
839 }
840
841 DBG("Sending ready command to ltt-sessiond");
842 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
843 if (ret < 0) {
844 ERR("Error sending ready command to ltt-sessiond");
845 goto end;
846 }
847
848 /* TODO: poll on socket and "should_quit" fd pipe */
849 /* TODO: change blocking call into non-blocking call */
850 /* Blocking call, waiting for transmission */
851 sock = lttcomm_accept_unix_sock(client_socket);
852 if (sock <= 0) {
853 WARN("On accept");
854 goto end;
855 }
856 while (1) {
857 /* We first get the number of fd we are about to receive */
858 /* TODO: poll on sock and "should_quit" fd pipe */
859 /* TODO: change recv into a non-blocking call */
860 ret = lttcomm_recv_unix_sock(sock, &tmp,
861 sizeof(struct lttcomm_kconsumerd_header));
862 if (ret <= 0) {
863 ERR("Communication interrupted on command socket");
864 goto end;
865 }
866 if (tmp.cmd_type == STOP) {
867 DBG("Received STOP command");
868 goto end;
869 }
870 if (kconsumerd_quit) {
871 DBG("kconsumerd_thread_receive_fds received quit from signal");
872 goto end;
873 }
874 /* we received a command to add or update fds */
875 ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
876 if (ret <= 0) {
 877 ERR("Error receiving the FDs, exiting");
878 goto end;
879 }
880 }
881
882end:
883 DBG("kconsumerd_thread_receive_fds exiting");
884
885 /*
886 * when all fds have hung up, the polling thread
887 * can exit cleanly
888 */
889 kconsumerd_quit = 1;
890
891 /*
892 * 2s of grace period, if no polling events occur during
893 * this period, the polling thread will exit even if there
894 * are still open FDs (should not happen, but safety mechanism).
895 */
896 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
897
898 /* wake up the polling thread */
899 ret = write(kconsumerd_poll_pipe[1], "4", 1);
900 if (ret < 0) {
901 perror("poll pipe write");
902 }
903 return NULL;
904}
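/*
 * Shutdown sequencing: when this thread stops (STOP command, socket error
 * or kconsumerd_quit), it sets kconsumerd_quit, arms the
 * KCONSUMERD_POLL_GRACE_PERIOD timeout and writes to kconsumerd_poll_pipe,
 * so the poll thread wakes up, notices there is nothing left to read (or
 * times out) and exits; kconsumerd_cleanup() can then run with no threads
 * left.
 */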
905
906/*
907 * kconsumerd_cleanup
908 *
909 * Cleanup the daemon's socket on exit
910 */
 911void kconsumerd_cleanup(void)
912{
 913 struct kconsumerd_fd *iter, *tmp;
914
915 /* remove the socket file */
916 unlink(kconsumerd_command_sock_path);
917
918 /*
919 * close all outfd. Called when there are no more threads
920 * running (after joining on the threads), no need to protect
921 * list iteration with mutex.
922 */
 923 cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
924 kconsumerd_del_fd(iter);
925 }
926}
927
928/*
929 * Called from signal handler.
930 */
931void kconsumerd_should_exit(void)
932{
933 kconsumerd_quit = 1;
934 /*
935 * TODO: write into a should_quit pipe to wake up the fd
936 * receiver thread.
937 */
938}
939
940/*
941 * kconsumerd_send_error
942 *
943 * send return code to ltt-sessiond
944 */
945int kconsumerd_send_error(enum lttcomm_return_code cmd)
946{
947 if (kconsumerd_error_socket > 0) {
948 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
949 sizeof(enum lttcomm_sessiond_command));
950 }
951
952 return 0;
953}