Rename liblttsessiondcomm to liblttng-sessiond-comm, install it.
[lttng-tools.git] / liblttkconsumerd / liblttkconsumerd.c
CommitLineData
1ce86c9a
JD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
82a3637f
DG
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
1ce86c9a
JD
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <fcntl.h>
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30#include <urcu/list.h>
159c7ff4 31#include <assert.h>
1ce86c9a
JD
32
33#include "libkernelctl.h"
34#include "liblttkconsumerd.h"
35#include "lttngerr.h"
36
242cd187
MD
37static
38struct kconsumerd_global_data {
39 /*
40 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
41 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
42 * ensures the count matches the number of items in the fd_list.
43 * It ensures the list updates *always* trigger an fd_array
44 * update (therefore need to make list update vs
45 * kconsumerd_data.need_update flag update atomic, and also flag
46 * read, fd array and flag clear atomic).
47 */
48 pthread_mutex_t lock;
49 /*
50 * Number of element for the list below. Protected by
51 * kconsumerd_data.lock.
52 */
53 unsigned int fds_count;
54 /*
55 * List of FDs. Protected by kconsumerd_data.lock.
56 */
57 struct kconsumerd_fd_list fd_list;
58 /*
59 * Flag specifying if the local array of FDs needs update in the
60 * poll function. Protected by kconsumerd_data.lock.
61 */
62 unsigned int need_update;
63} kconsumerd_data = {
64 .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
1ce86c9a
JD
65};
66
1ce86c9a
JD
/* Pipe used as the intermediate buffer when splicing out of the ring buffer. */
static int kconsumerd_thread_pipe[2];

/* Pipe written to whenever the polling thread must wake up (e.g. rebuild). */
static int kconsumerd_poll_pipe[2];

/* Pipe written to by the signal handler to wake the fd receiver thread. */
static int kconsumerd_should_quit[2];

/*
 * Timeout handed to poll() by the polling thread; -1 blocks forever.
 * Replaced by a grace period once the receiver thread is done.
 */
static int kconsumerd_poll_timeout = -1;

/* Socket on which error/status codes are reported to sessiond. */
static int kconsumerd_error_socket;

/* Filesystem path of the unix socket carrying sessiond commands. */
static char *kconsumerd_command_sock_path;

/*
 * Quit request flag. Set by kconsumerd_thread_receive_fds when it exits and
 * by the signal handler (kconsumerd_should_exit()); read by the polling
 * and receiver threads so they can terminate.
 */
