2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
30 #include <urcu/list.h>
32 #include <lttng/lttng-kconsumerd.h>
34 #include "kernelctl.h"
36 #include "lttng-sessiond-comm.h"
38 static struct lttng_kconsumerd_global_data
{
40 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
41 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It ensures
42 * the count matches the number of items in the fd_list. It ensures the
43 * list updates *always* trigger an fd_array update (therefore need to make
44 * list update vs kconsumerd_data.need_update flag update atomic, and also
45 * flag read, fd array and flag clear atomic).
49 * Number of element for the list below. Protected by kconsumerd_data.lock.
51 unsigned int fds_count
;
53 * List of FDs. Protected by kconsumerd_data.lock.
55 struct lttng_kconsumerd_fd_list fd_list
;
57 * Flag specifying if the local array of FDs needs update in the poll
58 * function. Protected by kconsumerd_data.lock.
60 unsigned int need_update
;
62 .fd_list
.head
= CDS_LIST_HEAD_INIT(kconsumerd_data
.fd_list
.head
),
67 /* timeout parameter, to control the polling thread grace period. */
68 static int kconsumerd_poll_timeout
= -1;
71 * Flag to inform the polling thread to quit when all fd hung up. Updated by
72 * the kconsumerd_thread_receive_fds when it notices that all fds has hung up.
73 * Also updated by the signal handler (kconsumerd_should_exit()). Read by the
76 static volatile int kconsumerd_quit
= 0;
79 * Find a session fd in the global list. The kconsumerd_data.lock must be
80 * locked during this call.
82 * Return 1 if found else 0.
84 static int kconsumerd_find_session_fd(int fd
)
86 struct lttng_kconsumerd_fd
*iter
;
88 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
89 if (iter
->sessiond_fd
== fd
) {
90 DBG("Duplicate session fd %d", fd
);
99 * Remove a fd from the global list protected by a mutex.
101 static void kconsumerd_del_fd(struct lttng_kconsumerd_fd
*lcf
)
104 pthread_mutex_lock(&kconsumerd_data
.lock
);
105 cds_list_del(&lcf
->list
);
106 if (kconsumerd_data
.fds_count
> 0) {
107 kconsumerd_data
.fds_count
--;
109 if (lcf
->mmap_base
!= NULL
) {
110 ret
= munmap(lcf
->mmap_base
, lcf
->mmap_len
);
115 if (lcf
->out_fd
!= 0) {
118 close(lcf
->consumerd_fd
);
123 kconsumerd_data
.need_update
= 1;
124 pthread_mutex_unlock(&kconsumerd_data
.lock
);
128 * Add a fd to the global list protected by a mutex.
130 static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg
*buf
,
133 struct lttng_kconsumerd_fd
*tmp_fd
;
136 pthread_mutex_lock(&kconsumerd_data
.lock
);
137 /* Check if already exist */
138 ret
= kconsumerd_find_session_fd(buf
->fd
);
143 tmp_fd
= malloc(sizeof(struct lttng_kconsumerd_fd
));
144 tmp_fd
->sessiond_fd
= buf
->fd
;
145 tmp_fd
->consumerd_fd
= consumerd_fd
;
146 tmp_fd
->state
= buf
->state
;
147 tmp_fd
->max_sb_size
= buf
->max_sb_size
;
149 tmp_fd
->out_fd_offset
= 0;
150 tmp_fd
->mmap_len
= 0;
151 tmp_fd
->mmap_base
= NULL
;
152 tmp_fd
->output
= buf
->output
;
153 strncpy(tmp_fd
->path_name
, buf
->path_name
, PATH_MAX
);
154 tmp_fd
->path_name
[PATH_MAX
- 1] = '\0';
156 /* Opening the tracefile in write mode */
157 if (tmp_fd
->path_name
!= NULL
) {
158 ret
= open(tmp_fd
->path_name
,
159 O_WRONLY
|O_CREAT
|O_TRUNC
, S_IRWXU
|S_IRWXG
|S_IRWXO
);
161 ERR("Opening %s", tmp_fd
->path_name
);
165 tmp_fd
->out_fd
= ret
;
166 DBG("Adding %s (%d, %d, %d)", tmp_fd
->path_name
,
167 tmp_fd
->sessiond_fd
, tmp_fd
->consumerd_fd
, tmp_fd
->out_fd
);
170 if (tmp_fd
->output
== LTTNG_EVENT_MMAP
) {
171 /* get the len of the mmap region */
172 ret
= kernctl_get_mmap_len(tmp_fd
->consumerd_fd
, &tmp_fd
->mmap_len
);
175 perror("kernctl_get_mmap_len");
179 tmp_fd
->mmap_base
= mmap(NULL
, tmp_fd
->mmap_len
,
180 PROT_READ
, MAP_PRIVATE
, tmp_fd
->consumerd_fd
, 0);
181 if (tmp_fd
->mmap_base
== MAP_FAILED
) {
182 perror("Error mmaping");
188 cds_list_add(&tmp_fd
->list
, &kconsumerd_data
.fd_list
.head
);
189 kconsumerd_data
.fds_count
++;
190 kconsumerd_data
.need_update
= 1;
192 pthread_mutex_unlock(&kconsumerd_data
.lock
);
197 * Update a fd according to what we just received.
199 static void kconsumerd_change_fd_state(int sessiond_fd
,
200 enum lttng_kconsumerd_fd_state state
)
202 struct lttng_kconsumerd_fd
*iter
;
204 pthread_mutex_lock(&kconsumerd_data
.lock
);
205 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
206 if (iter
->sessiond_fd
== sessiond_fd
) {
211 kconsumerd_data
.need_update
= 1;
212 pthread_mutex_unlock(&kconsumerd_data
.lock
);
216 * Allocate the pollfd structure and the local view of the out fds to avoid
217 * doing a lookup in the linked list and concurrency issues when writing is
218 * needed. Called with kconsumerd_data.lock held.
220 * Returns the number of fds in the structures.
222 static int kconsumerd_update_poll_array(
223 struct lttng_kconsumerd_local_data
*ctx
, struct pollfd
**pollfd
,
224 struct lttng_kconsumerd_fd
**local_kconsumerd_fd
)
226 struct lttng_kconsumerd_fd
*iter
;
229 DBG("Updating poll fd array");
230 cds_list_for_each_entry(iter
, &kconsumerd_data
.fd_list
.head
, list
) {
231 if (iter
->state
== ACTIVE_FD
) {
232 DBG("Active FD %d", iter
->consumerd_fd
);
233 (*pollfd
)[i
].fd
= iter
->consumerd_fd
;
234 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
235 local_kconsumerd_fd
[i
] = iter
;
241 * Insert the kconsumerd_poll_pipe at the end of the array and don't
242 * increment i so nb_fd is the number of real FD.
244 (*pollfd
)[i
].fd
= ctx
->kconsumerd_poll_pipe
[0];
245 (*pollfd
)[i
].events
= POLLIN
;
250 * Receives an array of file descriptors and the associated structures
251 * describing each fd (path name).
253 * Returns the size of received data
255 static int kconsumerd_consumerd_recv_fd(
256 struct lttng_kconsumerd_local_data
*ctx
, int sfd
,
257 struct pollfd
*kconsumerd_sockpoll
, int size
,
258 enum lttng_kconsumerd_command cmd_type
)
261 int ret
= 0, i
, tmp2
;
262 struct cmsghdr
*cmsg
;
264 char recv_fd
[CMSG_SPACE(sizeof(int))];
265 struct lttcomm_kconsumerd_msg lkm
;
267 /* the number of fds we are about to receive */
268 nb_fd
= size
/ sizeof(struct lttcomm_kconsumerd_msg
);
271 * nb_fd is the number of fds we receive. One fd per recvmsg.
273 for (i
= 0; i
< nb_fd
; i
++) {
274 struct msghdr msg
= { 0 };
276 /* Prepare to receive the structures */
277 iov
[0].iov_base
= &lkm
;
278 iov
[0].iov_len
= sizeof(lkm
);
282 msg
.msg_control
= recv_fd
;
283 msg
.msg_controllen
= sizeof(recv_fd
);
285 DBG("Waiting to receive fd");
286 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
290 if ((ret
= recvmsg(sfd
, &msg
, 0)) < 0) {
295 if (ret
!= (size
/ nb_fd
)) {
296 ERR("Received only %d, expected %d", ret
, size
);
297 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
301 cmsg
= CMSG_FIRSTHDR(&msg
);
303 ERR("Invalid control message header");
305 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
309 /* if we received fds */
310 if (cmsg
->cmsg_level
== SOL_SOCKET
&& cmsg
->cmsg_type
== SCM_RIGHTS
) {
313 DBG("kconsumerd_add_fd %s (%d)", lkm
.path_name
,
314 ((int *) CMSG_DATA(cmsg
))[0]);
316 ret
= kconsumerd_add_fd(&lkm
, ((int *) CMSG_DATA(cmsg
))[0]);
318 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_OUTFD_ERROR
);
323 kconsumerd_change_fd_state(lkm
.fd
, lkm
.state
);
328 /* signal the poll thread */
329 tmp2
= write(ctx
->kconsumerd_poll_pipe
[1], "4", 1);
331 perror("write kconsumerd poll");
334 ERR("Didn't received any fd");
335 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_ERROR_RECV_FD
);
346 * Set the error socket.
348 void lttng_kconsumerd_set_error_sock(
349 struct lttng_kconsumerd_local_data
*ctx
, int sock
)
351 ctx
->kconsumerd_error_socket
= sock
;
355 * Set the command socket path.
358 void lttng_kconsumerd_set_command_sock_path(
359 struct lttng_kconsumerd_local_data
*ctx
, char *sock
)
361 ctx
->kconsumerd_command_sock_path
= sock
;
365 * Mmap the ring buffer, read it and write the data to the tracefile.
367 * Returns the number of bytes written
369 int lttng_kconsumerd_on_read_subbuffer_mmap(
370 struct lttng_kconsumerd_local_data
*ctx
,
371 struct lttng_kconsumerd_fd
*kconsumerd_fd
, unsigned long len
)
373 unsigned long mmap_offset
;
374 char *padding
= NULL
;
376 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
377 int fd
= kconsumerd_fd
->consumerd_fd
;
378 int outfd
= kconsumerd_fd
->out_fd
;
380 /* get the offset inside the fd to mmap */
381 ret
= kernctl_get_mmap_read_offset(fd
, &mmap_offset
);
384 perror("kernctl_get_mmap_read_offset");
389 ret
= write(outfd
, kconsumerd_fd
->mmap_base
+ mmap_offset
, len
);
392 } else if (ret
< 0) {
394 perror("Error in file write");
397 /* This won't block, but will start writeout asynchronously */
398 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
399 SYNC_FILE_RANGE_WRITE
);
400 kconsumerd_fd
->out_fd_offset
+= ret
;
404 * This does a blocking write-and-wait on any page that belongs to the
405 * subbuffer prior to the one we just wrote.
406 * Don't care about error values, as these are just hints and ways to
407 * limit the amount of page cache used.
409 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
410 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
411 kconsumerd_fd
->max_sb_size
,
412 SYNC_FILE_RANGE_WAIT_BEFORE
413 | SYNC_FILE_RANGE_WRITE
414 | SYNC_FILE_RANGE_WAIT_AFTER
);
417 * Give hints to the kernel about how we access the file:
418 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
421 * We need to call fadvise again after the file grows because the
422 * kernel does not seem to apply fadvise to non-existing parts of the
425 * Call fadvise _after_ having waited for the page writeback to
426 * complete because the dirty page writeback semantic is not well
427 * defined. So it can be expected to lead to lower throughput in
430 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
431 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
436 if (padding
!= NULL
) {
443 * Splice the data from the ring buffer to the tracefile.
445 * Returns the number of bytes spliced.
447 int lttng_kconsumerd_on_read_subbuffer_splice(
448 struct lttng_kconsumerd_local_data
*ctx
,
449 struct lttng_kconsumerd_fd
*kconsumerd_fd
, unsigned long len
)
453 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
454 int fd
= kconsumerd_fd
->consumerd_fd
;
455 int outfd
= kconsumerd_fd
->out_fd
;
458 DBG("splice chan to pipe offset %lu (fd : %d)",
459 (unsigned long)offset
, fd
);
460 ret
= splice(fd
, &offset
, ctx
->kconsumerd_thread_pipe
[1], NULL
, len
,
461 SPLICE_F_MOVE
| SPLICE_F_MORE
);
462 DBG("splice chan to pipe ret %ld", ret
);
465 perror("Error in relay splice");
469 ret
= splice(ctx
->kconsumerd_thread_pipe
[0], NULL
, outfd
, NULL
, ret
,
470 SPLICE_F_MOVE
| SPLICE_F_MORE
);
471 DBG("splice pipe to file %ld", ret
);
474 perror("Error in file splice");
478 /* This won't block, but will start writeout asynchronously */
479 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
480 SYNC_FILE_RANGE_WRITE
);
481 kconsumerd_fd
->out_fd_offset
+= ret
;
485 * This does a blocking write-and-wait on any page that belongs to the
486 * subbuffer prior to the one we just wrote.
487 * Don't care about error values, as these are just hints and ways to
488 * limit the amount of page cache used.
490 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
491 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
492 kconsumerd_fd
->max_sb_size
,
493 SYNC_FILE_RANGE_WAIT_BEFORE
494 | SYNC_FILE_RANGE_WRITE
495 | SYNC_FILE_RANGE_WAIT_AFTER
);
497 * Give hints to the kernel about how we access the file:
498 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
501 * We need to call fadvise again after the file grows because the
502 * kernel does not seem to apply fadvise to non-existing parts of the
505 * Call fadvise _after_ having waited for the page writeback to
506 * complete because the dirty page writeback semantic is not well
507 * defined. So it can be expected to lead to lower throughput in
510 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
511 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
516 /* send the appropriate error description to sessiond */
519 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_EBADF
);
522 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_EINVAL
);
525 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_ENOMEM
);
528 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_SPLICE_ESPIPE
);
537 * Poll on the should_quit pipe and the command socket return -1 on error and
538 * should exit, 0 if data is available on the command socket
540 int lttng_kconsumerd_poll_socket(struct pollfd
*kconsumerd_sockpoll
)
544 num_rdy
= poll(kconsumerd_sockpoll
, 2, -1);
546 perror("Poll error");
549 if (kconsumerd_sockpoll
[0].revents
== POLLIN
) {
550 DBG("kconsumerd_should_quit wake up");
560 * This thread polls the fds in the ltt_fd_list to consume the data and write
561 * it to tracefile if necessary.
563 void *lttng_kconsumerd_thread_poll_fds(void *data
)
565 int num_rdy
, num_hup
, high_prio
, ret
, i
;
566 struct pollfd
*pollfd
= NULL
;
567 /* local view of the fds */
568 struct lttng_kconsumerd_fd
**local_kconsumerd_fd
= NULL
;
569 /* local view of kconsumerd_data.fds_count */
573 struct lttng_kconsumerd_local_data
*ctx
= data
;
576 local_kconsumerd_fd
= malloc(sizeof(struct lttng_kconsumerd_fd
));
583 * the ltt_fd_list has been updated, we need to update our
584 * local array as well
586 pthread_mutex_lock(&kconsumerd_data
.lock
);
587 if (kconsumerd_data
.need_update
) {
588 if (pollfd
!= NULL
) {
592 if (local_kconsumerd_fd
!= NULL
) {
593 free(local_kconsumerd_fd
);
594 local_kconsumerd_fd
= NULL
;
597 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
598 pollfd
= malloc((kconsumerd_data
.fds_count
+ 1) * sizeof(struct pollfd
));
599 if (pollfd
== NULL
) {
600 perror("pollfd malloc");
601 pthread_mutex_unlock(&kconsumerd_data
.lock
);
605 /* allocate for all fds + 1 for the kconsumerd_poll_pipe */
606 local_kconsumerd_fd
= malloc((kconsumerd_data
.fds_count
+ 1) *
607 sizeof(struct lttng_kconsumerd_fd
));
608 if (local_kconsumerd_fd
== NULL
) {
609 perror("local_kconsumerd_fd malloc");
610 pthread_mutex_unlock(&kconsumerd_data
.lock
);
613 ret
= kconsumerd_update_poll_array(ctx
, &pollfd
, local_kconsumerd_fd
);
615 ERR("Error in allocating pollfd or local_outfds");
616 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_POLL_ERROR
);
617 pthread_mutex_unlock(&kconsumerd_data
.lock
);
621 kconsumerd_data
.need_update
= 0;
623 pthread_mutex_unlock(&kconsumerd_data
.lock
);
625 /* poll on the array of fds */
626 DBG("polling on %d fd", nb_fd
+ 1);
627 num_rdy
= poll(pollfd
, nb_fd
+ 1, kconsumerd_poll_timeout
);
628 DBG("poll num_rdy : %d", num_rdy
);
630 perror("Poll error");
631 lttng_kconsumerd_send_error(ctx
, KCONSUMERD_POLL_ERROR
);
633 } else if (num_rdy
== 0) {
634 DBG("Polling thread timed out");
638 /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
639 if (nb_fd
== 0 && kconsumerd_quit
== 1) {
644 * If the kconsumerd_poll_pipe triggered poll go
645 * directly to the beginning of the loop to update the
646 * array. We want to prioritize array update over
647 * low-priority reads.
649 if (pollfd
[nb_fd
].revents
== POLLIN
) {
650 DBG("kconsumerd_poll_pipe wake up");
651 tmp2
= read(ctx
->kconsumerd_poll_pipe
[0], &tmp
, 1);
653 perror("read kconsumerd poll");
658 /* Take care of high priority channels first. */
659 for (i
= 0; i
< nb_fd
; i
++) {
660 switch(pollfd
[i
].revents
) {
662 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
663 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
667 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
668 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
672 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
673 kconsumerd_del_fd(local_kconsumerd_fd
[i
]);
677 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
679 ret
= ctx
->on_buffer_ready(local_kconsumerd_fd
[i
]);
680 /* it's ok to have an unavailable sub-buffer */
688 /* If every buffer FD has hung up, we end the read loop here */
689 if (nb_fd
> 0 && num_hup
== nb_fd
) {
690 DBG("every buffer FD has hung up\n");
691 if (kconsumerd_quit
== 1) {
697 /* Take care of low priority channels. */
698 if (high_prio
== 0) {
699 for (i
= 0; i
< nb_fd
; i
++) {
700 if (pollfd
[i
].revents
== POLLIN
) {
701 DBG("Normal read on fd %d", pollfd
[i
].fd
);
702 ret
= ctx
->on_buffer_ready(local_kconsumerd_fd
[i
]);
703 /* it's ok to have an unavailable subbuffer */
712 DBG("polling thread exiting");
713 if (pollfd
!= NULL
) {
717 if (local_kconsumerd_fd
!= NULL
) {
718 free(local_kconsumerd_fd
);
719 local_kconsumerd_fd
= NULL
;
725 * Initialise the necessary environnement :
726 * - create a new context
727 * - create the poll_pipe
728 * - create the should_quit pipe (for signal handler)
729 * - create the thread pipe (for splice)
731 * Takes a function pointer as argument, this function is called when data is
732 * available on a buffer. This function is responsible to do the
733 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
734 * buffer configuration and then kernctl_put_next_subbuf at the end.
736 * Returns a pointer to the new context or NULL on error.
738 struct lttng_kconsumerd_local_data
*lttng_kconsumerd_create(
739 int (*buffer_ready
)(struct lttng_kconsumerd_fd
*kconsumerd_fd
))
742 struct lttng_kconsumerd_local_data
*ctx
;
744 ctx
= malloc(sizeof(struct lttng_kconsumerd_local_data
));
746 perror("allocating context");
750 ctx
->on_buffer_ready
= buffer_ready
;
752 ret
= pipe(ctx
->kconsumerd_poll_pipe
);
754 perror("Error creating poll pipe");
759 ret
= pipe(ctx
->kconsumerd_should_quit
);
761 perror("Error creating recv pipe");
766 ret
= pipe(ctx
->kconsumerd_thread_pipe
);
768 perror("Error creating thread pipe");
778 * Close all fds associated with the instance and free the context.
780 void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data
*ctx
)
782 close(ctx
->kconsumerd_error_socket
);
783 close(ctx
->kconsumerd_thread_pipe
[0]);
784 close(ctx
->kconsumerd_thread_pipe
[1]);
785 close(ctx
->kconsumerd_poll_pipe
[0]);
786 close(ctx
->kconsumerd_poll_pipe
[1]);
787 close(ctx
->kconsumerd_should_quit
[0]);
788 close(ctx
->kconsumerd_should_quit
[1]);
789 unlink(ctx
->kconsumerd_command_sock_path
);
795 * This thread listens on the consumerd socket and receives the file
796 * descriptors from the session daemon.
798 void *lttng_kconsumerd_thread_receive_fds(void *data
)
800 int sock
, client_socket
, ret
;
801 struct lttcomm_kconsumerd_header tmp
;
803 * structure to poll for incoming data on communication socket avoids
804 * making blocking sockets.
806 struct pollfd kconsumerd_sockpoll
[2];
807 struct lttng_kconsumerd_local_data
*ctx
= data
;
810 DBG("Creating command socket %s", ctx
->kconsumerd_command_sock_path
);
811 unlink(ctx
->kconsumerd_command_sock_path
);
812 client_socket
= lttcomm_create_unix_sock(ctx
->kconsumerd_command_sock_path
);
813 if (client_socket
< 0) {
814 ERR("Cannot create command socket");
818 ret
= lttcomm_listen_unix_sock(client_socket
);
823 DBG("Sending ready command to ltt-sessiond");
824 ret
= lttng_kconsumerd_send_error(ctx
, KCONSUMERD_COMMAND_SOCK_READY
);
826 ERR("Error sending ready command to ltt-sessiond");
830 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
832 perror("fcntl O_NONBLOCK");
836 /* prepare the FDs to poll : to client socket and the should_quit pipe */
837 kconsumerd_sockpoll
[0].fd
= ctx
->kconsumerd_should_quit
[0];
838 kconsumerd_sockpoll
[0].events
= POLLIN
| POLLPRI
;
839 kconsumerd_sockpoll
[1].fd
= client_socket
;
840 kconsumerd_sockpoll
[1].events
= POLLIN
| POLLPRI
;
842 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
845 DBG("Connection on client_socket");
847 /* Blocking call, waiting for transmission */
848 sock
= lttcomm_accept_unix_sock(client_socket
);
853 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
855 perror("fcntl O_NONBLOCK");
859 /* update the polling structure to poll on the established socket */
860 kconsumerd_sockpoll
[1].fd
= sock
;
861 kconsumerd_sockpoll
[1].events
= POLLIN
| POLLPRI
;
864 if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll
) < 0) {
867 DBG("Incoming fds on sock");
869 /* We first get the number of fd we are about to receive */
870 ret
= lttcomm_recv_unix_sock(sock
, &tmp
,
871 sizeof(struct lttcomm_kconsumerd_header
));
873 ERR("Communication interrupted on command socket");
876 if (tmp
.cmd_type
== STOP
) {
877 DBG("Received STOP command");
880 if (kconsumerd_quit
) {
881 DBG("kconsumerd_thread_receive_fds received quit from signal");
885 /* we received a command to add or update fds */
886 ret
= kconsumerd_consumerd_recv_fd(ctx
, sock
, kconsumerd_sockpoll
,
887 tmp
.payload_size
, tmp
.cmd_type
);
889 ERR("Receiving the FD, exiting");
892 DBG("received fds on sock");
896 DBG("kconsumerd_thread_receive_fds exiting");
899 * when all fds have hung up, the polling thread
905 * 2s of grace period, if no polling events occur during
906 * this period, the polling thread will exit even if there
907 * are still open FDs (should not happen, but safety mechanism).
909 kconsumerd_poll_timeout
= LTTNG_KCONSUMERD_POLL_GRACE_PERIOD
;
911 /* wake up the polling thread */
912 ret
= write(ctx
->kconsumerd_poll_pipe
[1], "4", 1);
914 perror("poll pipe write");
920 * Close all the tracefiles and stream fds, should be called when all instances
923 void lttng_kconsumerd_cleanup(void)
925 struct lttng_kconsumerd_fd
*iter
, *tmp
;
928 * close all outfd. Called when there are no more threads
929 * running (after joining on the threads), no need to protect
930 * list iteration with mutex.
932 cds_list_for_each_entry_safe(iter
, tmp
,
933 &kconsumerd_data
.fd_list
.head
, list
) {
934 kconsumerd_del_fd(iter
);
939 * Called from signal handler.
941 void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data
*ctx
)
945 ret
= write(ctx
->kconsumerd_should_quit
[1], "4", 1);
947 perror("write kconsumerd quit");
952 * Send return code to the session daemon.
954 int lttng_kconsumerd_send_error(
955 struct lttng_kconsumerd_local_data
*ctx
, int cmd
)
957 if (ctx
->kconsumerd_error_socket
> 0) {
958 return lttcomm_send_unix_sock(ctx
->kconsumerd_error_socket
, &cmd
,
959 sizeof(enum lttcomm_sessiond_command
));