Fix mutex unprotected list
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30#include <urcu/list.h>
31
32#include "libkernelctl.h"
33#include "liblttkconsumerd.h"
34#include "lttngerr.h"
35
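/*
 * NOTE: errno, perror() and PATH_MAX are used throughout this file; it is
 * assumed they are pulled in through the project headers above, otherwise
 * <errno.h>, <stdio.h> and <limits.h> would also be needed here.
 */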
36/* Init the list of FDs */
37static struct kconsumerd_fd_list kconsumerd_fd_list = {
38 .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
39};
40
 41/* Number of elements in the fd list above. */
42static unsigned int kconsumerd_fds_count;
43
 44/* Set when the poll thread must rebuild its local array of FDs */
45static unsigned int kconsumerd_update_fd_array = 1;
46
 47/* Mutex protecting the fd list and its element count */
 48static pthread_mutex_t kconsumerd_lock_fds = PTHREAD_MUTEX_INITIALIZER;
49
 50/* intermediate pipe used by the splice() data path */
51static int kconsumerd_thread_pipe[2];
52
53/* pipe to wake the poll thread when necessary */
54static int kconsumerd_poll_pipe[2];
55
56/* timeout parameter, to control the polling thread grace period */
57static int kconsumerd_poll_timeout = -1;
58
59/* socket to communicate errors with sessiond */
60static int kconsumerd_error_socket;
61
 62/* path of the UNIX socket used to exchange commands with sessiond */
63static char *kconsumerd_command_sock_path;
64
 65/* flag telling the polling thread to exit once all fds have hung up */
66static int kconsumerd_quit = 0;
67
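/*
 * Every access to kconsumerd_fd_list, kconsumerd_fds_count and the entries
 * they reference must be serialized with kconsumerd_lock_fds; that is the
 * point of this fix. The sketch below illustrates the expected pattern; the
 * helper is hypothetical and deliberately kept out of the build.
 */
#if 0
static unsigned int example_count_active_fds(void)
{
	struct kconsumerd_fd *iter;
	unsigned int count = 0;

	pthread_mutex_lock(&kconsumerd_lock_fds);
	cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
		if (iter->state == ACTIVE_FD) {
			count++;
		}
	}
	pthread_mutex_unlock(&kconsumerd_lock_fds);

	return count;
}
#endif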
68/*
69 * kconsumerd_set_error_socket
70 *
71 * Set the error socket
72 */
73void kconsumerd_set_error_socket(int sock)
74{
75 kconsumerd_error_socket = sock;
76}
77
78/*
79 * kconsumerd_set_command_socket_path
80 *
81 * Set the command socket path
82 */
83void kconsumerd_set_command_socket_path(char *sock)
84{
85 kconsumerd_command_sock_path = sock;
86}
87
88/*
89 * kconsumerd_find_session_fd
90 *
91 * Find a session fd in the global list.
92 *
93 * Return 1 if found else 0
94 */
95static int kconsumerd_find_session_fd(int fd)
96{
97 struct kconsumerd_fd *iter;
98
99 pthread_mutex_lock(&kconsumerd_lock_fds);
100 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
101 if (iter->sessiond_fd == fd) {
102 DBG("Duplicate session fd %d", fd);
103 pthread_mutex_unlock(&kconsumerd_lock_fds);
104 return 1;
105 }
106 }
107 pthread_mutex_unlock(&kconsumerd_lock_fds);
108
109 return 0;
110}
111
112/*
113 * kconsumerd_del_fd
114 *
115 * Remove a fd from the global list protected by a mutex
116 */
117static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
118{
119 pthread_mutex_lock(&kconsumerd_lock_fds);
120 cds_list_del(&lcf->list);
121 if (kconsumerd_fds_count > 0) {
122 kconsumerd_fds_count--;
123 if (lcf != NULL) {
124 close(lcf->out_fd);
125 close(lcf->consumerd_fd);
126 free(lcf);
127 lcf = NULL;
128 }
129 }
130 pthread_mutex_unlock(&kconsumerd_lock_fds);
131}
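/*
 * NOTE: kconsumerd_del_fd() takes kconsumerd_lock_fds itself, so it must not
 * be called with that (non-recursive) mutex already held; callers such as the
 * poll thread only invoke it outside their locked sections.
 */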
132
133/*
134 * kconsumerd_add_fd
135 *
136 * Add a fd to the global list protected by a mutex
137 */
138static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
139{
 140 int ret;
141 struct kconsumerd_fd *tmp_fd;
142
143 /* Check if already exist */
144 ret = kconsumerd_find_session_fd(buf->fd);
145 if (ret == 1) {
146 goto end;
147 }
148
149 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
150 tmp_fd->sessiond_fd = buf->fd;
151 tmp_fd->consumerd_fd = consumerd_fd;
152 tmp_fd->state = buf->state;
153 tmp_fd->max_sb_size = buf->max_sb_size;
154 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
155
156 /* Opening the tracefile in write mode */
157 ret = open(tmp_fd->path_name,
158 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
159 if (ret < 0) {
160 ERR("Opening %s", tmp_fd->path_name);
161 perror("open");
162 goto end;
163 }
164 tmp_fd->out_fd = ret;
165 tmp_fd->out_fd_offset = 0;
166
167 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
168 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
169
170 pthread_mutex_lock(&kconsumerd_lock_fds);
171 cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
172 kconsumerd_fds_count++;
173 pthread_mutex_unlock(&kconsumerd_lock_fds);
174
175end:
176 return ret;
177}
178
179/*
180 * kconsumerd_change_fd_state
181 *
182 * Update a fd according to what we just received
183 */
184static void kconsumerd_change_fd_state(int sessiond_fd,
185 enum kconsumerd_fd_state state)
186{
187 struct kconsumerd_fd *iter;
188
189 pthread_mutex_lock(&kconsumerd_lock_fds);
190 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
191 if (iter->sessiond_fd == sessiond_fd) {
192 iter->state = state;
193 break;
194 }
195 }
 196 pthread_mutex_unlock(&kconsumerd_lock_fds);
197}
198
199/*
200 * kconsumerd_update_poll_array
201 *
 202 * Allocate the pollfd structure and the local view of the out fds,
 203 * so that writes do not need a linked-list lookup and do not race with
 204 * concurrent list updates. The caller must hold kconsumerd_lock_fds.
 205 * Returns the number of fds in the structures.
206 */
207static int kconsumerd_update_poll_array(struct pollfd **pollfd,
208 struct kconsumerd_fd **local_kconsumerd_fd)
209{
210 struct kconsumerd_fd *iter;
211 int i = 0;
212
213 DBG("Updating poll fd array");
214
215 cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
216 DBG("Inside for each");
217 if (iter->state == ACTIVE_FD) {
218 DBG("Active FD %d", iter->consumerd_fd);
219 (*pollfd)[i].fd = iter->consumerd_fd;
220 (*pollfd)[i].events = POLLIN | POLLPRI;
221 local_kconsumerd_fd[i] = iter;
222 i++;
223 }
224 }
225
226 /*
227 * insert the kconsumerd_poll_pipe at the end of the array and don't
 228 * increment i so that nb_fd stays the number of real FDs
229 */
230 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
231 (*pollfd)[i].events = POLLIN;
232
233 kconsumerd_update_fd_array = 0;
234 return i;
235}
236
237
238/*
239 * kconsumerd_on_read_subbuffer_mmap
240 *
241 * mmap the ring buffer, read it and write the data to the tracefile.
242 * Returns the number of bytes written
243 */
244static int kconsumerd_on_read_subbuffer_mmap(
245 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
246{
247 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
248 char *mmap_base;
249 char *padding = NULL;
250 long ret = 0;
251 off_t orig_offset = kconsumerd_fd->out_fd_offset;
252 int fd = kconsumerd_fd->consumerd_fd;
253 int outfd = kconsumerd_fd->out_fd;
254
255 /* get the padded subbuffer size to know the padding required */
256 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
257 if (ret != 0) {
258 ret = errno;
259 perror("kernctl_get_padded_subbuf_size");
260 goto end;
261 }
262 padding_len = padded_len - len;
263 padding = malloc(padding_len * sizeof(char));
264 memset(padding, '\0', padding_len);
265
266 /* get the len of the mmap region */
267 ret = kernctl_get_mmap_len(fd, &mmap_len);
268 if (ret != 0) {
269 ret = errno;
270 perror("kernctl_get_mmap_len");
271 goto end;
272 }
273
274 /* get the offset inside the fd to mmap */
275 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
276 if (ret != 0) {
277 ret = errno;
278 perror("kernctl_get_mmap_read_offset");
279 goto end;
280 }
281
282 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
283 if (mmap_base == MAP_FAILED) {
284 perror("Error mmaping");
285 ret = -1;
286 goto end;
287 }
288
289 while (len > 0) {
290 ret = write(outfd, mmap_base, len);
291 if (ret >= len) {
292 len = 0;
293 } else if (ret < 0) {
294 ret = errno;
295 perror("Error in file write");
296 goto end;
297 }
298 /* This won't block, but will start writeout asynchronously */
299 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
300 SYNC_FILE_RANGE_WRITE);
301 kconsumerd_fd->out_fd_offset += ret;
302 }
303
304 /* once all the data is written, write the padding to disk */
305 ret = write(outfd, padding, padding_len);
306 if (ret < 0) {
307 ret = errno;
308 perror("Error writing padding to file");
309 goto end;
310 }
311
312 /*
313 * This does a blocking write-and-wait on any page that belongs to the
314 * subbuffer prior to the one we just wrote.
315 * Don't care about error values, as these are just hints and ways to
316 * limit the amount of page cache used.
317 */
318 if (orig_offset >= kconsumerd_fd->max_sb_size) {
319 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
320 kconsumerd_fd->max_sb_size,
321 SYNC_FILE_RANGE_WAIT_BEFORE
322 | SYNC_FILE_RANGE_WRITE
323 | SYNC_FILE_RANGE_WAIT_AFTER);
324
325 /*
326 * Give hints to the kernel about how we access the file:
 327 * POSIX_FADV_DONTNEED : we won't re-access data in the near future after
328 * we write it.
329 *
330 * We need to call fadvise again after the file grows because the
331 * kernel does not seem to apply fadvise to non-existing parts of the
332 * file.
333 *
334 * Call fadvise _after_ having waited for the page writeback to
335 * complete because the dirty page writeback semantic is not well
336 * defined. So it can be expected to lead to lower throughput in
337 * streaming.
338 */
339 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
340 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
341 }
342 goto end;
343
344end:
345 if (padding != NULL) {
346 free(padding);
347 }
348 return ret;
349}
350
351/*
352 * kconsumerd_on_read_subbuffer
353 *
354 * Splice the data from the ring buffer to the tracefile.
355 * Returns the number of bytes spliced
356 */
357static int kconsumerd_on_read_subbuffer(
358 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
359{
360 long ret = 0;
361 loff_t offset = 0;
362 off_t orig_offset = kconsumerd_fd->out_fd_offset;
363 int fd = kconsumerd_fd->consumerd_fd;
364 int outfd = kconsumerd_fd->out_fd;
365
366 while (len > 0) {
367 DBG("splice chan to pipe offset %lu (fd : %d)",
368 (unsigned long)offset, fd);
369 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
370 SPLICE_F_MOVE | SPLICE_F_MORE);
371 DBG("splice chan to pipe ret %ld", ret);
372 if (ret < 0) {
373 ret = errno;
374 perror("Error in relay splice");
375 goto splice_error;
376 }
377
378 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
379 SPLICE_F_MOVE | SPLICE_F_MORE);
380 DBG("splice pipe to file %ld", ret);
381 if (ret < 0) {
382 ret = errno;
383 perror("Error in file splice");
384 goto splice_error;
385 }
386 if (ret >= len) {
387 len = 0;
388 }
389 /* This won't block, but will start writeout asynchronously */
390 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
391 SYNC_FILE_RANGE_WRITE);
392 kconsumerd_fd->out_fd_offset += ret;
393 }
394
395 /*
396 * This does a blocking write-and-wait on any page that belongs to the
397 * subbuffer prior to the one we just wrote.
398 * Don't care about error values, as these are just hints and ways to
399 * limit the amount of page cache used.
400 */
401 if (orig_offset >= kconsumerd_fd->max_sb_size) {
402 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
403 kconsumerd_fd->max_sb_size,
404 SYNC_FILE_RANGE_WAIT_BEFORE
405 | SYNC_FILE_RANGE_WRITE
406 | SYNC_FILE_RANGE_WAIT_AFTER);
407 /*
408 * Give hints to the kernel about how we access the file:
 409 * POSIX_FADV_DONTNEED : we won't re-access data in the near future after
410 * we write it.
411 *
412 * We need to call fadvise again after the file grows because the
413 * kernel does not seem to apply fadvise to non-existing parts of the
414 * file.
415 *
416 * Call fadvise _after_ having waited for the page writeback to
417 * complete because the dirty page writeback semantic is not well
418 * defined. So it can be expected to lead to lower throughput in
419 * streaming.
420 */
421 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
422 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
423 }
424 goto end;
425
426splice_error:
427 /* send the appropriate error description to sessiond */
428 switch(ret) {
429 case EBADF:
430 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
431 break;
432 case EINVAL:
433 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
434 break;
435 case ENOMEM:
436 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
437 break;
438 case ESPIPE:
439 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
440 break;
441 }
442
443end:
444 return ret;
445}
446
447/*
448 * kconsumerd_read_subbuffer
449 *
 450 * Consume data on a file descriptor and write it to a trace file
451 */
452static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
453{
454 unsigned long len;
455 int err;
456 long ret = 0;
457 int infd = kconsumerd_fd->consumerd_fd;
458
459 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
460 /* Get the next subbuffer */
461 err = kernctl_get_next_subbuf(infd);
462 if (err != 0) {
463 ret = errno;
464 perror("Reserving sub buffer failed (everything is normal, "
465 "it is due to concurrency)");
466 goto end;
467 }
468
469 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
470 case LTTNG_KERNEL_SPLICE:
471 /* read the whole subbuffer */
472 err = kernctl_get_padded_subbuf_size(infd, &len);
473 if (err != 0) {
474 ret = errno;
475 perror("Getting sub-buffer len failed.");
476 goto end;
477 }
478
479 /* splice the subbuffer to the tracefile */
480 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
481 if (ret < 0) {
482 /*
483 * display the error but continue processing to try
484 * to release the subbuffer
485 */
486 ERR("Error splicing to tracefile");
487 }
488 break;
489 case LTTNG_KERNEL_MMAP:
490 /* read the used subbuffer size */
491 err = kernctl_get_subbuf_size(infd, &len);
492 if (err != 0) {
493 ret = errno;
494 perror("Getting sub-buffer len failed.");
495 goto end;
496 }
497 /* write the subbuffer to the tracefile */
498 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
499 if (ret < 0) {
500 /*
501 * display the error but continue processing to try
502 * to release the subbuffer
503 */
504 ERR("Error writing to tracefile");
505 }
506 break;
507 default:
508 ERR("Unknown output method");
509 ret = -1;
510 }
511
512 err = kernctl_put_next_subbuf(infd);
513 if (err != 0) {
514 ret = errno;
515 if (errno == EFAULT) {
516 perror("Error in unreserving sub buffer\n");
517 } else if (errno == EIO) {
518 /* Should never happen with newer LTTng versions */
519 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
520 }
521 goto end;
522 }
523
524end:
525 return ret;
526}
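/*
 * NOTE: the kernctl_* calls above follow the get/put protocol used in this
 * file: kernctl_get_next_subbuf() reserves a sub-buffer for reading (EAGAIN
 * simply means no data is available yet), the data is drained with splice or
 * mmap depending on the channel output type, and kernctl_put_next_subbuf()
 * hands the sub-buffer back to the writer.
 */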
527
528/*
529 * kconsumerd_consumerd_recv_fd
530 *
531 * Receives an array of file descriptors and the associated
532 * structures describing each fd (path name).
533 * Returns the size of received data
534 */
535static int kconsumerd_consumerd_recv_fd(int sfd, int size,
536 enum kconsumerd_command cmd_type)
537{
538 struct msghdr msg;
539 struct iovec iov[1];
540 int ret = 0, i, tmp2;
541 struct cmsghdr *cmsg;
542 int nb_fd;
543 char recv_fd[CMSG_SPACE(sizeof(int))];
544 struct lttcomm_kconsumerd_msg lkm;
545
546 /* the number of fds we are about to receive */
547 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
548
549 for (i = 0; i < nb_fd; i++) {
550 memset(&msg, 0, sizeof(msg));
551
552 /* Prepare to receive the structures */
553 iov[0].iov_base = &lkm;
554 iov[0].iov_len = sizeof(lkm);
555 msg.msg_iov = iov;
556 msg.msg_iovlen = 1;
557
558 msg.msg_control = recv_fd;
559 msg.msg_controllen = sizeof(recv_fd);
560
561 DBG("Waiting to receive fd");
562 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
563 perror("recvmsg");
564 continue;
565 }
566
567 if (ret != (size / nb_fd)) {
568 ERR("Received only %d, expected %d", ret, size);
569 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
570 goto end;
571 }
572
573 cmsg = CMSG_FIRSTHDR(&msg);
574 if (!cmsg) {
575 ERR("Invalid control message header");
576 ret = -1;
577 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
578 goto end;
579 }
580 /* if we received fds */
581 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
582 switch (cmd_type) {
583 case ADD_STREAM:
584 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
585 ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
586 if (ret < 0) {
587 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
588 goto end;
589 }
590 break;
591 case UPDATE_STREAM:
592 kconsumerd_change_fd_state(lkm.fd, lkm.state);
593 break;
594 default:
595 break;
596 }
597 /* flag to tell the polling thread to update its fd array */
598 kconsumerd_update_fd_array = 1;
599 /* signal the poll thread */
600 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
601 } else {
602 ERR("Didn't received any fd");
603 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
604 ret = -1;
605 goto end;
606 }
607 }
608
609end:
610 return ret;
611}
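/*
 * For reference, a minimal sketch of the matching sender side: each
 * lttcomm_kconsumerd_msg is expected to arrive with the stream fd attached as
 * SCM_RIGHTS ancillary data, which is what the recvmsg() loop above unpacks.
 * The helper below is hypothetical (it does not exist in this code base) and
 * is kept out of the build.
 */
#if 0
static int example_send_fd_with_msg(int sock,
		struct lttcomm_kconsumerd_msg *lkm, int fd)
{
	struct msghdr msg;
	struct iovec iov[1];
	char ctrl[CMSG_SPACE(sizeof(int))];
	struct cmsghdr *cmsg;

	memset(&msg, 0, sizeof(msg));
	memset(ctrl, 0, sizeof(ctrl));

	/* the structure describing the stream travels in the normal payload */
	iov[0].iov_base = lkm;
	iov[0].iov_len = sizeof(*lkm);
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;

	/* the fd itself travels as SCM_RIGHTS ancillary data */
	msg.msg_control = ctrl;
	msg.msg_controllen = sizeof(ctrl);
	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

	return sendmsg(sock, &msg, 0);
}
#endif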
612
613/*
614 * kconsumerd_thread_poll_fds
615 *
 616 * This thread polls the fds in kconsumerd_fd_list to consume the data
 617 * and write it to the tracefile if necessary.
618 */
619void *kconsumerd_thread_poll_fds(void *data)
620{
621 int num_rdy, num_hup, high_prio, ret, i;
622 struct pollfd *pollfd = NULL;
623 /* local view of the fds */
624 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
625 /* local view of kconsumerd_fds_count */
626 int nb_fd = 0;
627 char tmp;
628 int tmp2;
629
630 ret = pipe(kconsumerd_thread_pipe);
631 if (ret < 0) {
632 perror("Error creating pipe");
633 goto end;
634 }
635
636 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
637
638 while (1) {
639 high_prio = 0;
640 num_hup = 0;
641
642 /*
 643 * kconsumerd_fd_list has been updated, we need to update our
644 * local array as well
645 */
646 if (kconsumerd_update_fd_array == 1) {
647 if (pollfd != NULL) {
648 free(pollfd);
649 pollfd = NULL;
650 }
651 if (local_kconsumerd_fd != NULL) {
652 free(local_kconsumerd_fd);
653 local_kconsumerd_fd = NULL;
654 }
655
656 /* Lock mutex for fds count */
657 pthread_mutex_lock(&kconsumerd_lock_fds);
658 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
659 pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
660 if (pollfd == NULL) {
661 perror("pollfd malloc");
662 goto end;
663 }
 664
665 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
666 local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
667 sizeof(struct kconsumerd_fd));
668 if (local_kconsumerd_fd == NULL) {
669 perror("local_kconsumerd_fd malloc");
670 goto end;
671 }
672 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
673 if (ret < 0) {
674 ERR("Error in allocating pollfd or local_outfds");
675 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
676 goto end;
677 }
678 /* Unlock mutex for fds count */
679 pthread_mutex_unlock(&kconsumerd_lock_fds);
680
681 nb_fd = ret;
682 }
683
684 /* poll on the array of fds */
685 DBG("polling on %d fd", nb_fd + 1);
686 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
687 DBG("poll num_rdy : %d", num_rdy);
688 if (num_rdy == -1) {
689 perror("Poll error");
690 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
691 goto end;
692 } else if (num_rdy == 0) {
693 DBG("Polling thread timed out");
694 goto end;
695 }
696
 697 /* If there are no FDs left and kconsumerd_quit is set, end the thread */
698 if (nb_fd == 0 && kconsumerd_quit == 1) {
699 goto end;
700 }
701
702 /*
 703 * if only the kconsumerd_poll_pipe triggered poll to return, just go
 704 * back to the beginning of the loop to update the array
705 */
706 if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
707 DBG("kconsumerd_poll_pipe wake up");
708 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
709 continue;
710 }
711
712 /* Take care of high priority channels first. */
713 for (i = 0; i < nb_fd; i++) {
714 switch(pollfd[i].revents) {
715 case POLLERR:
716 ERR("Error returned in polling fd %d.", pollfd[i].fd);
717 kconsumerd_del_fd(local_kconsumerd_fd[i]);
718 kconsumerd_update_fd_array = 1;
719 num_hup++;
720 break;
721 case POLLHUP:
722 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
723 kconsumerd_del_fd(local_kconsumerd_fd[i]);
724 kconsumerd_update_fd_array = 1;
725 num_hup++;
726 break;
727 case POLLNVAL:
728 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
729 kconsumerd_del_fd(local_kconsumerd_fd[i]);
730 kconsumerd_update_fd_array = 1;
731 num_hup++;
732 break;
733 case POLLPRI:
734 DBG("Urgent read on fd %d", pollfd[i].fd);
735 high_prio = 1;
736 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
737 /* it's ok to have an unavailable sub-buffer */
738 if (ret == EAGAIN) {
739 ret = 0;
740 }
741 break;
742 }
743 }
744
745 /* If every buffer FD has hung up, we end the read loop here */
746 if (nb_fd > 0 && num_hup == nb_fd) {
747 DBG("every buffer FD has hung up\n");
748 if (kconsumerd_quit == 1) {
749 goto end;
750 }
751 continue;
752 }
753
754 /* Take care of low priority channels. */
755 if (high_prio == 0) {
756 for (i = 0; i < nb_fd; i++) {
757 if (pollfd[i].revents == POLLIN) {
758 DBG("Normal read on fd %d", pollfd[i].fd);
759 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
760 /* it's ok to have an unavailable subbuffer */
761 if (ret == EAGAIN) {
762 ret = 0;
763 }
764 }
765 }
766 }
767 }
768end:
 769 pthread_mutex_unlock(&kconsumerd_lock_fds);
770 DBG("polling thread exiting");
771 if (pollfd != NULL) {
772 free(pollfd);
773 pollfd = NULL;
774 }
775 if (local_kconsumerd_fd != NULL) {
776 free(local_kconsumerd_fd);
777 local_kconsumerd_fd = NULL;
778 }
779 kconsumerd_cleanup();
780 return NULL;
781}
782
783/*
784 * kconsumerd_create_poll_pipe
785 *
 786 * create the pipe used to wake the polling thread when needed
787 */
788int kconsumerd_create_poll_pipe()
789{
790 return pipe(kconsumerd_poll_pipe);
791}
792
793/*
794 * kconsumerd_thread_receive_fds
795 *
796 * This thread listens on the consumerd socket and
797 * receives the file descriptors from ltt-sessiond
798 */
799void *kconsumerd_thread_receive_fds(void *data)
800{
801 int sock, client_socket, ret;
802 struct lttcomm_kconsumerd_header tmp;
803
804 DBG("Creating command socket %s", kconsumerd_command_sock_path);
805 unlink(kconsumerd_command_sock_path);
806 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
807 if (client_socket < 0) {
808 ERR("Cannot create command socket");
809 goto end;
810 }
811
812 ret = lttcomm_listen_unix_sock(client_socket);
813 if (ret < 0) {
814 goto end;
815 }
816
817 DBG("Sending ready command to ltt-sessiond");
818 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
819 if (ret < 0) {
820 ERR("Error sending ready command to ltt-sessiond");
821 goto end;
822 }
823
824 /* Blocking call, waiting for transmission */
825 sock = lttcomm_accept_unix_sock(client_socket);
826 if (sock <= 0) {
827 WARN("On accept");
828 goto end;
829 }
830 while (1) {
831 /* We first get the number of fd we are about to receive */
832 ret = lttcomm_recv_unix_sock(sock, &tmp,
833 sizeof(struct lttcomm_kconsumerd_header));
834 if (ret <= 0) {
835 ERR("Communication interrupted on command socket");
836 goto end;
837 }
838 if (tmp.cmd_type == STOP) {
839 DBG("Received STOP command");
840 goto end;
841 }
842 /* we received a command to add or update fds */
843 ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
844 if (ret <= 0) {
845 ERR("Receiving the FD, exiting");
846 goto end;
847 }
848 }
849
850end:
851 DBG("kconsumerd_thread_receive_fds exiting");
852
853 /*
854 * when all fds have hung up, the polling thread
855 * can exit cleanly
856 */
857 kconsumerd_quit = 1;
858
859 /*
860 * 2s of grace period, if no polling events occur during
861 * this period, the polling thread will exit even if there
862 * are still open FDs (should not happen, but safety mechanism).
863 */
864 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
865
866 /* wake up the polling thread */
867 ret = write(kconsumerd_poll_pipe[1], "4", 1);
868 if (ret < 0) {
869 perror("poll pipe write");
870 }
871 return NULL;
872}
873
874/*
875 * kconsumerd_cleanup
876 *
 877 * Clean up the daemon's socket and all output fds on exit
878 */
879void kconsumerd_cleanup()
880{
 881 struct kconsumerd_fd *iter, *tmp;
882
883 /* remove the socket file */
884 unlink(kconsumerd_command_sock_path);
885
 886 /* close all out fds; use the safe variant since entries are freed */
 887 cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_fd_list.head, list) {
888 kconsumerd_del_fd(iter);
889 }
890}
891
892/*
893 * kconsumerd_send_error
894 *
895 * send return code to ltt-sessiond
896 */
897int kconsumerd_send_error(enum lttcomm_return_code cmd)
898{
899 if (kconsumerd_error_socket > 0) {
900 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
 901 sizeof(enum lttcomm_return_code));
902 }
903
904 return 0;
905}
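/*
 * For reference, a sketch of how a consumer daemon could drive the functions
 * exported by this file. The socket path is a placeholder, the function is
 * hypothetical, and error handling is trimmed.
 */
#if 0
static int example_kconsumerd_main(int error_sock)
{
	pthread_t fd_thread, poll_thread;
	static char cmd_sock_path[] = "/tmp/kconsumerd-command.sock";

	kconsumerd_set_error_socket(error_sock);
	kconsumerd_set_command_socket_path(cmd_sock_path);
	if (kconsumerd_create_poll_pipe() < 0) {
		return -1;
	}

	/* one thread receives fds from sessiond, the other consumes them */
	pthread_create(&fd_thread, NULL, kconsumerd_thread_receive_fds, NULL);
	pthread_create(&poll_thread, NULL, kconsumerd_thread_poll_fds, NULL);

	pthread_join(fd_thread, NULL);
	pthread_join(poll_thread, NULL);

	return 0;
}
#endif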