2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/relayd.h>
34 #include <common/sessiond-comm/sessiond-comm.h>
35 #include <common/kernel-consumer/kernel-consumer.h>
36 #include <common/relayd/relayd.h>
37 #include <common/ust-consumer/ust-consumer.h>
41 struct lttng_consumer_global_data consumer_data
= {
44 .type
= LTTNG_CONSUMER_UNKNOWN
,
47 /* timeout parameter, to control the polling thread grace period. */
48 int consumer_poll_timeout
= -1;
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
56 volatile int consumer_quit
= 0;
59 * Find a stream. The consumer_data.lock must be locked during this
62 static struct lttng_consumer_stream
*consumer_find_stream(int key
)
64 struct lttng_ht_iter iter
;
65 struct lttng_ht_node_ulong
*node
;
66 struct lttng_consumer_stream
*stream
= NULL
;
68 /* Negative keys are lookup failures */
74 lttng_ht_lookup(consumer_data
.stream_ht
, (void *)((unsigned long) key
),
76 node
= lttng_ht_iter_get_node_ulong(&iter
);
78 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
86 static void consumer_steal_stream_key(int key
)
88 struct lttng_consumer_stream
*stream
;
91 stream
= consumer_find_stream(key
);
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
99 stream
->node
.key
= -1;
104 static struct lttng_consumer_channel
*consumer_find_channel(int key
)
106 struct lttng_ht_iter iter
;
107 struct lttng_ht_node_ulong
*node
;
108 struct lttng_consumer_channel
*channel
= NULL
;
110 /* Negative keys are lookup failures */
116 lttng_ht_lookup(consumer_data
.channel_ht
, (void *)((unsigned long) key
),
118 node
= lttng_ht_iter_get_node_ulong(&iter
);
120 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
128 static void consumer_steal_channel_key(int key
)
130 struct lttng_consumer_channel
*channel
;
133 channel
= consumer_find_channel(key
);
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
141 channel
->node
.key
= -1;
147 void consumer_free_stream(struct rcu_head
*head
)
149 struct lttng_ht_node_ulong
*node
=
150 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
151 struct lttng_consumer_stream
*stream
=
152 caa_container_of(node
, struct lttng_consumer_stream
, node
);
158 * RCU protected relayd socket pair free.
160 static void consumer_rcu_free_relayd(struct rcu_head
*head
)
162 struct lttng_ht_node_ulong
*node
=
163 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
164 struct consumer_relayd_sock_pair
*relayd
=
165 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
171 * Destroy and free relayd socket pair object.
173 * This function MUST be called with the consumer_data lock acquired.
175 void consumer_destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
178 struct lttng_ht_iter iter
;
180 if (relayd
== NULL
) {
184 DBG("Consumer destroy and close relayd socket pair");
186 iter
.iter
.node
= &relayd
->node
.node
;
187 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
189 /* We assume the relayd was already destroyed */
193 /* Close all sockets */
194 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
195 (void) relayd_close(&relayd
->control_sock
);
196 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
197 (void) relayd_close(&relayd
->data_sock
);
199 /* RCU free() call */
200 call_rcu(&relayd
->node
.head
, consumer_rcu_free_relayd
);
204 * Remove a stream from the global list protected by a mutex. This
205 * function is also responsible for freeing its data structures.
207 void consumer_del_stream(struct lttng_consumer_stream
*stream
)
210 struct lttng_ht_iter iter
;
211 struct lttng_consumer_channel
*free_chan
= NULL
;
212 struct consumer_relayd_sock_pair
*relayd
;
216 pthread_mutex_lock(&consumer_data
.lock
);
218 switch (consumer_data
.type
) {
219 case LTTNG_CONSUMER_KERNEL
:
220 if (stream
->mmap_base
!= NULL
) {
221 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
227 case LTTNG_CONSUMER32_UST
:
228 case LTTNG_CONSUMER64_UST
:
229 lttng_ustconsumer_del_stream(stream
);
232 ERR("Unknown consumer_data type");
238 iter
.iter
.node
= &stream
->node
.node
;
239 ret
= lttng_ht_del(consumer_data
.stream_ht
, &iter
);
244 if (consumer_data
.stream_count
<= 0) {
247 consumer_data
.stream_count
--;
251 if (stream
->out_fd
>= 0) {
252 ret
= close(stream
->out_fd
);
257 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
258 ret
= close(stream
->wait_fd
);
263 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
264 ret
= close(stream
->shm_fd
);
270 /* Check and cleanup relayd */
272 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
273 if (relayd
!= NULL
) {
274 uatomic_dec(&relayd
->refcount
);
275 assert(uatomic_read(&relayd
->refcount
) >= 0);
277 ret
= relayd_send_close_stream(&relayd
->control_sock
,
278 stream
->relayd_stream_id
,
279 stream
->next_net_seq_num
- 1);
281 ERR("Unable to close stream on the relayd. Continuing");
282 /* Continue here. There is nothing we can do for the relayd.*/
285 /* Both conditions are met, we destroy the relayd. */
286 if (uatomic_read(&relayd
->refcount
) == 0 &&
287 uatomic_read(&relayd
->destroy_flag
)) {
288 consumer_destroy_relayd(relayd
);
293 if (!--stream
->chan
->refcount
) {
294 free_chan
= stream
->chan
;
298 call_rcu(&stream
->node
.head
, consumer_free_stream
);
300 consumer_data
.need_update
= 1;
301 pthread_mutex_unlock(&consumer_data
.lock
);
304 consumer_del_channel(free_chan
);
307 struct lttng_consumer_stream
*consumer_allocate_stream(
308 int channel_key
, int stream_key
,
309 int shm_fd
, int wait_fd
,
310 enum lttng_consumer_stream_state state
,
312 enum lttng_event_output output
,
313 const char *path_name
,
319 struct lttng_consumer_stream
*stream
;
322 stream
= zmalloc(sizeof(*stream
));
323 if (stream
== NULL
) {
324 perror("malloc struct lttng_consumer_stream");
327 stream
->chan
= consumer_find_channel(channel_key
);
329 perror("Unable to find channel key");
332 stream
->chan
->refcount
++;
333 stream
->key
= stream_key
;
334 stream
->shm_fd
= shm_fd
;
335 stream
->wait_fd
= wait_fd
;
337 stream
->out_fd_offset
= 0;
338 stream
->state
= state
;
339 stream
->mmap_len
= mmap_len
;
340 stream
->mmap_base
= NULL
;
341 stream
->output
= output
;
344 stream
->net_seq_idx
= net_index
;
345 stream
->metadata_flag
= metadata_flag
;
346 strncpy(stream
->path_name
, path_name
, sizeof(stream
->path_name
));
347 stream
->path_name
[sizeof(stream
->path_name
) - 1] = '\0';
348 lttng_ht_node_init_ulong(&stream
->node
, stream
->key
);
349 lttng_ht_node_init_ulong(&stream
->waitfd_node
, stream
->wait_fd
);
351 switch (consumer_data
.type
) {
352 case LTTNG_CONSUMER_KERNEL
:
354 case LTTNG_CONSUMER32_UST
:
355 case LTTNG_CONSUMER64_UST
:
356 stream
->cpu
= stream
->chan
->cpucount
++;
357 ret
= lttng_ustconsumer_allocate_stream(stream
);
364 ERR("Unknown consumer_data type");
368 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
369 stream
->path_name
, stream
->key
,
372 (unsigned long long) stream
->mmap_len
,
374 stream
->net_seq_idx
);
380 * Add a stream to the global list protected by a mutex.
382 int consumer_add_stream(struct lttng_consumer_stream
*stream
)
385 struct lttng_ht_node_ulong
*node
;
386 struct lttng_ht_iter iter
;
387 struct consumer_relayd_sock_pair
*relayd
;
389 pthread_mutex_lock(&consumer_data
.lock
);
390 /* Steal stream identifier, for UST */
391 consumer_steal_stream_key(stream
->key
);
394 lttng_ht_lookup(consumer_data
.stream_ht
,
395 (void *)((unsigned long) stream
->key
), &iter
);
396 node
= lttng_ht_iter_get_node_ulong(&iter
);
399 /* Stream already exist. Ignore the insertion */
403 lttng_ht_add_unique_ulong(consumer_data
.stream_ht
, &stream
->node
);
405 /* Check and cleanup relayd */
406 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
407 if (relayd
!= NULL
) {
408 uatomic_inc(&relayd
->refcount
);
412 /* Update consumer data */
413 consumer_data
.stream_count
++;
414 consumer_data
.need_update
= 1;
416 switch (consumer_data
.type
) {
417 case LTTNG_CONSUMER_KERNEL
:
419 case LTTNG_CONSUMER32_UST
:
420 case LTTNG_CONSUMER64_UST
:
421 /* Streams are in CPU number order (we rely on this) */
422 stream
->cpu
= stream
->chan
->nr_streams
++;
425 ERR("Unknown consumer_data type");
431 pthread_mutex_unlock(&consumer_data
.lock
);
437 * Add relayd socket to global consumer data hashtable.
439 int consumer_add_relayd(struct consumer_relayd_sock_pair
*relayd
)
442 struct lttng_ht_node_ulong
*node
;
443 struct lttng_ht_iter iter
;
445 if (relayd
== NULL
) {
452 lttng_ht_lookup(consumer_data
.relayd_ht
,
453 (void *)((unsigned long) relayd
->net_seq_idx
), &iter
);
454 node
= lttng_ht_iter_get_node_ulong(&iter
);
457 /* Relayd already exist. Ignore the insertion */
460 lttng_ht_add_unique_ulong(consumer_data
.relayd_ht
, &relayd
->node
);
469 * Allocate and return a consumer relayd socket.
471 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
474 struct consumer_relayd_sock_pair
*obj
= NULL
;
476 /* Negative net sequence index is a failure */
477 if (net_seq_idx
< 0) {
481 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
483 PERROR("zmalloc relayd sock");
487 obj
->net_seq_idx
= net_seq_idx
;
489 obj
->destroy_flag
= 0;
490 lttng_ht_node_init_ulong(&obj
->node
, obj
->net_seq_idx
);
491 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
498 * Find a relayd socket pair in the global consumer data.
500 * Return the object if found else NULL.
501 * RCU read-side lock must be held across this call and while using the
504 struct consumer_relayd_sock_pair
*consumer_find_relayd(int key
)
506 struct lttng_ht_iter iter
;
507 struct lttng_ht_node_ulong
*node
;
508 struct consumer_relayd_sock_pair
*relayd
= NULL
;
510 /* Negative keys are lookup failures */
515 lttng_ht_lookup(consumer_data
.relayd_ht
, (void *)((unsigned long) key
),
517 node
= lttng_ht_iter_get_node_ulong(&iter
);
519 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
527 * Handle stream for relayd transmission if the stream applies for network
528 * streaming where the net sequence index is set.
530 * Return destination file descriptor or negative value on error.
532 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream
*stream
,
536 struct consumer_relayd_sock_pair
*relayd
;
537 struct lttcomm_relayd_data_hdr data_hdr
;
542 /* Reset data header */
543 memset(&data_hdr
, 0, sizeof(data_hdr
));
546 /* Get relayd reference of the stream. */
547 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
548 if (relayd
== NULL
) {
549 /* Stream is either local or corrupted */
553 DBG("Consumer found relayd socks with index %d", stream
->net_seq_idx
);
554 if (stream
->metadata_flag
) {
555 /* Caller MUST acquire the relayd control socket lock */
556 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
561 /* Metadata are always sent on the control socket. */
562 outfd
= relayd
->control_sock
.fd
;
564 /* Set header with stream information */
565 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
566 data_hdr
.data_size
= htobe32(data_size
);
567 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
++);
568 /* Other fields are zeroed previously */
570 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
576 /* Set to go on data socket */
577 outfd
= relayd
->data_sock
.fd
;
586 * Update a stream according to what we just received.
588 void consumer_change_stream_state(int stream_key
,
589 enum lttng_consumer_stream_state state
)
591 struct lttng_consumer_stream
*stream
;
593 pthread_mutex_lock(&consumer_data
.lock
);
594 stream
= consumer_find_stream(stream_key
);
596 stream
->state
= state
;
598 consumer_data
.need_update
= 1;
599 pthread_mutex_unlock(&consumer_data
.lock
);
603 void consumer_free_channel(struct rcu_head
*head
)
605 struct lttng_ht_node_ulong
*node
=
606 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
607 struct lttng_consumer_channel
*channel
=
608 caa_container_of(node
, struct lttng_consumer_channel
, node
);
614 * Remove a channel from the global list protected by a mutex. This
615 * function is also responsible for freeing its data structures.
617 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
620 struct lttng_ht_iter iter
;
622 pthread_mutex_lock(&consumer_data
.lock
);
624 switch (consumer_data
.type
) {
625 case LTTNG_CONSUMER_KERNEL
:
627 case LTTNG_CONSUMER32_UST
:
628 case LTTNG_CONSUMER64_UST
:
629 lttng_ustconsumer_del_channel(channel
);
632 ERR("Unknown consumer_data type");
638 iter
.iter
.node
= &channel
->node
.node
;
639 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
643 if (channel
->mmap_base
!= NULL
) {
644 ret
= munmap(channel
->mmap_base
, channel
->mmap_len
);
649 if (channel
->wait_fd
>= 0 && !channel
->wait_fd_is_copy
) {
650 ret
= close(channel
->wait_fd
);
655 if (channel
->shm_fd
>= 0 && channel
->wait_fd
!= channel
->shm_fd
) {
656 ret
= close(channel
->shm_fd
);
662 call_rcu(&channel
->node
.head
, consumer_free_channel
);
664 pthread_mutex_unlock(&consumer_data
.lock
);
667 struct lttng_consumer_channel
*consumer_allocate_channel(
669 int shm_fd
, int wait_fd
,
671 uint64_t max_sb_size
)
673 struct lttng_consumer_channel
*channel
;
676 channel
= zmalloc(sizeof(*channel
));
677 if (channel
== NULL
) {
678 perror("malloc struct lttng_consumer_channel");
681 channel
->key
= channel_key
;
682 channel
->shm_fd
= shm_fd
;
683 channel
->wait_fd
= wait_fd
;
684 channel
->mmap_len
= mmap_len
;
685 channel
->max_sb_size
= max_sb_size
;
686 channel
->refcount
= 0;
687 channel
->nr_streams
= 0;
688 lttng_ht_node_init_ulong(&channel
->node
, channel
->key
);
690 switch (consumer_data
.type
) {
691 case LTTNG_CONSUMER_KERNEL
:
692 channel
->mmap_base
= NULL
;
693 channel
->mmap_len
= 0;
695 case LTTNG_CONSUMER32_UST
:
696 case LTTNG_CONSUMER64_UST
:
697 ret
= lttng_ustconsumer_allocate_channel(channel
);
704 ERR("Unknown consumer_data type");
708 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
709 channel
->key
, channel
->shm_fd
, channel
->wait_fd
,
710 (unsigned long long) channel
->mmap_len
,
711 (unsigned long long) channel
->max_sb_size
);
717 * Add a channel to the global list protected by a mutex.
719 int consumer_add_channel(struct lttng_consumer_channel
*channel
)
721 struct lttng_ht_node_ulong
*node
;
722 struct lttng_ht_iter iter
;
724 pthread_mutex_lock(&consumer_data
.lock
);
725 /* Steal channel identifier, for UST */
726 consumer_steal_channel_key(channel
->key
);
729 lttng_ht_lookup(consumer_data
.channel_ht
,
730 (void *)((unsigned long) channel
->key
), &iter
);
731 node
= lttng_ht_iter_get_node_ulong(&iter
);
733 /* Channel already exist. Ignore the insertion */
737 lttng_ht_add_unique_ulong(consumer_data
.channel_ht
, &channel
->node
);
741 pthread_mutex_unlock(&consumer_data
.lock
);
747 * Allocate the pollfd structure and the local view of the out fds to avoid
748 * doing a lookup in the linked list and concurrency issues when writing is
749 * needed. Called with consumer_data.lock held.
751 * Returns the number of fds in the structures.
753 int consumer_update_poll_array(
754 struct lttng_consumer_local_data
*ctx
, struct pollfd
**pollfd
,
755 struct lttng_consumer_stream
**local_stream
,
756 struct lttng_ht
*metadata_ht
)
759 struct lttng_ht_iter iter
;
760 struct lttng_consumer_stream
*stream
;
762 DBG("Updating poll fd array");
764 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, stream
,
766 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
) {
769 DBG("Active FD %d", stream
->wait_fd
);
770 (*pollfd
)[i
].fd
= stream
->wait_fd
;
771 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
772 if (stream
->metadata_flag
&& metadata_ht
) {
773 lttng_ht_add_unique_ulong(metadata_ht
, &stream
->waitfd_node
);
774 DBG("Active FD added to metadata hash table");
776 local_stream
[i
] = stream
;
782 * Insert the consumer_poll_pipe at the end of the array and don't
783 * increment i so nb_fd is the number of real FD.
785 (*pollfd
)[i
].fd
= ctx
->consumer_poll_pipe
[0];
786 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
791 * Poll on the should_quit pipe and the command socket return -1 on error and
792 * should exit, 0 if data is available on the command socket
794 int lttng_consumer_poll_socket(struct pollfd
*consumer_sockpoll
)
799 num_rdy
= poll(consumer_sockpoll
, 2, -1);
802 * Restart interrupted system call.
804 if (errno
== EINTR
) {
807 perror("Poll error");
810 if (consumer_sockpoll
[0].revents
& (POLLIN
| POLLPRI
)) {
811 DBG("consumer_should_quit wake up");
821 * Set the error socket.
823 void lttng_consumer_set_error_sock(
824 struct lttng_consumer_local_data
*ctx
, int sock
)
826 ctx
->consumer_error_socket
= sock
;
830 * Set the command socket path.
832 void lttng_consumer_set_command_sock_path(
833 struct lttng_consumer_local_data
*ctx
, char *sock
)
835 ctx
->consumer_command_sock_path
= sock
;
839 * Send return code to the session daemon.
840 * If the socket is not defined, we return 0, it is not a fatal error
842 int lttng_consumer_send_error(
843 struct lttng_consumer_local_data
*ctx
, int cmd
)
845 if (ctx
->consumer_error_socket
> 0) {
846 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
847 sizeof(enum lttcomm_sessiond_command
));
854 * Close all the tracefiles and stream fds, should be called when all instances
857 void lttng_consumer_cleanup(void)
859 struct lttng_ht_iter iter
;
860 struct lttng_ht_node_ulong
*node
;
865 * close all outfd. Called when there are no more threads running (after
866 * joining on the threads), no need to protect list iteration with mutex.
868 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, node
,
870 struct lttng_consumer_stream
*stream
=
871 caa_container_of(node
, struct lttng_consumer_stream
, node
);
872 consumer_del_stream(stream
);
875 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, node
,
877 struct lttng_consumer_channel
*channel
=
878 caa_container_of(node
, struct lttng_consumer_channel
, node
);
879 consumer_del_channel(channel
);
884 lttng_ht_destroy(consumer_data
.stream_ht
);
885 lttng_ht_destroy(consumer_data
.channel_ht
);
889 * Called from signal handler.
891 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
896 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
897 } while (ret
< 0 && errno
== EINTR
);
899 perror("write consumer quit");
903 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
906 int outfd
= stream
->out_fd
;
909 * This does a blocking write-and-wait on any page that belongs to the
910 * subbuffer prior to the one we just wrote.
911 * Don't care about error values, as these are just hints and ways to
912 * limit the amount of page cache used.
914 if (orig_offset
< stream
->chan
->max_sb_size
) {
917 lttng_sync_file_range(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
918 stream
->chan
->max_sb_size
,
919 SYNC_FILE_RANGE_WAIT_BEFORE
920 | SYNC_FILE_RANGE_WRITE
921 | SYNC_FILE_RANGE_WAIT_AFTER
);
923 * Give hints to the kernel about how we access the file:
924 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
927 * We need to call fadvise again after the file grows because the
928 * kernel does not seem to apply fadvise to non-existing parts of the
931 * Call fadvise _after_ having waited for the page writeback to
932 * complete because the dirty page writeback semantic is not well
933 * defined. So it can be expected to lead to lower throughput in
936 posix_fadvise(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
937 stream
->chan
->max_sb_size
, POSIX_FADV_DONTNEED
);
941 * Initialise the necessary environnement :
942 * - create a new context
943 * - create the poll_pipe
944 * - create the should_quit pipe (for signal handler)
945 * - create the thread pipe (for splice)
947 * Takes a function pointer as argument, this function is called when data is
948 * available on a buffer. This function is responsible to do the
949 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
950 * buffer configuration and then kernctl_put_next_subbuf at the end.
952 * Returns a pointer to the new context or NULL on error.
954 struct lttng_consumer_local_data
*lttng_consumer_create(
955 enum lttng_consumer_type type
,
956 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
957 struct lttng_consumer_local_data
*ctx
),
958 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
959 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
960 int (*update_stream
)(int stream_key
, uint32_t state
))
963 struct lttng_consumer_local_data
*ctx
;
965 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
966 consumer_data
.type
== type
);
967 consumer_data
.type
= type
;
969 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
971 perror("allocating context");
975 ctx
->consumer_error_socket
= -1;
976 /* assign the callbacks */
977 ctx
->on_buffer_ready
= buffer_ready
;
978 ctx
->on_recv_channel
= recv_channel
;
979 ctx
->on_recv_stream
= recv_stream
;
980 ctx
->on_update_stream
= update_stream
;
982 ret
= pipe(ctx
->consumer_poll_pipe
);
984 perror("Error creating poll pipe");
985 goto error_poll_pipe
;
988 /* set read end of the pipe to non-blocking */
989 ret
= fcntl(ctx
->consumer_poll_pipe
[0], F_SETFL
, O_NONBLOCK
);
991 perror("fcntl O_NONBLOCK");
992 goto error_poll_fcntl
;
995 /* set write end of the pipe to non-blocking */
996 ret
= fcntl(ctx
->consumer_poll_pipe
[1], F_SETFL
, O_NONBLOCK
);
998 perror("fcntl O_NONBLOCK");
999 goto error_poll_fcntl
;
1002 ret
= pipe(ctx
->consumer_should_quit
);
1004 perror("Error creating recv pipe");
1005 goto error_quit_pipe
;
1008 ret
= pipe(ctx
->consumer_thread_pipe
);
1010 perror("Error creating thread pipe");
1011 goto error_thread_pipe
;
1018 for (i
= 0; i
< 2; i
++) {
1021 err
= close(ctx
->consumer_should_quit
[i
]);
1028 for (i
= 0; i
< 2; i
++) {
1031 err
= close(ctx
->consumer_poll_pipe
[i
]);
1043 * Close all fds associated with the instance and free the context.
1045 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1049 ret
= close(ctx
->consumer_error_socket
);
1053 ret
= close(ctx
->consumer_thread_pipe
[0]);
1057 ret
= close(ctx
->consumer_thread_pipe
[1]);
1061 ret
= close(ctx
->consumer_poll_pipe
[0]);
1065 ret
= close(ctx
->consumer_poll_pipe
[1]);
1069 ret
= close(ctx
->consumer_should_quit
[0]);
1073 ret
= close(ctx
->consumer_should_quit
[1]);
1077 unlink(ctx
->consumer_command_sock_path
);
1082 * Mmap the ring buffer, read it and write the data to the tracefile.
1084 * Returns the number of bytes written
1086 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1087 struct lttng_consumer_local_data
*ctx
,
1088 struct lttng_consumer_stream
*stream
, unsigned long len
)
1090 switch (consumer_data
.type
) {
1091 case LTTNG_CONSUMER_KERNEL
:
1092 return lttng_kconsumer_on_read_subbuffer_mmap(ctx
, stream
, len
);
1093 case LTTNG_CONSUMER32_UST
:
1094 case LTTNG_CONSUMER64_UST
:
1095 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx
, stream
, len
);
1097 ERR("Unknown consumer_data type");
1105 * Splice the data from the ring buffer to the tracefile.
1107 * Returns the number of bytes spliced.
1109 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1110 struct lttng_consumer_local_data
*ctx
,
1111 struct lttng_consumer_stream
*stream
, unsigned long len
)
1113 switch (consumer_data
.type
) {
1114 case LTTNG_CONSUMER_KERNEL
:
1115 return lttng_kconsumer_on_read_subbuffer_splice(ctx
, stream
, len
);
1116 case LTTNG_CONSUMER32_UST
:
1117 case LTTNG_CONSUMER64_UST
:
1120 ERR("Unknown consumer_data type");
1128 * Take a snapshot for a specific fd
1130 * Returns 0 on success, < 0 on error
1132 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data
*ctx
,
1133 struct lttng_consumer_stream
*stream
)
1135 switch (consumer_data
.type
) {
1136 case LTTNG_CONSUMER_KERNEL
:
1137 return lttng_kconsumer_take_snapshot(ctx
, stream
);
1138 case LTTNG_CONSUMER32_UST
:
1139 case LTTNG_CONSUMER64_UST
:
1140 return lttng_ustconsumer_take_snapshot(ctx
, stream
);
1142 ERR("Unknown consumer_data type");
1150 * Get the produced position
1152 * Returns 0 on success, < 0 on error
1154 int lttng_consumer_get_produced_snapshot(
1155 struct lttng_consumer_local_data
*ctx
,
1156 struct lttng_consumer_stream
*stream
,
1159 switch (consumer_data
.type
) {
1160 case LTTNG_CONSUMER_KERNEL
:
1161 return lttng_kconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1162 case LTTNG_CONSUMER32_UST
:
1163 case LTTNG_CONSUMER64_UST
:
1164 return lttng_ustconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1166 ERR("Unknown consumer_data type");
1172 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1173 int sock
, struct pollfd
*consumer_sockpoll
)
1175 switch (consumer_data
.type
) {
1176 case LTTNG_CONSUMER_KERNEL
:
1177 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1178 case LTTNG_CONSUMER32_UST
:
1179 case LTTNG_CONSUMER64_UST
:
1180 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1182 ERR("Unknown consumer_data type");
1189 * This thread polls the fds in the set to consume the data and write
1190 * it to tracefile if necessary.
1192 void *lttng_consumer_thread_poll_fds(void *data
)
1194 int num_rdy
, num_hup
, high_prio
, ret
, i
;
1195 struct pollfd
*pollfd
= NULL
;
1196 /* local view of the streams */
1197 struct lttng_consumer_stream
**local_stream
= NULL
;
1198 /* local view of consumer_data.fds_count */
1200 struct lttng_consumer_local_data
*ctx
= data
;
1201 struct lttng_ht
*metadata_ht
;
1202 struct lttng_ht_iter iter
;
1203 struct lttng_ht_node_ulong
*node
;
1204 struct lttng_consumer_stream
*metadata_stream
;
1207 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1209 rcu_register_thread();
1211 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
1218 * the fds set has been updated, we need to update our
1219 * local array as well
1221 pthread_mutex_lock(&consumer_data
.lock
);
1222 if (consumer_data
.need_update
) {
1223 if (pollfd
!= NULL
) {
1227 if (local_stream
!= NULL
) {
1229 local_stream
= NULL
;
1232 /* allocate for all fds + 1 for the consumer_poll_pipe */
1233 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
1234 if (pollfd
== NULL
) {
1235 perror("pollfd malloc");
1236 pthread_mutex_unlock(&consumer_data
.lock
);
1240 /* allocate for all fds + 1 for the consumer_poll_pipe */
1241 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
1242 sizeof(struct lttng_consumer_stream
));
1243 if (local_stream
== NULL
) {
1244 perror("local_stream malloc");
1245 pthread_mutex_unlock(&consumer_data
.lock
);
1248 ret
= consumer_update_poll_array(ctx
, &pollfd
, local_stream
,
1251 ERR("Error in allocating pollfd or local_outfds");
1252 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1253 pthread_mutex_unlock(&consumer_data
.lock
);
1257 consumer_data
.need_update
= 0;
1259 pthread_mutex_unlock(&consumer_data
.lock
);
1261 /* No FDs and consumer_quit, consumer_cleanup the thread */
1262 if (nb_fd
== 0 && consumer_quit
== 1) {
1265 /* poll on the array of fds */
1267 DBG("polling on %d fd", nb_fd
+ 1);
1268 num_rdy
= poll(pollfd
, nb_fd
+ 1, consumer_poll_timeout
);
1269 DBG("poll num_rdy : %d", num_rdy
);
1270 if (num_rdy
== -1) {
1272 * Restart interrupted system call.
1274 if (errno
== EINTR
) {
1277 perror("Poll error");
1278 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1280 } else if (num_rdy
== 0) {
1281 DBG("Polling thread timed out");
1286 * If the consumer_poll_pipe triggered poll go directly to the
1287 * beginning of the loop to update the array. We want to prioritize
1288 * array update over low-priority reads.
1290 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
1291 size_t pipe_readlen
;
1294 DBG("consumer_poll_pipe wake up");
1295 /* Consume 1 byte of pipe data */
1297 pipe_readlen
= read(ctx
->consumer_poll_pipe
[0], &tmp
, 1);
1298 } while (pipe_readlen
== -1 && errno
== EINTR
);
1302 /* Take care of high priority channels first. */
1303 for (i
= 0; i
< nb_fd
; i
++) {
1304 /* Lookup for metadata which is the highest priority */
1305 lttng_ht_lookup(metadata_ht
,
1306 (void *)((unsigned long) pollfd
[i
].fd
), &iter
);
1307 node
= lttng_ht_iter_get_node_ulong(&iter
);
1309 (pollfd
[i
].revents
& (POLLIN
| POLLPRI
))) {
1310 DBG("Urgent metadata read on fd %d", pollfd
[i
].fd
);
1311 metadata_stream
= caa_container_of(node
,
1312 struct lttng_consumer_stream
, waitfd_node
);
1314 len
= ctx
->on_buffer_ready(metadata_stream
, ctx
);
1315 /* it's ok to have an unavailable sub-buffer */
1316 if (len
< 0 && len
!= -EAGAIN
) {
1318 } else if (len
> 0) {
1319 metadata_stream
->data_read
= 1;
1321 } else if (pollfd
[i
].revents
& POLLPRI
) {
1322 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
1324 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1325 /* it's ok to have an unavailable sub-buffer */
1326 if (len
< 0 && len
!= -EAGAIN
) {
1328 } else if (len
> 0) {
1329 local_stream
[i
]->data_read
= 1;
1335 * If we read high prio channel in this loop, try again
1336 * for more high prio data.
1342 /* Take care of low priority channels. */
1343 for (i
= 0; i
< nb_fd
; i
++) {
1344 if ((pollfd
[i
].revents
& POLLIN
) ||
1345 local_stream
[i
]->hangup_flush_done
) {
1346 DBG("Normal read on fd %d", pollfd
[i
].fd
);
1347 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1348 /* it's ok to have an unavailable sub-buffer */
1349 if (len
< 0 && len
!= -EAGAIN
) {
1351 } else if (len
> 0) {
1352 local_stream
[i
]->data_read
= 1;
1357 /* Handle hangup and errors */
1358 for (i
= 0; i
< nb_fd
; i
++) {
1359 if (!local_stream
[i
]->hangup_flush_done
1360 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
1361 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
1362 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
1363 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1365 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
1366 /* Attempt read again, for the data we just flushed. */
1367 local_stream
[i
]->data_read
= 1;
1370 * If the poll flag is HUP/ERR/NVAL and we have
1371 * read no data in this pass, we can remove the
1372 * stream from its hash table.
1374 if ((pollfd
[i
].revents
& POLLHUP
)) {
1375 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
1376 if (!local_stream
[i
]->data_read
) {
1377 if (local_stream
[i
]->metadata_flag
) {
1378 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1379 ret
= lttng_ht_del(metadata_ht
, &iter
);
1382 consumer_del_stream(local_stream
[i
]);
1385 } else if (pollfd
[i
].revents
& POLLERR
) {
1386 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
1387 if (!local_stream
[i
]->data_read
) {
1388 if (local_stream
[i
]->metadata_flag
) {
1389 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1390 ret
= lttng_ht_del(metadata_ht
, &iter
);
1393 consumer_del_stream(local_stream
[i
]);
1396 } else if (pollfd
[i
].revents
& POLLNVAL
) {
1397 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
1398 if (!local_stream
[i
]->data_read
) {
1399 if (local_stream
[i
]->metadata_flag
) {
1400 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1401 ret
= lttng_ht_del(metadata_ht
, &iter
);
1404 consumer_del_stream(local_stream
[i
]);
1408 local_stream
[i
]->data_read
= 0;
1412 DBG("polling thread exiting");
1413 if (pollfd
!= NULL
) {
1417 if (local_stream
!= NULL
) {
1419 local_stream
= NULL
;
1421 rcu_unregister_thread();
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
/*
 * Thread entry point for the command socket.
 *
 * data: used as a struct lttng_consumer_local_data pointer (ctx).
 *
 * Visible sequence:
 *   1. Create and listen on a unix socket at ctx->consumer_command_sock_path,
 *      after unlinking that path.
 *   2. Report CONSUMERD_COMMAND_SOCK_READY through lttng_consumer_send_error().
 *   3. Poll on ctx->consumer_should_quit[0] and the listening socket, then
 *      accept one connection.
 *   4. Poll on the accepted socket and hand each incoming command to
 *      lttng_consumer_recv_cmd().
 *   5. On the way out, set consumer_poll_timeout and write one byte to
 *      ctx->consumer_poll_pipe[1].
 *
 * NOTE(review): braces, several error checks and the jumps between steps are
 * not present in this listing; the comments below describe only the
 * statements that are visible. Confirm control flow against the full file.
 */
1429 void *lttng_consumer_thread_receive_fds(void *data
)
1431 int sock
, client_socket
, ret
;
1433 * structure to poll for incoming data on communication socket avoids
1434 * making blocking sockets.
/* Slot 0 is later set to the should_quit pipe, slot 1 to the socket. */
1436 struct pollfd consumer_sockpoll
[2];
1437 struct lttng_consumer_local_data
*ctx
= data
;
/* Paired with the rcu_unregister_thread() call at the end of the function. */
1439 rcu_register_thread();
1441 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
/* The return value of unlink() is not checked. */
1442 unlink(ctx
->consumer_command_sock_path
);
1443 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
1444 if (client_socket
< 0) {
1445 ERR("Cannot create command socket");
1449 ret
= lttcomm_listen_unix_sock(client_socket
);
1454 DBG("Sending ready command to lttng-sessiond");
/* The "ready" notification is sent through the error-reporting helper. */
1455 ret
= lttng_consumer_send_error(ctx
, CONSUMERD_COMMAND_SOCK_READY
);
1456 /* return < 0 on error, but == 0 is not fatal */
1458 ERR("Error sending ready command to lttng-sessiond");
/*
 * NOTE(review): F_SETFL is given O_NONBLOCK alone, which replaces the file
 * status flags instead of adding to the ones already set; confirm that no
 * other status flag needs to be preserved here.
 */
1462 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
1464 perror("fcntl O_NONBLOCK");
1468 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1469 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
1470 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
1471 consumer_sockpoll
[1].fd
= client_socket
;
1472 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
/* A negative result from the poll helper is treated as a failure. */
1474 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1477 DBG("Connection on client_socket");
1479 /* Blocking call, waiting for transmission */
1480 sock
= lttcomm_accept_unix_sock(client_socket
);
/* Same F_SETFL usage as for client_socket above; the same review note applies. */
1485 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
1487 perror("fcntl O_NONBLOCK");
1491 /* update the polling structure to poll on the established socket */
1492 consumer_sockpoll
[1].fd
= sock
;
1493 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
/* From here on, slot 1 refers to the accepted connection. */
1496 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1499 DBG("Incoming command on sock");
1500 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
/* -ENOENT from lttng_consumer_recv_cmd() is logged as a STOP command. */
1501 if (ret
== -ENOENT
) {
1502 DBG("Received STOP command");
1506 ERR("Communication interrupted on command socket");
/* consumer_quit is the file-level quit flag declared near the top of the file. */
1509 if (consumer_quit
) {
1510 DBG("consumer_thread_receive_fds received quit from signal");
1513 DBG("received fds on sock");
1516 DBG("consumer_thread_receive_fds exiting");
1519 * when all fds have hung up, the polling thread
1525 * 2s of grace period, if no polling events occur during
1526 * this period, the polling thread will exit even if there
1527 * are still open FDs (should not happen, but safety mechanism).
/* Replaces the initial -1 value of consumer_poll_timeout. */
1529 consumer_poll_timeout
= LTTNG_CONSUMER_POLL_TIMEOUT
;
1532 * Wake-up the other end by writing a null byte in the pipe
1533 * (non-blocking). Important note: Because writing into the
1534 * pipe is non-blocking (and therefore we allow dropping wakeup
1535 * data, as long as there is wakeup data present in the pipe
1536 * buffer to wake up the other end), the other end should
1537 * perform the following sequence for waiting:
1538 * 1) empty the pipe (reads).
1539 * 2) perform update operation.
1540 * 3) wait on the pipe (poll).
/* The one-byte write is retried only when it fails with EINTR. */
1543 ret
= write(ctx
->consumer_poll_pipe
[1], "", 1);
1544 } while (ret
< 0 && errno
== EINTR
);
1545 rcu_unregister_thread();
1549 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
1550 struct lttng_consumer_local_data
*ctx
)
1552 switch (consumer_data
.type
) {
1553 case LTTNG_CONSUMER_KERNEL
:
1554 return lttng_kconsumer_read_subbuffer(stream
, ctx
);
1555 case LTTNG_CONSUMER32_UST
:
1556 case LTTNG_CONSUMER64_UST
:
1557 return lttng_ustconsumer_read_subbuffer(stream
, ctx
);
1559 ERR("Unknown consumer_data type");
1565 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
1567 switch (consumer_data
.type
) {
1568 case LTTNG_CONSUMER_KERNEL
:
1569 return lttng_kconsumer_on_recv_stream(stream
);
1570 case LTTNG_CONSUMER32_UST
:
1571 case LTTNG_CONSUMER64_UST
:
1572 return lttng_ustconsumer_on_recv_stream(stream
);
1574 ERR("Unknown consumer_data type");
/*
 * Allocate and set consumer data hash tables.
 */
1583 void lttng_consumer_init(void)
1585 consumer_data
.stream_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1586 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1587 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);