static volatile int kconsumerd_quit = 0;
1ce86c9a
JD
92
93/*
94 * kconsumerd_set_error_socket
95 *
96 * Set the error socket
97 */
98void kconsumerd_set_error_socket(int sock)
99{
100 kconsumerd_error_socket = sock;
101}
102
103/*
104 * kconsumerd_set_command_socket_path
105 *
106 * Set the command socket path
107 */
108void kconsumerd_set_command_socket_path(char *sock)
109{
110 kconsumerd_command_sock_path = sock;
111}
112
38079a1b
DG
113/*
114 * kconsumerd_find_session_fd
115 *
116 * Find a session fd in the global list.
fec07047 117 * The kconsumerd_data.lock must be locked during this call
38079a1b
DG
118 *
119 * Return 1 if found else 0
120 */
121static int kconsumerd_find_session_fd(int fd)
122{
123 struct kconsumerd_fd *iter;
124
242cd187 125 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
38079a1b
DG
126 if (iter->sessiond_fd == fd) {
127 DBG("Duplicate session fd %d", fd);
38079a1b
DG
128 return 1;
129 }
130 }
38079a1b
DG
131
132 return 0;
133}
134
1ce86c9a
JD
135/*
136 * kconsumerd_del_fd
137 *
138 * Remove a fd from the global list protected by a mutex
139 */
140static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
141{
242cd187 142 pthread_mutex_lock(&kconsumerd_data.lock);
1ce86c9a 143 cds_list_del(&lcf->list);
242cd187
MD
144 if (kconsumerd_data.fds_count > 0) {
145 kconsumerd_data.fds_count--;
1ce86c9a
JD
146 if (lcf != NULL) {
147 close(lcf->out_fd);
148 close(lcf->consumerd_fd);
149 free(lcf);
150 lcf = NULL;
151 }
152 }
242cd187
MD
153 kconsumerd_data.need_update = 1;
154 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
155}
156
157/*
158 * kconsumerd_add_fd
159 *
160 * Add a fd to the global list protected by a mutex
161 */
162static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
163{
1ce86c9a 164 int ret;
38079a1b
DG
165 struct kconsumerd_fd *tmp_fd;
166
242cd187 167 pthread_mutex_lock(&kconsumerd_data.lock);
38079a1b
DG
168 /* Check if already exist */
169 ret = kconsumerd_find_session_fd(buf->fd);
170 if (ret == 1) {
171 goto end;
172 }
1ce86c9a
JD
173
174 tmp_fd = malloc(sizeof(struct kconsumerd_fd));
175 tmp_fd->sessiond_fd = buf->fd;
176 tmp_fd->consumerd_fd = consumerd_fd;
177 tmp_fd->state = buf->state;
178 tmp_fd->max_sb_size = buf->max_sb_size;
179 strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
99497cd0 180 tmp_fd->path_name[PATH_MAX - 1] = '\0';
1ce86c9a
JD
181
182 /* Opening the tracefile in write mode */
183 ret = open(tmp_fd->path_name,
184 O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
185 if (ret < 0) {
186 ERR("Opening %s", tmp_fd->path_name);
187 perror("open");
188 goto end;
189 }
190 tmp_fd->out_fd = ret;
191 tmp_fd->out_fd_offset = 0;
192
193 DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
194 tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
195
242cd187
MD
196 cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
197 kconsumerd_data.fds_count++;
198 kconsumerd_data.need_update = 1;
1ce86c9a 199end:
242cd187 200 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
201 return ret;
202}
203
204/*
205 * kconsumerd_change_fd_state
206 *
207 * Update a fd according to what we just received
208 */
209static void kconsumerd_change_fd_state(int sessiond_fd,
210 enum kconsumerd_fd_state state)
211{
212 struct kconsumerd_fd *iter;
0237248c 213
242cd187
MD
214 pthread_mutex_lock(&kconsumerd_data.lock);
215 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
1ce86c9a
JD
216 if (iter->sessiond_fd == sessiond_fd) {
217 iter->state = state;
218 break;
219 }
220 }
242cd187
MD
221 kconsumerd_data.need_update = 1;
222 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
223}
224
225/*
226 * kconsumerd_update_poll_array
227 *
228 * Allocate the pollfd structure and the local view of the out fds
229 * to avoid doing a lookup in the linked list and concurrency issues
230 * when writing is needed.
231 * Returns the number of fds in the structures
242cd187 232 * Called with kconsumerd_data.lock held.
1ce86c9a
JD
233 */
234static int kconsumerd_update_poll_array(struct pollfd **pollfd,
235 struct kconsumerd_fd **local_kconsumerd_fd)
236{
237 struct kconsumerd_fd *iter;
238 int i = 0;
239
240 DBG("Updating poll fd array");
1ce86c9a 241
242cd187 242 cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
1ce86c9a
JD
243 DBG("Inside for each");
244 if (iter->state == ACTIVE_FD) {
245 DBG("Active FD %d", iter->consumerd_fd);
246 (*pollfd)[i].fd = iter->consumerd_fd;
247 (*pollfd)[i].events = POLLIN | POLLPRI;
248 local_kconsumerd_fd[i] = iter;
249 i++;
250 }
251 }
252
253 /*
254 * insert the kconsumerd_poll_pipe at the end of the array and don't
255 * increment i so nb_fd is the number of real FD
256 */
257 (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
258 (*pollfd)[i].events = POLLIN;
1ce86c9a
JD
259 return i;
260}
261
262
263/*
264 * kconsumerd_on_read_subbuffer_mmap
265 *
266 * mmap the ring buffer, read it and write the data to the tracefile.
267 * Returns the number of bytes written
268 */
269static int kconsumerd_on_read_subbuffer_mmap(
270 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
271{
272 unsigned long mmap_len, mmap_offset, padded_len, padding_len;
273 char *mmap_base;
274 char *padding = NULL;
275 long ret = 0;
276 off_t orig_offset = kconsumerd_fd->out_fd_offset;
277 int fd = kconsumerd_fd->consumerd_fd;
278 int outfd = kconsumerd_fd->out_fd;
279
280 /* get the padded subbuffer size to know the padding required */
281 ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
282 if (ret != 0) {
283 ret = errno;
284 perror("kernctl_get_padded_subbuf_size");
285 goto end;
286 }
287 padding_len = padded_len - len;
288 padding = malloc(padding_len * sizeof(char));
289 memset(padding, '\0', padding_len);
290
291 /* get the len of the mmap region */
292 ret = kernctl_get_mmap_len(fd, &mmap_len);
293 if (ret != 0) {
294 ret = errno;
295 perror("kernctl_get_mmap_len");
296 goto end;
297 }
298
299 /* get the offset inside the fd to mmap */
300 ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
301 if (ret != 0) {
302 ret = errno;
303 perror("kernctl_get_mmap_read_offset");
304 goto end;
305 }
306
307 mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
308 if (mmap_base == MAP_FAILED) {
309 perror("Error mmaping");
310 ret = -1;
311 goto end;
312 }
313
314 while (len > 0) {
315 ret = write(outfd, mmap_base, len);
316 if (ret >= len) {
317 len = 0;
318 } else if (ret < 0) {
319 ret = errno;
320 perror("Error in file write");
321 goto end;
322 }
323 /* This won't block, but will start writeout asynchronously */
324 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
325 SYNC_FILE_RANGE_WRITE);
326 kconsumerd_fd->out_fd_offset += ret;
327 }
328
329 /* once all the data is written, write the padding to disk */
330 ret = write(outfd, padding, padding_len);
331 if (ret < 0) {
332 ret = errno;
333 perror("Error writing padding to file");
334 goto end;
335 }
336
337 /*
338 * This does a blocking write-and-wait on any page that belongs to the
339 * subbuffer prior to the one we just wrote.
340 * Don't care about error values, as these are just hints and ways to
341 * limit the amount of page cache used.
342 */
343 if (orig_offset >= kconsumerd_fd->max_sb_size) {
344 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
345 kconsumerd_fd->max_sb_size,
346 SYNC_FILE_RANGE_WAIT_BEFORE
347 | SYNC_FILE_RANGE_WRITE
348 | SYNC_FILE_RANGE_WAIT_AFTER);
349
350 /*
351 * Give hints to the kernel about how we access the file:
352 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
353 * we write it.
354 *
355 * We need to call fadvise again after the file grows because the
356 * kernel does not seem to apply fadvise to non-existing parts of the
357 * file.
358 *
359 * Call fadvise _after_ having waited for the page writeback to
360 * complete because the dirty page writeback semantic is not well
361 * defined. So it can be expected to lead to lower throughput in
362 * streaming.
363 */
364 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
365 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
366 }
367 goto end;
368
369end:
370 if (padding != NULL) {
371 free(padding);
372 }
373 return ret;
374}
375
376/*
377 * kconsumerd_on_read_subbuffer
378 *
379 * Splice the data from the ring buffer to the tracefile.
380 * Returns the number of bytes spliced
381 */
382static int kconsumerd_on_read_subbuffer(
383 struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
384{
385 long ret = 0;
386 loff_t offset = 0;
387 off_t orig_offset = kconsumerd_fd->out_fd_offset;
388 int fd = kconsumerd_fd->consumerd_fd;
389 int outfd = kconsumerd_fd->out_fd;
390
391 while (len > 0) {
392 DBG("splice chan to pipe offset %lu (fd : %d)",
393 (unsigned long)offset, fd);
394 ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
395 SPLICE_F_MOVE | SPLICE_F_MORE);
396 DBG("splice chan to pipe ret %ld", ret);
397 if (ret < 0) {
398 ret = errno;
399 perror("Error in relay splice");
400 goto splice_error;
401 }
402
403 ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
404 SPLICE_F_MOVE | SPLICE_F_MORE);
405 DBG("splice pipe to file %ld", ret);
406 if (ret < 0) {
407 ret = errno;
408 perror("Error in file splice");
409 goto splice_error;
410 }
411 if (ret >= len) {
412 len = 0;
413 }
414 /* This won't block, but will start writeout asynchronously */
415 sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
416 SYNC_FILE_RANGE_WRITE);
417 kconsumerd_fd->out_fd_offset += ret;
418 }
419
420 /*
421 * This does a blocking write-and-wait on any page that belongs to the
422 * subbuffer prior to the one we just wrote.
423 * Don't care about error values, as these are just hints and ways to
424 * limit the amount of page cache used.
425 */
426 if (orig_offset >= kconsumerd_fd->max_sb_size) {
427 sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
428 kconsumerd_fd->max_sb_size,
429 SYNC_FILE_RANGE_WAIT_BEFORE
430 | SYNC_FILE_RANGE_WRITE
431 | SYNC_FILE_RANGE_WAIT_AFTER);
432 /*
433 * Give hints to the kernel about how we access the file:
434 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
435 * we write it.
436 *
437 * We need to call fadvise again after the file grows because the
438 * kernel does not seem to apply fadvise to non-existing parts of the
439 * file.
440 *
441 * Call fadvise _after_ having waited for the page writeback to
442 * complete because the dirty page writeback semantic is not well
443 * defined. So it can be expected to lead to lower throughput in
444 * streaming.
445 */
446 posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
447 kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
448 }
449 goto end;
450
451splice_error:
452 /* send the appropriate error description to sessiond */
453 switch(ret) {
454 case EBADF:
455 kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
456 break;
457 case EINVAL:
458 kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
459 break;
460 case ENOMEM:
461 kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
462 break;
463 case ESPIPE:
464 kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
465 break;
466 }
467
468end:
469 return ret;
470}
471
472/*
473 * kconsumerd_read_subbuffer
474 *
475 * Consume data on a file descriptor and write it on a trace file
476 */
477static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
478{
479 unsigned long len;
480 int err;
481 long ret = 0;
482 int infd = kconsumerd_fd->consumerd_fd;
483
484 DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
485 /* Get the next subbuffer */
486 err = kernctl_get_next_subbuf(infd);
487 if (err != 0) {
488 ret = errno;
489 perror("Reserving sub buffer failed (everything is normal, "
490 "it is due to concurrency)");
491 goto end;
492 }
493
494 switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
7d29a247 495 case LTTNG_EVENT_SPLICE:
1ce86c9a
JD
496 /* read the whole subbuffer */
497 err = kernctl_get_padded_subbuf_size(infd, &len);
498 if (err != 0) {
499 ret = errno;
500 perror("Getting sub-buffer len failed.");
501 goto end;
502 }
503
504 /* splice the subbuffer to the tracefile */
505 ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
506 if (ret < 0) {
507 /*
508 * display the error but continue processing to try
509 * to release the subbuffer
510 */
511 ERR("Error splicing to tracefile");
512 }
513 break;
7d29a247 514 case LTTNG_EVENT_MMAP:
1ce86c9a
JD
515 /* read the used subbuffer size */
516 err = kernctl_get_subbuf_size(infd, &len);
517 if (err != 0) {
518 ret = errno;
519 perror("Getting sub-buffer len failed.");
520 goto end;
521 }
522 /* write the subbuffer to the tracefile */
523 ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
524 if (ret < 0) {
525 /*
526 * display the error but continue processing to try
527 * to release the subbuffer
528 */
529 ERR("Error writing to tracefile");
530 }
531 break;
532 default:
533 ERR("Unknown output method");
534 ret = -1;
535 }
536
537 err = kernctl_put_next_subbuf(infd);
538 if (err != 0) {
539 ret = errno;
540 if (errno == EFAULT) {
541 perror("Error in unreserving sub buffer\n");
542 } else if (errno == EIO) {
543 /* Should never happen with newer LTTng versions */
544 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
545 }
546 goto end;
547 }
548
549end:
550 return ret;
551}
552
4de84ad9
JD
/*
 * kconsumerd_poll_socket
 *
 * Block until either the should_quit pipe (slot 0) or the command socket
 * (slot 1) becomes ready.
 *
 * Returns -1 when poll() fails or slot 0 reports exactly POLLIN (the caller
 * should exit), and 0 otherwise (command socket has data).
 */
