Fix: sessiond: use system LTTng-UST headers when available
[lttng-tools.git] / src / common / consumer / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _LGPL_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
33
34 typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
35 typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
36 unsigned long *consumed);
37 typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
38 unsigned long *produced);
39
40 static struct timer_signal_data timer_signal = {
41 .tid = 0,
42 .setup_done = 0,
43 .qs_done = 0,
44 .lock = PTHREAD_MUTEX_INITIALIZER,
45 };
46
47 /*
48 * Set custom signal mask to current thread.
49 */
50 static void setmask(sigset_t *mask)
51 {
52 int ret;
53
54 ret = sigemptyset(mask);
55 if (ret) {
56 PERROR("sigemptyset");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
59 if (ret) {
60 PERROR("sigaddset switch");
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
63 if (ret) {
64 PERROR("sigaddset teardown");
65 }
66 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
67 if (ret) {
68 PERROR("sigaddset live");
69 }
70 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
71 if (ret) {
72 PERROR("sigaddset monitor");
73 }
74 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
75 if (ret) {
76 PERROR("sigaddset exit");
77 }
78 }
79
80 static int channel_monitor_pipe = -1;
81
82 /*
83 * Execute action on a timer switch.
84 *
85 * Beware: metadata_switch_timer() should *never* take a mutex also held
86 * while consumer_timer_switch_stop() is called. It would result in
87 * deadlocks.
88 */
89 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
90 siginfo_t *si)
91 {
92 int ret;
93 struct lttng_consumer_channel *channel;
94
95 channel = si->si_value.sival_ptr;
96 assert(channel);
97
98 if (channel->switch_timer_error) {
99 return;
100 }
101
102 DBG("Switch timer for channel %" PRIu64, channel->key);
103 switch (ctx->type) {
104 case LTTNG_CONSUMER32_UST:
105 case LTTNG_CONSUMER64_UST:
106 /*
107 * Locks taken by lttng_ustconsumer_request_metadata():
108 * - metadata_socket_lock
109 * - Calling lttng_ustconsumer_recv_metadata():
110 * - channel->metadata_cache->lock
111 * - Calling consumer_metadata_cache_flushed():
112 * - channel->timer_lock
113 * - channel->metadata_cache->lock
114 *
115 * Ensure that neither consumer_data.lock nor
116 * channel->lock are taken within this function, since
117 * they are held while consumer_timer_switch_stop() is
118 * called.
119 */
120 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
121 if (ret < 0) {
122 channel->switch_timer_error = 1;
123 }
124 break;
125 case LTTNG_CONSUMER_KERNEL:
126 case LTTNG_CONSUMER_UNKNOWN:
127 assert(0);
128 break;
129 }
130 }
131
132 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
133 uint64_t stream_id)
134 {
135 int ret;
136 struct ctf_packet_index index;
137
138 memset(&index, 0, sizeof(index));
139 index.stream_id = htobe64(stream_id);
140 index.timestamp_end = htobe64(ts);
141 ret = consumer_stream_write_index(stream, &index);
142 if (ret < 0) {
143 goto error;
144 }
145
146 error:
147 return ret;
148 }
149
150 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
151 {
152 uint64_t ts, stream_id;
153 int ret;
154
155 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
156 if (ret < 0) {
157 ERR("Failed to get the current timestamp");
158 goto end;
159 }
160 ret = kernctl_buffer_flush(stream->wait_fd);
161 if (ret < 0) {
162 ERR("Failed to flush kernel stream");
163 goto end;
164 }
165 ret = kernctl_snapshot(stream->wait_fd);
166 if (ret < 0) {
167 if (ret != -EAGAIN && ret != -ENODATA) {
168 PERROR("live timer kernel snapshot");
169 ret = -1;
170 goto end;
171 }
172 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
173 if (ret < 0) {
174 PERROR("kernctl_get_stream_id");
175 goto end;
176 }
177 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
178 ret = send_empty_index(stream, ts, stream_id);
179 if (ret < 0) {
180 goto end;
181 }
182 }
183 ret = 0;
184 end:
185 return ret;
186 }
187
188 static int check_kernel_stream(struct lttng_consumer_stream *stream)
189 {
190 int ret;
191
192 /*
193 * While holding the stream mutex, try to take a snapshot, if it
194 * succeeds, it means that data is ready to be sent, just let the data
195 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
196 * means that there is no data to read after the flush, so we can
197 * safely send the empty index.
198 *
199 * Doing a trylock and checking if waiting on metadata if
200 * trylock fails. Bail out of the stream is indeed waiting for
201 * metadata to be pushed. Busy wait on trylock otherwise.
202 */
203 for (;;) {
204 ret = pthread_mutex_trylock(&stream->lock);
205 switch (ret) {
206 case 0:
207 break; /* We have the lock. */
208 case EBUSY:
209 pthread_mutex_lock(&stream->metadata_timer_lock);
210 if (stream->waiting_on_metadata) {
211 ret = 0;
212 stream->missed_metadata_flush = true;
213 pthread_mutex_unlock(&stream->metadata_timer_lock);
214 goto end; /* Bail out. */
215 }
216 pthread_mutex_unlock(&stream->metadata_timer_lock);
217 /* Try again. */
218 caa_cpu_relax();
219 continue;
220 default:
221 ERR("Unexpected pthread_mutex_trylock error %d", ret);
222 ret = -1;
223 goto end;
224 }
225 break;
226 }
227 ret = consumer_flush_kernel_index(stream);
228 pthread_mutex_unlock(&stream->lock);
229 end:
230 return ret;
231 }
232
233 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
234 {
235 uint64_t ts, stream_id;
236 int ret;
237
238 ret = cds_lfht_is_node_deleted(&stream->node.node);
239 if (ret) {
240 goto end;
241 }
242
243 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
244 if (ret < 0) {
245 ERR("Failed to get the current timestamp");
246 goto end;
247 }
248 lttng_ustconsumer_flush_buffer(stream, 1);
249 ret = lttng_ustconsumer_take_snapshot(stream);
250 if (ret < 0) {
251 if (ret != -EAGAIN) {
252 ERR("Taking UST snapshot");
253 ret = -1;
254 goto end;
255 }
256 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
257 if (ret < 0) {
258 PERROR("ustctl_get_stream_id");
259 goto end;
260 }
261 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
262 ret = send_empty_index(stream, ts, stream_id);
263 if (ret < 0) {
264 goto end;
265 }
266 }
267 ret = 0;
268 end:
269 return ret;
270 }
271
272 static int check_ust_stream(struct lttng_consumer_stream *stream)
273 {
274 int ret;
275
276 assert(stream);
277 assert(stream->ustream);
278 /*
279 * While holding the stream mutex, try to take a snapshot, if it
280 * succeeds, it means that data is ready to be sent, just let the data
281 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
282 * means that there is no data to read after the flush, so we can
283 * safely send the empty index.
284 *
285 * Doing a trylock and checking if waiting on metadata if
286 * trylock fails. Bail out of the stream is indeed waiting for
287 * metadata to be pushed. Busy wait on trylock otherwise.
288 */
289 for (;;) {
290 ret = pthread_mutex_trylock(&stream->lock);
291 switch (ret) {
292 case 0:
293 break; /* We have the lock. */
294 case EBUSY:
295 pthread_mutex_lock(&stream->metadata_timer_lock);
296 if (stream->waiting_on_metadata) {
297 ret = 0;
298 stream->missed_metadata_flush = true;
299 pthread_mutex_unlock(&stream->metadata_timer_lock);
300 goto end; /* Bail out. */
301 }
302 pthread_mutex_unlock(&stream->metadata_timer_lock);
303 /* Try again. */
304 caa_cpu_relax();
305 continue;
306 default:
307 ERR("Unexpected pthread_mutex_trylock error %d", ret);
308 ret = -1;
309 goto end;
310 }
311 break;
312 }
313 ret = consumer_flush_ust_index(stream);
314 pthread_mutex_unlock(&stream->lock);
315 end:
316 return ret;
317 }
318
319 /*
320 * Execute action on a live timer
321 */
322 static void live_timer(struct lttng_consumer_local_data *ctx,
323 siginfo_t *si)
324 {
325 int ret;
326 struct lttng_consumer_channel *channel;
327 struct lttng_consumer_stream *stream;
328 struct lttng_ht *ht;
329 struct lttng_ht_iter iter;
330
331 channel = si->si_value.sival_ptr;
332 assert(channel);
333
334 if (channel->switch_timer_error) {
335 goto error;
336 }
337 ht = consumer_data.stream_per_chan_id_ht;
338
339 DBG("Live timer for channel %" PRIu64, channel->key);
340
341 rcu_read_lock();
342 switch (ctx->type) {
343 case LTTNG_CONSUMER32_UST:
344 case LTTNG_CONSUMER64_UST:
345 cds_lfht_for_each_entry_duplicate(ht->ht,
346 ht->hash_fct(&channel->key, lttng_ht_seed),
347 ht->match_fct, &channel->key, &iter.iter,
348 stream, node_channel_id.node) {
349 ret = check_ust_stream(stream);
350 if (ret < 0) {
351 goto error_unlock;
352 }
353 }
354 break;
355 case LTTNG_CONSUMER_KERNEL:
356 cds_lfht_for_each_entry_duplicate(ht->ht,
357 ht->hash_fct(&channel->key, lttng_ht_seed),
358 ht->match_fct, &channel->key, &iter.iter,
359 stream, node_channel_id.node) {
360 ret = check_kernel_stream(stream);
361 if (ret < 0) {
362 goto error_unlock;
363 }
364 }
365 break;
366 case LTTNG_CONSUMER_UNKNOWN:
367 assert(0);
368 break;
369 }
370
371 error_unlock:
372 rcu_read_unlock();
373
374 error:
375 return;
376 }
377
378 static
379 void consumer_timer_signal_thread_qs(unsigned int signr)
380 {
381 sigset_t pending_set;
382 int ret;
383
384 /*
385 * We need to be the only thread interacting with the thread
386 * that manages signals for teardown synchronization.
387 */
388 pthread_mutex_lock(&timer_signal.lock);
389
390 /* Ensure we don't have any signal queued for this channel. */
391 for (;;) {
392 ret = sigemptyset(&pending_set);
393 if (ret == -1) {
394 PERROR("sigemptyset");
395 }
396 ret = sigpending(&pending_set);
397 if (ret == -1) {
398 PERROR("sigpending");
399 }
400 if (!sigismember(&pending_set, signr)) {
401 break;
402 }
403 caa_cpu_relax();
404 }
405
406 /*
407 * From this point, no new signal handler will be fired that would try to
408 * access "chan". However, we still need to wait for any currently
409 * executing handler to complete.
410 */
411 cmm_smp_mb();
412 CMM_STORE_SHARED(timer_signal.qs_done, 0);
413 cmm_smp_mb();
414
415 /*
416 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
417 * up.
418 */
419 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
420
421 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
422 caa_cpu_relax();
423 }
424 cmm_smp_mb();
425
426 pthread_mutex_unlock(&timer_signal.lock);
427 }
428
429 /*
430 * Start a timer channel timer which will fire at a given interval
431 * (timer_interval_us)and fire a given signal (signal).
432 *
433 * Returns a negative value on error, 0 if a timer was created, and
434 * a positive value if no timer was created (not an error).
435 */
436 static
437 int consumer_channel_timer_start(timer_t *timer_id,
438 struct lttng_consumer_channel *channel,
439 unsigned int timer_interval_us, int signal)
440 {
441 int ret = 0, delete_ret;
442 struct sigevent sev;
443 struct itimerspec its;
444
445 assert(channel);
446 assert(channel->key);
447
448 if (timer_interval_us == 0) {
449 /* No creation needed; not an error. */
450 ret = 1;
451 goto end;
452 }
453
454 sev.sigev_notify = SIGEV_SIGNAL;
455 sev.sigev_signo = signal;
456 sev.sigev_value.sival_ptr = channel;
457 ret = timer_create(CLOCKID, &sev, timer_id);
458 if (ret == -1) {
459 PERROR("timer_create");
460 goto end;
461 }
462
463 its.it_value.tv_sec = timer_interval_us / 1000000;
464 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
465 its.it_interval.tv_sec = its.it_value.tv_sec;
466 its.it_interval.tv_nsec = its.it_value.tv_nsec;
467
468 ret = timer_settime(*timer_id, 0, &its, NULL);
469 if (ret == -1) {
470 PERROR("timer_settime");
471 goto error_destroy_timer;
472 }
473 end:
474 return ret;
475 error_destroy_timer:
476 delete_ret = timer_delete(*timer_id);
477 if (delete_ret == -1) {
478 PERROR("timer_delete");
479 }
480 goto end;
481 }
482
483 static
484 int consumer_channel_timer_stop(timer_t *timer_id, int signal)
485 {
486 int ret = 0;
487
488 ret = timer_delete(*timer_id);
489 if (ret == -1) {
490 PERROR("timer_delete");
491 goto end;
492 }
493
494 consumer_timer_signal_thread_qs(signal);
495 *timer_id = 0;
496 end:
497 return ret;
498 }
499
500 /*
501 * Set the channel's switch timer.
502 */
503 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
504 unsigned int switch_timer_interval_us)
505 {
506 int ret;
507
508 assert(channel);
509 assert(channel->key);
510
511 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
512 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
513
514 channel->switch_timer_enabled = !!(ret == 0);
515 }
516
517 /*
518 * Stop and delete the channel's switch timer.
519 */
520 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
521 {
522 int ret;
523
524 assert(channel);
525
526 ret = consumer_channel_timer_stop(&channel->switch_timer,
527 LTTNG_CONSUMER_SIG_SWITCH);
528 if (ret == -1) {
529 ERR("Failed to stop switch timer");
530 }
531
532 channel->switch_timer_enabled = 0;
533 }
534
535 /*
536 * Set the channel's live timer.
537 */
538 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
539 unsigned int live_timer_interval_us)
540 {
541 int ret;
542
543 assert(channel);
544 assert(channel->key);
545
546 ret = consumer_channel_timer_start(&channel->live_timer, channel,
547 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
548
549 channel->live_timer_enabled = !!(ret == 0);
550 }
551
552 /*
553 * Stop and delete the channel's live timer.
554 */
555 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
556 {
557 int ret;
558
559 assert(channel);
560
561 ret = consumer_channel_timer_stop(&channel->live_timer,
562 LTTNG_CONSUMER_SIG_LIVE);
563 if (ret == -1) {
564 ERR("Failed to stop live timer");
565 }
566
567 channel->live_timer_enabled = 0;
568 }
569
570 /*
571 * Set the channel's monitoring timer.
572 *
573 * Returns a negative value on error, 0 if a timer was created, and
574 * a positive value if no timer was created (not an error).
575 */
576 int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
577 unsigned int monitor_timer_interval_us)
578 {
579 int ret;
580
581 assert(channel);
582 assert(channel->key);
583 assert(!channel->monitor_timer_enabled);
584
585 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
586 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
587 channel->monitor_timer_enabled = !!(ret == 0);
588 return ret;
589 }
590
591 /*
592 * Stop and delete the channel's monitoring timer.
593 */
594 int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
595 {
596 int ret;
597
598 assert(channel);
599 assert(channel->monitor_timer_enabled);
600
601 ret = consumer_channel_timer_stop(&channel->monitor_timer,
602 LTTNG_CONSUMER_SIG_MONITOR);
603 if (ret == -1) {
604 ERR("Failed to stop live timer");
605 goto end;
606 }
607
608 channel->monitor_timer_enabled = 0;
609 end:
610 return ret;
611 }
612
613 /*
614 * Block the RT signals for the entire process. It must be called from the
615 * consumer main before creating the threads
616 */
617 int consumer_signal_init(void)
618 {
619 int ret;
620 sigset_t mask;
621
622 /* Block signal for entire process, so only our thread processes it. */
623 setmask(&mask);
624 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
625 if (ret) {
626 errno = ret;
627 PERROR("pthread_sigmask");
628 return -1;
629 }
630 return 0;
631 }
632
633 static
634 int sample_channel_positions(struct lttng_consumer_channel *channel,
635 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
636 sample_positions_cb sample, get_consumed_cb get_consumed,
637 get_produced_cb get_produced)
638 {
639 int ret = 0;
640 struct lttng_ht_iter iter;
641 struct lttng_consumer_stream *stream;
642 bool empty_channel = true;
643 uint64_t high = 0, low = UINT64_MAX;
644 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
645
646 *_total_consumed = 0;
647
648 rcu_read_lock();
649
650 cds_lfht_for_each_entry_duplicate(ht->ht,
651 ht->hash_fct(&channel->key, lttng_ht_seed),
652 ht->match_fct, &channel->key,
653 &iter.iter, stream, node_channel_id.node) {
654 unsigned long produced, consumed, usage;
655
656 empty_channel = false;
657
658 pthread_mutex_lock(&stream->lock);
659 if (cds_lfht_is_node_deleted(&stream->node.node)) {
660 goto next;
661 }
662
663 ret = sample(stream);
664 if (ret) {
665 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
666 pthread_mutex_unlock(&stream->lock);
667 goto end;
668 }
669 ret = get_consumed(stream, &consumed);
670 if (ret) {
671 ERR("Failed to get buffer consumed position in monitor timer");
672 pthread_mutex_unlock(&stream->lock);
673 goto end;
674 }
675 ret = get_produced(stream, &produced);
676 if (ret) {
677 ERR("Failed to get buffer produced position in monitor timer");
678 pthread_mutex_unlock(&stream->lock);
679 goto end;
680 }
681
682 usage = produced - consumed;
683 high = (usage > high) ? usage : high;
684 low = (usage < low) ? usage : low;
685
686 /*
687 * We don't use consumed here for 2 reasons:
688 * - output_written takes into account the padding written in the
689 * tracefiles when we stop the session;
690 * - the consumed position is not the accurate representation of what
691 * was extracted from a buffer in overwrite mode.
692 */
693 *_total_consumed += stream->output_written;
694 next:
695 pthread_mutex_unlock(&stream->lock);
696 }
697
698 *_highest_use = high;
699 *_lowest_use = low;
700 end:
701 rcu_read_unlock();
702 if (empty_channel) {
703 ret = -1;
704 }
705 return ret;
706 }
707
708 /*
709 * Execute action on a monitor timer.
710 */
711 static
712 void monitor_timer(struct lttng_consumer_channel *channel)
713 {
714 int ret;
715 int channel_monitor_pipe =
716 consumer_timer_thread_get_channel_monitor_pipe();
717 struct lttcomm_consumer_channel_monitor_msg msg = {
718 .key = channel->key,
719 };
720 sample_positions_cb sample;
721 get_consumed_cb get_consumed;
722 get_produced_cb get_produced;
723 uint64_t lowest = 0, highest = 0, total_consumed = 0;
724
725 assert(channel);
726
727 if (channel_monitor_pipe < 0) {
728 return;
729 }
730
731 switch (consumer_data.type) {
732 case LTTNG_CONSUMER_KERNEL:
733 sample = lttng_kconsumer_sample_snapshot_positions;
734 get_consumed = lttng_kconsumer_get_consumed_snapshot;
735 get_produced = lttng_kconsumer_get_produced_snapshot;
736 break;
737 case LTTNG_CONSUMER32_UST:
738 case LTTNG_CONSUMER64_UST:
739 sample = lttng_ustconsumer_sample_snapshot_positions;
740 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
741 get_produced = lttng_ustconsumer_get_produced_snapshot;
742 break;
743 default:
744 abort();
745 }
746
747 ret = sample_channel_positions(channel, &highest, &lowest,
748 &total_consumed, sample, get_consumed, get_produced);
749 if (ret) {
750 return;
751 }
752 msg.highest = highest;
753 msg.lowest = lowest;
754 msg.total_consumed = total_consumed;
755
756 /*
757 * Writes performed here are assumed to be atomic which is only
758 * guaranteed for sizes < than PIPE_BUF.
759 */
760 assert(sizeof(msg) <= PIPE_BUF);
761
762 do {
763 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
764 } while (ret == -1 && errno == EINTR);
765 if (ret == -1) {
766 if (errno == EAGAIN) {
767 /* Not an error, the sample is merely dropped. */
768 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
769 channel->key);
770 } else {
771 PERROR("write to the channel monitor pipe");
772 }
773 } else {
774 DBG("Sent channel monitoring sample for channel key %" PRIu64
775 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
776 channel->key, msg.highest, msg.lowest);
777 }
778 }
779
780 int consumer_timer_thread_get_channel_monitor_pipe(void)
781 {
782 return uatomic_read(&channel_monitor_pipe);
783 }
784
785 int consumer_timer_thread_set_channel_monitor_pipe(int fd)
786 {
787 int ret;
788
789 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
790 if (ret != -1) {
791 ret = -1;
792 goto end;
793 }
794 ret = 0;
795 end:
796 return ret;
797 }
798
799 /*
800 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
801 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
802 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
803 */
804 void *consumer_timer_thread(void *data)
805 {
806 int signr;
807 sigset_t mask;
808 siginfo_t info;
809 struct lttng_consumer_local_data *ctx = data;
810
811 rcu_register_thread();
812
813 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
814
815 if (testpoint(consumerd_thread_metadata_timer)) {
816 goto error_testpoint;
817 }
818
819 health_code_update();
820
821 /* Only self thread will receive signal mask. */
822 setmask(&mask);
823 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
824
825 while (1) {
826 health_code_update();
827
828 health_poll_entry();
829 signr = sigwaitinfo(&mask, &info);
830 health_poll_exit();
831
832 /*
833 * NOTE: cascading conditions are used instead of a switch case
834 * since the use of SIGRTMIN in the definition of the signals'
835 * values prevents the reduction to an integer constant.
836 */
837 if (signr == -1) {
838 if (errno != EINTR) {
839 PERROR("sigwaitinfo");
840 }
841 continue;
842 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
843 metadata_switch_timer(ctx, &info);
844 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
845 cmm_smp_mb();
846 CMM_STORE_SHARED(timer_signal.qs_done, 1);
847 cmm_smp_mb();
848 DBG("Signal timer metadata thread teardown");
849 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
850 live_timer(ctx, &info);
851 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
852 struct lttng_consumer_channel *channel;
853
854 channel = info.si_value.sival_ptr;
855 monitor_timer(channel);
856 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
857 assert(CMM_LOAD_SHARED(consumer_quit));
858 goto end;
859 } else {
860 ERR("Unexpected signal %d\n", info.si_signo);
861 }
862 }
863
864 error_testpoint:
865 /* Only reached in testpoint error */
866 health_error();
867 end:
868 health_unregister(health_consumerd);
869 rcu_unregister_thread();
870 return NULL;
871 }
This page took 0.046672 seconds and 5 git commands to generate.