/*
 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-testpoint.hpp>
#include <common/consumer/consumer-timer.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/urcu.hpp>
#include <common/ust-consumer/ust-consumer.hpp>

#include <bin/lttng-consumerd/health-consumerd.hpp>

#include <inttypes.h>
#include <signal.h>
/*
 * Callback signatures used to abstract the kernel and UST consumer
 * implementations: the monitor timer samples buffer positions through the
 * first three, the live timer flushes stream indexes through the last one.
 */

/* Take a snapshot of the stream's buffer positions. */
using sample_positions_cb = int (*)(struct lttng_consumer_stream *stream);

/* Read the stream's consumed position into *pos. */
using get_consumed_cb = int (*)(struct lttng_consumer_stream *stream, unsigned long *pos);

/* Read the stream's produced position into *pos. */
using get_produced_cb = int (*)(struct lttng_consumer_stream *stream, unsigned long *pos);

/* Flush the stream and send an empty index if it holds no data. */
using flush_index_cb = int (*)(struct lttng_consumer_stream *stream);
29 static struct timer_signal_data timer_signal
= {
33 .lock
= PTHREAD_MUTEX_INITIALIZER
,
37 * Set custom signal mask to current thread.
39 static void setmask(sigset_t
*mask
)
43 ret
= sigemptyset(mask
);
45 PERROR("sigemptyset");
47 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_SWITCH
);
49 PERROR("sigaddset switch");
51 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_TEARDOWN
);
53 PERROR("sigaddset teardown");
55 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_LIVE
);
57 PERROR("sigaddset live");
59 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_MONITOR
);
61 PERROR("sigaddset monitor");
63 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_EXIT
);
65 PERROR("sigaddset exit");
/*
 * File descriptor that channel monitoring samples are written to.
 * -1 means "not set"; consumer_timer_thread_set_channel_monitor_pipe()
 * replaces it exactly once. Accessed through uatomic operations.
 */