int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	if (poll(kconsumerd_sockpoll, 2, -1) == -1) {
		perror("Poll error");
		return -1;
	}

	if (kconsumerd_sockpoll[0].revents == POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		return -1;
	}

	return 0;
}
578
1ce86c9a
JD
579/*
580 * kconsumerd_consumerd_recv_fd
581 *
582 * Receives an array of file descriptors and the associated
583 * structures describing each fd (path name).
584 * Returns the size of received data
585 */
4de84ad9
JD
586static int kconsumerd_consumerd_recv_fd(int sfd,
587 struct pollfd *kconsumerd_sockpoll, int size,
1ce86c9a
JD
588 enum kconsumerd_command cmd_type)
589{
1ce86c9a
JD
590 struct iovec iov[1];
591 int ret = 0, i, tmp2;
592 struct cmsghdr *cmsg;
593 int nb_fd;
594 char recv_fd[CMSG_SPACE(sizeof(int))];
595 struct lttcomm_kconsumerd_msg lkm;
596
597 /* the number of fds we are about to receive */
598 nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
599
159c7ff4 600 /*
8b97b5dd 601 * nb_fd is the number of fds we receive. One fd per recvmsg.
159c7ff4 602 */
1ce86c9a 603 for (i = 0; i < nb_fd; i++) {
159c7ff4 604 struct msghdr msg = { 0 };
1ce86c9a
JD
605
606 /* Prepare to receive the structures */
607 iov[0].iov_base = &lkm;
608 iov[0].iov_len = sizeof(lkm);
609 msg.msg_iov = iov;
610 msg.msg_iovlen = 1;
611
612 msg.msg_control = recv_fd;
613 msg.msg_controllen = sizeof(recv_fd);
614
615 DBG("Waiting to receive fd");
4de84ad9
JD
616 if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
617 goto end;
618 }
619
1ce86c9a
JD
620 if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
621 perror("recvmsg");
622 continue;
623 }
624
625 if (ret != (size / nb_fd)) {
626 ERR("Received only %d, expected %d", ret, size);
627 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
628 goto end;
629 }
630
631 cmsg = CMSG_FIRSTHDR(&msg);
632 if (!cmsg) {
633 ERR("Invalid control message header");
634 ret = -1;
635 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
636 goto end;
637 }
159c7ff4 638
1ce86c9a
JD
639 /* if we received fds */
640 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
641 switch (cmd_type) {
642 case ADD_STREAM:
159c7ff4
MD
643 DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
644 ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
1ce86c9a
JD
645 if (ret < 0) {
646 kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
647 goto end;
648 }
649 break;
650 case UPDATE_STREAM:
651 kconsumerd_change_fd_state(lkm.fd, lkm.state);
652 break;
653 default:
654 break;
655 }
1ce86c9a
JD
656 /* signal the poll thread */
657 tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
f40799e8
DG
658 if (tmp2 < 0) {
659 perror("write kconsumerd poll");
660 }
1ce86c9a
JD
661 } else {
662 ERR("Didn't received any fd");
663 kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
664 ret = -1;
665 goto end;
666 }
667 }
668
669end:
1ce86c9a
JD
670 return ret;
671}
672
673/*
674 * kconsumerd_thread_poll_fds
675 *
676 * This thread polls the fds in the ltt_fd_list to consume the data
677 * and write it to tracefile if necessary.
678 */
679void *kconsumerd_thread_poll_fds(void *data)
680{
681 int num_rdy, num_hup, high_prio, ret, i;
682 struct pollfd *pollfd = NULL;
683 /* local view of the fds */
684 struct kconsumerd_fd **local_kconsumerd_fd = NULL;
242cd187 685 /* local view of kconsumerd_data.fds_count */
1ce86c9a
JD
686 int nb_fd = 0;
687 char tmp;
688 int tmp2;
689
690 ret = pipe(kconsumerd_thread_pipe);
691 if (ret < 0) {
692 perror("Error creating pipe");
693 goto end;
694 }
695
696 local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
697
698 while (1) {
699 high_prio = 0;
700 num_hup = 0;
701
702 /*
703 * the ltt_fd_list has been updated, we need to update our
704 * local array as well
705 */
242cd187
MD
706 pthread_mutex_lock(&kconsumerd_data.lock);
707 if (kconsumerd_data.need_update) {
1ce86c9a
JD
708 if (pollfd != NULL) {
709 free(pollfd);
710 pollfd = NULL;
711 }
712 if (local_kconsumerd_fd != NULL) {
713 free(local_kconsumerd_fd);
714 local_kconsumerd_fd = NULL;
715 }
0237248c 716
1ce86c9a 717 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
242cd187 718 pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
1ce86c9a
JD
719 if (pollfd == NULL) {
720 perror("pollfd malloc");
242cd187 721 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
722 goto end;
723 }
0237248c 724
1ce86c9a 725 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
242cd187 726 local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
1ce86c9a
JD
727 sizeof(struct kconsumerd_fd));
728 if (local_kconsumerd_fd == NULL) {
729 perror("local_kconsumerd_fd malloc");
242cd187 730 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
731 goto end;
732 }
733 ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
734 if (ret < 0) {
735 ERR("Error in allocating pollfd or local_outfds");
736 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
242cd187 737 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
738 goto end;
739 }
740 nb_fd = ret;
242cd187 741 kconsumerd_data.need_update = 0;
1ce86c9a 742 }
242cd187 743 pthread_mutex_unlock(&kconsumerd_data.lock);
1ce86c9a
JD
744
745 /* poll on the array of fds */
746 DBG("polling on %d fd", nb_fd + 1);
747 num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
748 DBG("poll num_rdy : %d", num_rdy);
749 if (num_rdy == -1) {
750 perror("Poll error");
751 kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
752 goto end;
753 } else if (num_rdy == 0) {
754 DBG("Polling thread timed out");
755 goto end;
756 }
757
758 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
759 if (nb_fd == 0 && kconsumerd_quit == 1) {
760 goto end;
761 }
762
763 /*
242cd187
MD
764 * If the kconsumerd_poll_pipe triggered poll go
765 * directly to the beginning of the loop to update the
766 * array. We want to prioritize array update over
767 * low-priority reads.
1ce86c9a 768 */
242cd187 769 if (pollfd[nb_fd].revents == POLLIN) {
1ce86c9a
JD
770 DBG("kconsumerd_poll_pipe wake up");
771 tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
f40799e8
DG
772 if (tmp2 < 0) {
773 perror("read kconsumerd poll");
774 }
1ce86c9a
JD
775 continue;
776 }
777
778 /* Take care of high priority channels first. */
779 for (i = 0; i < nb_fd; i++) {
780 switch(pollfd[i].revents) {
781 case POLLERR:
782 ERR("Error returned in polling fd %d.", pollfd[i].fd);
783 kconsumerd_del_fd(local_kconsumerd_fd[i]);
1ce86c9a
JD
784 num_hup++;
785 break;
786 case POLLHUP:
787 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
788 kconsumerd_del_fd(local_kconsumerd_fd[i]);
1ce86c9a
JD
789 num_hup++;
790 break;
791 case POLLNVAL:
792 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
793 kconsumerd_del_fd(local_kconsumerd_fd[i]);
1ce86c9a
JD
794 num_hup++;
795 break;
796 case POLLPRI:
797 DBG("Urgent read on fd %d", pollfd[i].fd);
798 high_prio = 1;
799 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
800 /* it's ok to have an unavailable sub-buffer */
801 if (ret == EAGAIN) {
802 ret = 0;
803 }
804 break;
805 }
806 }
807
808 /* If every buffer FD has hung up, we end the read loop here */
809 if (nb_fd > 0 && num_hup == nb_fd) {
810 DBG("every buffer FD has hung up\n");
811 if (kconsumerd_quit == 1) {
812 goto end;
813 }
814 continue;
815 }
816
817 /* Take care of low priority channels. */
818 if (high_prio == 0) {
819 for (i = 0; i < nb_fd; i++) {
820 if (pollfd[i].revents == POLLIN) {
821 DBG("Normal read on fd %d", pollfd[i].fd);
822 ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
823 /* it's ok to have an unavailable subbuffer */
824 if (ret == EAGAIN) {
825 ret = 0;
826 }
827 }
828 }
829 }
830 }
831end:
832 DBG("polling thread exiting");
833 if (pollfd != NULL) {
834 free(pollfd);
835 pollfd = NULL;
836 }
837 if (local_kconsumerd_fd != NULL) {
838 free(local_kconsumerd_fd);
839 local_kconsumerd_fd = NULL;
840 }
1ce86c9a
JD
841 return NULL;
842}
843
844/*
4de84ad9 845 * kconsumerd_init(void)
1ce86c9a 846 *
4de84ad9
JD
847 * initialise the necessary environnement :
848 * - inform the polling thread to update the polling array
849 * - create the poll_pipe
850 * - create the should_quit pipe (for signal handler)
1ce86c9a 851 */
4de84ad9 852int kconsumerd_init(void)
1ce86c9a 853{
4de84ad9
JD
854 int ret;
855
856 /* need to update the polling array at init time */
857 kconsumerd_data.need_update = 1;
858
859 ret = pipe(kconsumerd_poll_pipe);
860 if (ret < 0) {
861 perror("Error creating poll pipe");
862 goto end;
863 }
864
865 ret = pipe(kconsumerd_should_quit);
866 if (ret < 0) {
867 perror("Error creating recv pipe");
868 goto end;
869 }
870
871end:
872 return ret;
1ce86c9a
JD
873}
874
875/*
876 * kconsumerd_thread_receive_fds
877 *
878 * This thread listens on the consumerd socket and
879 * receives the file descriptors from ltt-sessiond
880 */
881void *kconsumerd_thread_receive_fds(void *data)
882{
883 int sock, client_socket, ret;
884 struct lttcomm_kconsumerd_header tmp;
4de84ad9
JD
885 /*
886 * structure to poll for incoming data on communication socket
887 * avoids making blocking sockets
888 */
889 struct pollfd kconsumerd_sockpoll[2];
890
1ce86c9a
JD
891
892 DBG("Creating command socket %s", kconsumerd_command_sock_path);
893 unlink(kconsumerd_command_sock_path);
894 client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
895 if (client_socket < 0) {
896 ERR("Cannot create command socket");
897 goto end;
898 }
899
900 ret = lttcomm_listen_unix_sock(client_socket);
901 if (ret < 0) {
902 goto end;
903 }
904
905 DBG("Sending ready command to ltt-sessiond");
906 ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
907 if (ret < 0) {
908 ERR("Error sending ready command to ltt-sessiond");
909 goto end;
910 }
911
4de84ad9
JD
912 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
913 if (ret < 0) {
914 perror("fcntl O_NONBLOCK");
915 goto end;
916 }
917
918 /* prepare the FDs to poll : to client socket and the should_quit pipe */
919 kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
920 kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
921 kconsumerd_sockpoll[1].fd = client_socket;
922 kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
923
924 if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
925 goto end;
926 }
927 DBG("Connection on client_socket");
928
1ce86c9a
JD
929 /* Blocking call, waiting for transmission */
930 sock = lttcomm_accept_unix_sock(client_socket);
931 if (sock <= 0) {
932 WARN("On accept");
933 goto end;
934 }
4de84ad9
JD
935 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
936 if (ret < 0) {
937 perror("fcntl O_NONBLOCK");
938 goto end;
939 }
940
941 /* update the polling structure to poll on the established socket */
942 kconsumerd_sockpoll[1].fd = sock;
943 kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
944
1ce86c9a 945 while (1) {
4de84ad9
JD
946 if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
947 goto end;
948 }
949 DBG("Incoming fds on sock");
950
1ce86c9a
JD
951 /* We first get the number of fd we are about to receive */
952 ret = lttcomm_recv_unix_sock(sock, &tmp,
953 sizeof(struct lttcomm_kconsumerd_header));
954 if (ret <= 0) {
955 ERR("Communication interrupted on command socket");
956 goto end;
957 }
958 if (tmp.cmd_type == STOP) {
959 DBG("Received STOP command");
960 goto end;
961 }
3dcd2721
MD
962 if (kconsumerd_quit) {
963 DBG("kconsumerd_thread_receive_fds received quit from signal");
964 goto end;
965 }
4de84ad9 966
1ce86c9a 967 /* we received a command to add or update fds */
4de84ad9
JD
968 ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
969 tmp.payload_size, tmp.cmd_type);
1ce86c9a
JD
970 if (ret <= 0) {
971 ERR("Receiving the FD, exiting");
972 goto end;
973 }
4de84ad9 974 DBG("received fds on sock");
1ce86c9a
JD
975 }
976
977end:
978 DBG("kconsumerd_thread_receive_fds exiting");
979
980 /*
981 * when all fds have hung up, the polling thread
982 * can exit cleanly
983 */
984 kconsumerd_quit = 1;
985
986 /*
987 * 2s of grace period, if no polling events occur during
988 * this period, the polling thread will exit even if there
989 * are still open FDs (should not happen, but safety mechanism).
990 */
991 kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
992
993 /* wake up the polling thread */
994 ret = write(kconsumerd_poll_pipe[1], "4", 1);
995 if (ret < 0) {
996 perror("poll pipe write");
997 }
998 return NULL;
999}
1000
1001/*
1002 * kconsumerd_cleanup
1003 *
1004 * Cleanup the daemon's socket on exit
1005 */
3dcd2721 1006void kconsumerd_cleanup(void)
1ce86c9a 1007{
fbf10601 1008 struct kconsumerd_fd *iter, *tmp;
1ce86c9a
JD
1009
1010 /* remove the socket file */
1011 unlink(kconsumerd_command_sock_path);
1012
3dcd2721
MD
1013 /*
1014 * close all outfd. Called when there are no more threads
1015 * running (after joining on the threads), no need to protect
1016 * list iteration with mutex.
1017 */
fbf10601 1018 cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
1ce86c9a
JD
1019 kconsumerd_del_fd(iter);
1020 }
1021}
1022
3dcd2721 1023/*
4de84ad9
JD
1024 * kconsumerd_should_exit
1025 *
3dcd2721
MD
1026 * Called from signal handler.
1027 */
1028void kconsumerd_should_exit(void)
1029{
4de84ad9 1030 int ret;
3dcd2721 1031 kconsumerd_quit = 1;
4de84ad9 1032 ret = write(kconsumerd_should_quit[1], "4", 1);
f40799e8
DG
1033 if (ret < 0) {
1034 perror("write kconsumerd quit");
1035 }
3dcd2721
MD
1036}
1037
1ce86c9a
JD
1038/*
1039 * kconsumerd_send_error
1040 *
1041 * send return code to ltt-sessiond
1042 */
1043int kconsumerd_send_error(enum lttcomm_return_code cmd)
1044{
1045 if (kconsumerd_error_socket > 0) {
1046 return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
1047 sizeof(enum lttcomm_sessiond_command));
1048 }
1049
1050 return 0;
1051}
This page took 0.110248 seconds and 4 git commands to generate.