/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>
#include <assert.h>

#include "libkernelctl.h"
#include "liblttkconsumerd.h"
#include "lttngerr.h"

static
struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It also ensures that list updates *always* trigger an fd_array
	 * update: list updates and kconsumerd_data.need_update flag
	 * updates are therefore made atomic, as are the flag read, fd
	 * array update and flag clear.
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/*
	 * List of FDs. Protected by kconsumerd_data.lock.
	 */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag specifying whether the local array of FDs used by the
	 * poll function needs to be updated. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};
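
/*
 * All writers of the shared state above follow the same pattern (a
 * sketch of what kconsumerd_add_fd(), kconsumerd_del_fd() and
 * kconsumerd_change_fd_state() below actually do):
 *
 *	pthread_mutex_lock(&kconsumerd_data.lock);
 *	... mutate fd_list and fds_count together ...
 *	kconsumerd_data.need_update = 1;
 *	pthread_mutex_unlock(&kconsumerd_data.lock);
 *
 * The polling thread re-reads the list and clears need_update while
 * holding the same lock, which keeps the flag coherent with the list.
 */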

/* communication with splice */
static int kconsumerd_thread_pipe[2];

/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];

/* to let the signal handler wake up the fd receiver thread */
static int kconsumerd_should_quit[2];

/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;

/* socket to communicate errors with sessiond */
static int kconsumerd_error_socket;

/* socket to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all fds
 * have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;

/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket
 */
void kconsumerd_set_error_socket(int sock)
{
	kconsumerd_error_socket = sock;
}

/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path
 */
void kconsumerd_set_command_socket_path(char *sock)
{
	kconsumerd_command_sock_path = sock;
}

/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list.
 * The kconsumerd_data.lock must be held during this call.
 *
 * Return 1 if found, else 0.
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			return 1;
		}
	}

	return 0;
}

/*
 * kconsumerd_del_fd
 *
 * Remove a fd from the global list, protected by a mutex
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
		if (lcf != NULL) {
			close(lcf->out_fd);
			close(lcf->consumerd_fd);
			free(lcf);
			lcf = NULL;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_add_fd
 *
 * Add a fd to the global list, protected by a mutex
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if it already exists */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	if (tmp_fd == NULL) {
		perror("malloc struct kconsumerd_fd");
		ret = -1;
		goto end;
	}
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
	/* make sure the path is NUL-terminated even if it was truncated */
	tmp_fd->path_name[PATH_MAX - 1] = '\0';

	/* Open the tracefile in write mode */
	ret = open(tmp_fd->path_name,
			O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
	if (ret < 0) {
		ERR("Opening %s", tmp_fd->path_name);
		perror("open");
		free(tmp_fd);
		goto end;
	}
	tmp_fd->out_fd = ret;
	tmp_fd->out_fd_offset = 0;

	DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
			tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;
end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}

/*
 * kconsumerd_change_fd_state
 *
 * Update a fd according to what we just received
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds to
 * avoid doing a lookup in the linked list and to prevent concurrency
 * issues when writing is needed.
 * Returns the number of fds in the structures.
 * Called with kconsumerd_data.lock held.
 */
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
		struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		DBG("Inside for each");
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * Insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i, so nb_fd is the number of real FDs.
	 */
	(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}

/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written.
 */
static int kconsumerd_on_read_subbuffer_mmap(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	char *mmap_base = MAP_FAILED;
	char *write_ptr;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	if (padding == NULL) {
		perror("padding malloc");
		ret = -1;
		goto end;
	}
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmaping");
		ret = -1;
		goto end;
	}

	write_ptr = mmap_base;
	while (len > 0) {
		ret = write(outfd, write_ptr, len);
		if (ret < 0) {
			ret = errno;
			perror("Error in file write");
			goto end;
		}
		/* handle short writes by advancing within the mapping */
		write_ptr += ret;
		len -= ret;
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		ret = errno;
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in the near
		 * future after we write it.
		 *
		 * We need to call fadvise again after the file grows because
		 * the kernel does not seem to apply fadvise to non-existing
		 * parts of the file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not
		 * well defined. So it can be expected to lead to lower
		 * throughput in streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}

end:
	if (mmap_base != MAP_FAILED) {
		munmap(mmap_base, mmap_len);
	}
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}
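
/*
 * Note on the splice variant below: splice(2) requires that one end of
 * each transfer be a pipe, so the data makes two kernel-side hops: from
 * the ring-buffer fd into kconsumerd_thread_pipe, then from the pipe
 * into the output fd. Unlike the mmap path above, the payload never has
 * to be copied through user space.
 */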

/*
 * kconsumerd_on_read_subbuffer
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced.
 */
static int kconsumerd_on_read_subbuffer(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in file splice");
			goto splice_error;
		}
		/* handle short splices by decrementing what remains */
		if (ret >= len) {
			len = 0;
		} else {
			len -= ret;
		}
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);
		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in the near
		 * future after we write it.
		 *
		 * We need to call fadvise again after the file grows because
		 * the kernel does not seem to apply fadvise to non-existing
		 * parts of the file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not
		 * well defined. So it can be expected to lead to lower
		 * throughput in streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EBADF:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	return ret;
}

/*
 * kconsumerd_read_subbuffer
 *
 * Consume data on a file descriptor and write it to a trace file.
 */
static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
{
	unsigned long len;
	int err;
	long ret = 0;
	int infd = kconsumerd_fd->consumerd_fd;

	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
	/* Get the next subbuffer */
	err = kernctl_get_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		perror("Reserving sub buffer failed (everything is normal, "
				"it is due to concurrency)");
		goto end;
	}

	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
	case LTTNG_EVENT_SPLICE:
		/* read the whole subbuffer */
		err = kernctl_get_padded_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}

		/* splice the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try to
			 * release the subbuffer
			 */
			ERR("Error splicing to tracefile");
		}
		break;
	case LTTNG_EVENT_MMAP:
		/* read the used subbuffer size */
		err = kernctl_get_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}
		/* write the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try to
			 * release the subbuffer
			 */
			ERR("Error writing to tracefile");
		}
		break;
	default:
		ERR("Unknown output method");
		ret = -1;
	}

	err = kernctl_put_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		if (errno == EFAULT) {
			perror("Error in unreserving sub buffer");
		} else if (errno == EIO) {
			/* Should never happen with newer LTTng versions */
			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
		}
		goto end;
	}

end:
	return ret;
}
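
/*
 * The get/put pair above implements the reader side of the ring-buffer
 * protocol: kernctl_get_next_subbuf() reserves a full sub-buffer for
 * reading and kernctl_put_next_subbuf() releases it back to the writer.
 * When no full sub-buffer is available, the reserve fails with EAGAIN
 * (propagated through the return value); callers treat that as a
 * non-fatal condition (see the EAGAIN handling in
 * kconsumerd_thread_poll_fds() below).
 */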

/*
 * kconsumerd_poll_socket
 *
 * Poll on the should_quit pipe and the command socket.
 * Returns -1 on error or if we should quit, 0 if data is available on
 * the command socket.
 */
int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	int num_rdy;

	num_rdy = poll(kconsumerd_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (kconsumerd_sockpoll[0].revents & POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}

/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated structures
 * describing each fd (path name).
 * Returns the size of received data.
 */
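/*
 * The fds themselves do not travel in the message payload: each one is
 * passed as SCM_RIGHTS ancillary data on the UNIX socket, so the kernel
 * installs a duplicate of the sender's fd into this process. The
 * CMSG_SPACE(sizeof(int)) buffer below reserves room for exactly one
 * such fd per message.
 */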
static int kconsumerd_consumerd_recv_fd(int sfd,
		struct pollfd *kconsumerd_sockpoll, int size,
		enum kconsumerd_command cmd_type)
{
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

	/*
	 * Note: only supporting receiving one FD at a time for now.
	 * This code needs fixing if we wish to receive more (a single
	 * receive for the whole fd batch rather than one per fd).
	 */
	assert(nb_fd == 1);

	for (i = 0; i < nb_fd; i++) {
		struct msghdr msg = { 0 };

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

		DBG("Waiting to receive fd");
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}

		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size);
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
						((int *) CMSG_DATA(cmsg))[0]);
				ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
				if (ret < 0) {
					kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}
			/* signal the poll thread */
			tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
			if (tmp2 < 0) {
				perror("write kconsumerd poll");
			}
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}

/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the fd_list to consume the data and
 * write it to tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;

	ret = pipe(kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating pipe");
		goto end;
	}

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fd_list has been updated, we need to update our local
		 * array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd *));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs left and kconsumerd_quit was set: end the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll, go directly to
		 * the beginning of the loop to update the array. We want to
		 * prioritize array updates over low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
			if (tmp2 < 0) {
				perror("read kconsumerd poll");
			}
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch (pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
			continue;
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}
end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}

/*
 * kconsumerd_init(void)
 *
 * Initialize the necessary environment:
 * - inform the polling thread to update the polling array
 * - create the poll_pipe
 * - create the should_quit pipe (for the signal handler)
 */
int kconsumerd_init(void)
{
	int ret;

	/* need to update the polling array at init time */
	kconsumerd_data.need_update = 1;

	ret = pipe(kconsumerd_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto end;
	}

	ret = pipe(kconsumerd_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto end;
	}

end:
	return ret;
}
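
/*
 * Expected use of this library by the consumer daemon (a sketch; the
 * actual caller lives outside this file):
 *
 *	kconsumerd_set_error_socket(error_sock);
 *	kconsumerd_set_command_socket_path(sock_path);
 *	if (kconsumerd_init() < 0)
 *		... error handling ...
 *	pthread_create(&fd_thread, NULL, kconsumerd_thread_receive_fds, NULL);
 *	pthread_create(&poll_thread, NULL, kconsumerd_thread_poll_fds, NULL);
 *	... join both threads, then call kconsumerd_cleanup() ...
 */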

/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and receives the file
 * descriptors from ltt-sessiond.
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;
	/*
	 * structure to poll for incoming data on the communication socket;
	 * avoids blocking on socket operations
	 */
	struct pollfd kconsumerd_sockpoll[2];

	DBG("Creating command socket %s", kconsumerd_command_sock_path);
	unlink(kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* prepare the FDs to poll: the should_quit pipe and the client socket */
	kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
	kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
	kconsumerd_sockpoll[1].fd = client_socket;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock <= 0) {
		WARN("On accept");
		goto end;
	}
	ret = fcntl(sock, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* update the polling structure to poll on the established socket */
	kconsumerd_sockpoll[1].fd = sock;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming fds on sock");

		/* We first get the number of fds we are about to receive */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}

		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
				tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Error receiving the FD, exiting");
			goto end;
		}
		DBG("received fds on sock");
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period: if no polling events occur during this period,
	 * the polling thread will exit even if there are still open FDs
	 * (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}

/*
 * kconsumerd_cleanup
 *
 * Cleanup the daemon's socket on exit
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *tmp;

	/* remove the socket file */
	unlink(kconsumerd_command_sock_path);

	/*
	 * Close all outfd. Called when there are no more threads running
	 * (after joining on the threads), so no need to protect the list
	 * iteration with a mutex. Use the _safe variant of the iterator
	 * because kconsumerd_del_fd() unlinks and frees the current element.
	 */
	cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
		kconsumerd_del_fd(iter);
	}
}

/*
 * kconsumerd_should_exit
 *
 * Called from the signal handler.
 */
void kconsumerd_should_exit(void)
{
	int ret;

	kconsumerd_quit = 1;
	ret = write(kconsumerd_should_quit[1], "4", 1);
	if (ret < 0) {
		perror("write kconsumerd quit");
	}
}
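
/*
 * Note that kconsumerd_should_exit() only sets a flag and performs a
 * single write(2) on a pipe: write is async-signal-safe, so this is the
 * classic self-pipe pattern for waking a poll()-based thread from a
 * signal handler without relying on locks or condition variables.
 */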

/*
 * kconsumerd_send_error
 *
 * Send a return code to ltt-sessiond.
 */
int kconsumerd_send_error(enum lttcomm_return_code cmd)
{
	if (kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
				sizeof(cmd));
	}

	return 0;
}