static int the_channel_monitor_pipe = -1;
72 * Execute action on a timer switch.
74 * Beware: metadata_switch_timer() should *never* take a mutex also held
75 * while consumer_timer_switch_stop() is called. It would result in
78 static void metadata_switch_timer(struct lttng_consumer_local_data
*ctx
, siginfo_t
*si
)
81 struct lttng_consumer_channel
*channel
;
83 channel
= (lttng_consumer_channel
*) si
->si_value
.sival_ptr
;
84 LTTNG_ASSERT(channel
);
86 if (channel
->switch_timer_error
) {
90 DBG("Switch timer for channel %" PRIu64
, channel
->key
);
92 case LTTNG_CONSUMER32_UST
:
93 case LTTNG_CONSUMER64_UST
:
95 * Locks taken by lttng_ustconsumer_request_metadata():
96 * - metadata_socket_lock
97 * - Calling lttng_ustconsumer_recv_metadata():
98 * - channel->metadata_cache->lock
99 * - Calling consumer_metadata_cache_flushed():
100 * - channel->timer_lock
101 * - channel->metadata_cache->lock
103 * Ensure that neither consumer_data.lock nor
104 * channel->lock are taken within this function, since
105 * they are held while consumer_timer_switch_stop() is
108 ret
= lttng_ustconsumer_request_metadata(ctx
, channel
, 1, 1);
110 channel
->switch_timer_error
= 1;
113 case LTTNG_CONSUMER_KERNEL
:
114 case LTTNG_CONSUMER_UNKNOWN
:
120 static int send_empty_index(struct lttng_consumer_stream
*stream
, uint64_t ts
, uint64_t stream_id
)
123 struct ctf_packet_index index
;
125 memset(&index
, 0, sizeof(index
));
126 index
.stream_id
= htobe64(stream_id
);
127 index
.timestamp_end
= htobe64(ts
);
128 ret
= consumer_stream_write_index(stream
, &index
);
137 int consumer_flush_kernel_index(struct lttng_consumer_stream
*stream
)
139 uint64_t ts
, stream_id
;
142 ret
= kernctl_get_current_timestamp(stream
->wait_fd
, &ts
);
144 ERR("Failed to get the current timestamp");
147 ret
= kernctl_buffer_flush(stream
->wait_fd
);
149 ERR("Failed to flush kernel stream");
152 ret
= kernctl_snapshot(stream
->wait_fd
);
154 if (ret
!= -EAGAIN
&& ret
!= -ENODATA
) {
155 PERROR("live timer kernel snapshot");
159 ret
= kernctl_get_stream_id(stream
->wait_fd
, &stream_id
);
161 PERROR("kernctl_get_stream_id");
164 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
165 ret
= send_empty_index(stream
, ts
, stream_id
);
175 static int check_stream(struct lttng_consumer_stream
*stream
, flush_index_cb flush_index
)
180 * While holding the stream mutex, try to take a snapshot, if it
181 * succeeds, it means that data is ready to be sent, just let the data
182 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
183 * means that there is no data to read after the flush, so we can
184 * safely send the empty index.
186 * Doing a trylock and checking if waiting on metadata if
187 * trylock fails. Bail out of the stream is indeed waiting for
188 * metadata to be pushed. Busy wait on trylock otherwise.
191 ret
= pthread_mutex_trylock(&stream
->lock
);
194 break; /* We have the lock. */
196 pthread_mutex_lock(&stream
->metadata_timer_lock
);
197 if (stream
->waiting_on_metadata
) {
199 stream
->missed_metadata_flush
= true;
200 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
201 goto end
; /* Bail out. */
203 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
208 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
214 ret
= flush_index(stream
);
215 pthread_mutex_unlock(&stream
->lock
);
220 int consumer_flush_ust_index(struct lttng_consumer_stream
*stream
)
222 uint64_t ts
, stream_id
;
225 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
230 ret
= lttng_ustconsumer_get_current_timestamp(stream
, &ts
);
232 ERR("Failed to get the current timestamp");
235 ret
= lttng_ustconsumer_flush_buffer(stream
, 1);
237 ERR("Failed to flush buffer while flushing index");
240 ret
= lttng_ustconsumer_take_snapshot(stream
);
242 if (ret
!= -EAGAIN
) {
243 ERR("Taking UST snapshot");
247 ret
= lttng_ustconsumer_get_stream_id(stream
, &stream_id
);
249 PERROR("lttng_ust_ctl_get_stream_id");
252 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
253 ret
= send_empty_index(stream
, ts
, stream_id
);
264 * Execute action on a live timer
266 static void live_timer(struct lttng_consumer_local_data
*ctx
, siginfo_t
*si
)
269 struct lttng_consumer_channel
*channel
;
270 struct lttng_consumer_stream
*stream
;
271 struct lttng_ht_iter iter
;
272 const struct lttng_ht
*ht
= the_consumer_data
.stream_per_chan_id_ht
;
273 const flush_index_cb flush_index
= ctx
->type
== LTTNG_CONSUMER_KERNEL
?
274 consumer_flush_kernel_index
:
275 consumer_flush_ust_index
;
277 channel
= (lttng_consumer_channel
*) si
->si_value
.sival_ptr
;
278 LTTNG_ASSERT(channel
);
280 if (channel
->switch_timer_error
) {
284 DBG("Live timer for channel %" PRIu64
, channel
->key
);
287 lttng::urcu::read_lock_guard read_lock
;
288 cds_lfht_for_each_entry_duplicate(ht
->ht
,
289 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
294 node_channel_id
.node
)
296 ret
= check_stream(stream
, flush_index
);
308 static void consumer_timer_signal_thread_qs(unsigned int signr
)
310 sigset_t pending_set
;
314 * We need to be the only thread interacting with the thread
315 * that manages signals for teardown synchronization.
317 pthread_mutex_lock(&timer_signal
.lock
);
319 /* Ensure we don't have any signal queued for this channel. */
321 ret
= sigemptyset(&pending_set
);
323 PERROR("sigemptyset");
325 ret
= sigpending(&pending_set
);
327 PERROR("sigpending");
329 if (!sigismember(&pending_set
, signr
)) {
336 * From this point, no new signal handler will be fired that would try to
337 * access "chan". However, we still need to wait for any currently
338 * executing handler to complete.
341 CMM_STORE_SHARED(timer_signal
.qs_done
, 0);
345 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
348 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN
);
350 while (!CMM_LOAD_SHARED(timer_signal
.qs_done
)) {
355 pthread_mutex_unlock(&timer_signal
.lock
);
359 * Start a timer channel timer which will fire at a given interval
360 * (timer_interval_us)and fire a given signal (signal).
362 * Returns a negative value on error, 0 if a timer was created, and
363 * a positive value if no timer was created (not an error).
365 static int consumer_channel_timer_start(timer_t
*timer_id
,
366 struct lttng_consumer_channel
*channel
,
367 unsigned int timer_interval_us
,
370 int ret
= 0, delete_ret
;
371 struct sigevent sev
= {};
372 struct itimerspec its
;
374 LTTNG_ASSERT(channel
);
375 LTTNG_ASSERT(channel
->key
);
377 if (timer_interval_us
== 0) {
378 /* No creation needed; not an error. */
383 sev
.sigev_notify
= SIGEV_SIGNAL
;
384 sev
.sigev_signo
= signal
;
385 sev
.sigev_value
.sival_ptr
= channel
;
386 ret
= timer_create(CLOCKID
, &sev
, timer_id
);
388 PERROR("timer_create");
392 its
.it_value
.tv_sec
= timer_interval_us
/ 1000000;
393 its
.it_value
.tv_nsec
= (timer_interval_us
% 1000000) * 1000;
394 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
395 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
397 ret
= timer_settime(*timer_id
, 0, &its
, nullptr);
399 PERROR("timer_settime");
400 goto error_destroy_timer
;
405 delete_ret
= timer_delete(*timer_id
);
406 if (delete_ret
== -1) {
407 PERROR("timer_delete");
412 static int consumer_channel_timer_stop(timer_t
*timer_id
, int signal
)
416 ret
= timer_delete(*timer_id
);
418 PERROR("timer_delete");
422 consumer_timer_signal_thread_qs(signal
);
429 * Set the channel's switch timer.
431 void consumer_timer_switch_start(struct lttng_consumer_channel
*channel
,
432 unsigned int switch_timer_interval_us
)
436 LTTNG_ASSERT(channel
);
437 LTTNG_ASSERT(channel
->key
);
439 ret
= consumer_channel_timer_start(&channel
->switch_timer
,
441 switch_timer_interval_us
,
442 LTTNG_CONSUMER_SIG_SWITCH
);
444 channel
->switch_timer_enabled
= !!(ret
== 0);
448 * Stop and delete the channel's switch timer.
450 void consumer_timer_switch_stop(struct lttng_consumer_channel
*channel
)
454 LTTNG_ASSERT(channel
);
456 ret
= consumer_channel_timer_stop(&channel
->switch_timer
, LTTNG_CONSUMER_SIG_SWITCH
);
458 ERR("Failed to stop switch timer");
461 channel
->switch_timer_enabled
= 0;
465 * Set the channel's live timer.
467 void consumer_timer_live_start(struct lttng_consumer_channel
*channel
,
468 unsigned int live_timer_interval_us
)
472 LTTNG_ASSERT(channel
);
473 LTTNG_ASSERT(channel
->key
);
475 ret
= consumer_channel_timer_start(
476 &channel
->live_timer
, channel
, live_timer_interval_us
, LTTNG_CONSUMER_SIG_LIVE
);
478 channel
->live_timer_enabled
= !!(ret
== 0);
482 * Stop and delete the channel's live timer.
484 void consumer_timer_live_stop(struct lttng_consumer_channel
*channel
)
488 LTTNG_ASSERT(channel
);
490 ret
= consumer_channel_timer_stop(&channel
->live_timer
, LTTNG_CONSUMER_SIG_LIVE
);
492 ERR("Failed to stop live timer");
495 channel
->live_timer_enabled
= 0;
499 * Set the channel's monitoring timer.
501 * Returns a negative value on error, 0 if a timer was created, and
502 * a positive value if no timer was created (not an error).
504 int consumer_timer_monitor_start(struct lttng_consumer_channel
*channel
,
505 unsigned int monitor_timer_interval_us
)
509 LTTNG_ASSERT(channel
);
510 LTTNG_ASSERT(channel
->key
);
511 LTTNG_ASSERT(!channel
->monitor_timer_enabled
);
513 ret
= consumer_channel_timer_start(&channel
->monitor_timer
,
515 monitor_timer_interval_us
,
516 LTTNG_CONSUMER_SIG_MONITOR
);
517 channel
->monitor_timer_enabled
= !!(ret
== 0);
522 * Stop and delete the channel's monitoring timer.
524 int consumer_timer_monitor_stop(struct lttng_consumer_channel
*channel
)
528 LTTNG_ASSERT(channel
);
529 LTTNG_ASSERT(channel
->monitor_timer_enabled
);
531 ret
= consumer_channel_timer_stop(&channel
->monitor_timer
, LTTNG_CONSUMER_SIG_MONITOR
);
533 ERR("Failed to stop live timer");
537 channel
->monitor_timer_enabled
= 0;
543 * Block the RT signals for the entire process. It must be called from the
544 * consumer main before creating the threads
546 int consumer_signal_init()
551 /* Block signal for entire process, so only our thread processes it. */
553 ret
= pthread_sigmask(SIG_BLOCK
, &mask
, nullptr);
556 PERROR("pthread_sigmask");
562 static int sample_channel_positions(struct lttng_consumer_channel
*channel
,
563 uint64_t *_highest_use
,
564 uint64_t *_lowest_use
,
565 uint64_t *_total_consumed
,
566 sample_positions_cb sample
,
567 get_consumed_cb get_consumed
,
568 get_produced_cb get_produced
)
571 struct lttng_ht_iter iter
;
572 struct lttng_consumer_stream
*stream
;
573 bool empty_channel
= true;
574 uint64_t high
= 0, low
= UINT64_MAX
;
575 struct lttng_ht
*ht
= the_consumer_data
.stream_per_chan_id_ht
;
577 *_total_consumed
= 0;
579 lttng::urcu::read_lock_guard read_lock
;
581 cds_lfht_for_each_entry_duplicate(ht
->ht
,
582 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
587 node_channel_id
.node
)
589 unsigned long produced
, consumed
, usage
;
591 empty_channel
= false;
593 pthread_mutex_lock(&stream
->lock
);
594 if (cds_lfht_is_node_deleted(&stream
->node
.node
)) {
598 ret
= sample(stream
);
600 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
602 pthread_mutex_unlock(&stream
->lock
);
605 ret
= get_consumed(stream
, &consumed
);
607 ERR("Failed to get buffer consumed position in monitor timer");
608 pthread_mutex_unlock(&stream
->lock
);
611 ret
= get_produced(stream
, &produced
);
613 ERR("Failed to get buffer produced position in monitor timer");
614 pthread_mutex_unlock(&stream
->lock
);
618 usage
= produced
- consumed
;
619 high
= (usage
> high
) ? usage
: high
;
620 low
= (usage
< low
) ? usage
: low
;
623 * We don't use consumed here for 2 reasons:
624 * - output_written takes into account the padding written in the
625 * tracefiles when we stop the session;
626 * - the consumed position is not the accurate representation of what
627 * was extracted from a buffer in overwrite mode.
629 *_total_consumed
+= stream
->output_written
;
631 pthread_mutex_unlock(&stream
->lock
);
634 *_highest_use
= high
;
643 /* Sample and send channel buffering statistics to the session daemon. */
644 void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel
*channel
)
647 int channel_monitor_pipe
= consumer_timer_thread_get_channel_monitor_pipe();
648 struct lttcomm_consumer_channel_monitor_msg msg
= {
650 .session_id
= channel
->session_id
,
653 .consumed_since_last_sample
= 0,
655 sample_positions_cb sample
;
656 get_consumed_cb get_consumed
;
657 get_produced_cb get_produced
;
658 uint64_t lowest
= 0, highest
= 0, total_consumed
= 0;
660 LTTNG_ASSERT(channel
);
662 if (channel_monitor_pipe
< 0) {
666 switch (the_consumer_data
.type
) {
667 case LTTNG_CONSUMER_KERNEL
:
668 sample
= lttng_kconsumer_sample_snapshot_positions
;
669 get_consumed
= lttng_kconsumer_get_consumed_snapshot
;
670 get_produced
= lttng_kconsumer_get_produced_snapshot
;
672 case LTTNG_CONSUMER32_UST
:
673 case LTTNG_CONSUMER64_UST
:
674 sample
= lttng_ustconsumer_sample_snapshot_positions
;
675 get_consumed
= lttng_ustconsumer_get_consumed_snapshot
;
676 get_produced
= lttng_ustconsumer_get_produced_snapshot
;
682 ret
= sample_channel_positions(
683 channel
, &highest
, &lowest
, &total_consumed
, sample
, get_consumed
, get_produced
);
688 msg
.highest
= highest
;
690 msg
.consumed_since_last_sample
= total_consumed
- channel
->last_consumed_size_sample_sent
;
693 * Writes performed here are assumed to be atomic which is only
694 * guaranteed for sizes < than PIPE_BUF.
696 LTTNG_ASSERT(sizeof(msg
) <= PIPE_BUF
);
699 ret
= write(channel_monitor_pipe
, &msg
, sizeof(msg
));
700 } while (ret
== -1 && errno
== EINTR
);
702 if (errno
== EAGAIN
) {
703 /* Not an error, the sample is merely dropped. */
704 DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64
,
707 PERROR("write to the channel monitor pipe");
710 DBG("Sent channel monitoring sample for channel key %" PRIu64
711 ", (highest = %" PRIu64
", lowest = %" PRIu64
")",
715 channel
->last_consumed_size_sample_sent
= msg
.consumed_since_last_sample
;
719 int consumer_timer_thread_get_channel_monitor_pipe()
721 return uatomic_read(&the_channel_monitor_pipe
);
724 int consumer_timer_thread_set_channel_monitor_pipe(int fd
)
728 ret
= uatomic_cmpxchg(&the_channel_monitor_pipe
, -1, fd
);
739 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
740 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
741 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
743 void *consumer_timer_thread(void *data
)
748 struct lttng_consumer_local_data
*ctx
= (lttng_consumer_local_data
*) data
;
750 rcu_register_thread();
752 health_register(health_consumerd
, HEALTH_CONSUMERD_TYPE_METADATA_TIMER
);
754 if (testpoint(consumerd_thread_metadata_timer
)) {
755 goto error_testpoint
;
758 health_code_update();
760 /* Only self thread will receive signal mask. */
762 CMM_STORE_SHARED(timer_signal
.tid
, pthread_self());
765 health_code_update();
768 signr
= sigwaitinfo(&mask
, &info
);
772 * NOTE: cascading conditions are used instead of a switch case
773 * since the use of SIGRTMIN in the definition of the signals'
774 * values prevents the reduction to an integer constant.
777 if (errno
!= EINTR
) {
778 PERROR("sigwaitinfo");
781 } else if (signr
== LTTNG_CONSUMER_SIG_SWITCH
) {
782 metadata_switch_timer(ctx
, &info
);
783 } else if (signr
== LTTNG_CONSUMER_SIG_TEARDOWN
) {
785 CMM_STORE_SHARED(timer_signal
.qs_done
, 1);
787 DBG("Signal timer metadata thread teardown");
788 } else if (signr
== LTTNG_CONSUMER_SIG_LIVE
) {
789 live_timer(ctx
, &info
);
790 } else if (signr
== LTTNG_CONSUMER_SIG_MONITOR
) {
791 struct lttng_consumer_channel
*channel
;
793 channel
= (lttng_consumer_channel
*) info
.si_value
.sival_ptr
;
794 sample_and_send_channel_buffer_stats(channel
);
795 } else if (signr
== LTTNG_CONSUMER_SIG_EXIT
) {
796 LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit
));
799 ERR("Unexpected signal %d\n", info
.si_signo
);
804 /* Only reached in testpoint error */
807 health_unregister(health_consumerd
);
808 rcu_unregister_thread();