kerner-ctl: add RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK
[lttng-tools.git] / src / common / consumer / consumer-timer.c
1 /*
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9 #define _LGPL_SOURCE
10 #include <assert.h>
11 #include <inttypes.h>
12 #include <signal.h>
13
14 #include <bin/lttng-consumerd/health-consumerd.h>
15 #include <common/common.h>
16 #include <common/compat/endian.h>
17 #include <common/kernel-ctl/kernel-ctl.h>
18 #include <common/kernel-consumer/kernel-consumer.h>
19 #include <common/consumer/consumer-stream.h>
20 #include <common/consumer/consumer-timer.h>
21 #include <common/consumer/consumer-testpoint.h>
22 #include <common/ust-consumer/ust-consumer.h>
23
/*
 * Callback asking a stream to sample its buffer positions. As used by
 * sample_channel_positions(), a non-zero return value is a failure.
 */
24 typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
/*
 * Callback storing a stream's consumed position in "consumed". As used by
 * sample_channel_positions(), a non-zero return value is a failure.
 */
25 typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
26 unsigned long *consumed);
/*
 * Callback storing a stream's produced position in "produced". As used by
 * sample_channel_positions(), a non-zero return value is a failure.
 */
27 typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
28 unsigned long *produced);
29
/*
 * State shared with the timer signal thread (consumer_timer_thread()):
 * - tid: set to pthread_self() by the timer thread when it starts;
 * - qs_done: cleared by consumer_timer_signal_thread_qs() and set back to 1
 *   by the timer thread when it receives LTTNG_CONSUMER_SIG_TEARDOWN;
 * - lock: held for the whole duration of
 *   consumer_timer_signal_thread_qs().
 *
 * NOTE(review): setup_done is not referenced in the code visible here;
 * confirm where it is used.
 */
30 static struct timer_signal_data timer_signal = {
31 .tid = 0,
32 .setup_done = 0,
33 .qs_done = 0,
34 .lock = PTHREAD_MUTEX_INITIALIZER,
35 };
36
37 /*
38 * Set custom signal mask to current thread.
39 */
40 static void setmask(sigset_t *mask)
41 {
42 int ret;
43
44 ret = sigemptyset(mask);
45 if (ret) {
46 PERROR("sigemptyset");
47 }
48 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
49 if (ret) {
50 PERROR("sigaddset switch");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
53 if (ret) {
54 PERROR("sigaddset teardown");
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
57 if (ret) {
58 PERROR("sigaddset live");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
61 if (ret) {
62 PERROR("sigaddset monitor");
63 }
64 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
65 if (ret) {
66 PERROR("sigaddset exit");
67 }
68 }
69
/*
 * File descriptor to which monitor_timer() writes channel monitoring
 * samples. Starts at -1 (unset) and is installed once through
 * consumer_timer_thread_set_channel_monitor_pipe(); it is read and updated
 * with uatomic operations.
 */
70 static int channel_monitor_pipe = -1;
71
72 /*
73 * Execute action on a timer switch.
74 *
75 * Beware: metadata_switch_timer() should *never* take a mutex also held
76 * while consumer_timer_switch_stop() is called. It would result in
77 * deadlocks.
78 */
79 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
80 siginfo_t *si)
81 {
82 int ret;
83 struct lttng_consumer_channel *channel;
84
85 channel = si->si_value.sival_ptr;
86 assert(channel);
87
88 if (channel->switch_timer_error) {
89 return;
90 }
91
92 DBG("Switch timer for channel %" PRIu64, channel->key);
93 switch (ctx->type) {
94 case LTTNG_CONSUMER32_UST:
95 case LTTNG_CONSUMER64_UST:
96 /*
97 * Locks taken by lttng_ustconsumer_request_metadata():
98 * - metadata_socket_lock
99 * - Calling lttng_ustconsumer_recv_metadata():
100 * - channel->metadata_cache->lock
101 * - Calling consumer_metadata_cache_flushed():
102 * - channel->timer_lock
103 * - channel->metadata_cache->lock
104 *
105 * Ensure that neither consumer_data.lock nor
106 * channel->lock are taken within this function, since
107 * they are held while consumer_timer_switch_stop() is
108 * called.
109 */
110 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
111 if (ret < 0) {
112 channel->switch_timer_error = 1;
113 }
114 break;
115 case LTTNG_CONSUMER_KERNEL:
116 case LTTNG_CONSUMER_UNKNOWN:
117 assert(0);
118 break;
119 }
120 }
121
122 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
123 uint64_t stream_id)
124 {
125 int ret;
126 struct ctf_packet_index index;
127
128 memset(&index, 0, sizeof(index));
129 index.stream_id = htobe64(stream_id);
130 index.timestamp_end = htobe64(ts);
131 ret = consumer_stream_write_index(stream, &index);
132 if (ret < 0) {
133 goto error;
134 }
135
136 error:
137 return ret;
138 }
139
140 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
141 {
142 uint64_t ts, stream_id;
143 int ret;
144
145 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
146 if (ret < 0) {
147 ERR("Failed to get the current timestamp");
148 goto end;
149 }
150 ret = kernctl_buffer_flush(stream->wait_fd);
151 if (ret < 0) {
152 ERR("Failed to flush kernel stream");
153 goto end;
154 }
155 ret = kernctl_snapshot(stream->wait_fd);
156 if (ret < 0) {
157 if (ret != -EAGAIN && ret != -ENODATA) {
158 PERROR("live timer kernel snapshot");
159 ret = -1;
160 goto end;
161 }
162 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
163 if (ret < 0) {
164 PERROR("kernctl_get_stream_id");
165 goto end;
166 }
167 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
168 ret = send_empty_index(stream, ts, stream_id);
169 if (ret < 0) {
170 goto end;
171 }
172 }
173 ret = 0;
174 end:
175 return ret;
176 }
177
178 static int check_kernel_stream(struct lttng_consumer_stream *stream)
179 {
180 int ret;
181
182 /*
183 * While holding the stream mutex, try to take a snapshot, if it
184 * succeeds, it means that data is ready to be sent, just let the data
185 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
186 * means that there is no data to read after the flush, so we can
187 * safely send the empty index.
188 *
189 * Doing a trylock and checking if waiting on metadata if
190 * trylock fails. Bail out of the stream is indeed waiting for
191 * metadata to be pushed. Busy wait on trylock otherwise.
192 */
193 for (;;) {
194 ret = pthread_mutex_trylock(&stream->lock);
195 switch (ret) {
196 case 0:
197 break; /* We have the lock. */
198 case EBUSY:
199 pthread_mutex_lock(&stream->metadata_timer_lock);
200 if (stream->waiting_on_metadata) {
201 ret = 0;
202 stream->missed_metadata_flush = true;
203 pthread_mutex_unlock(&stream->metadata_timer_lock);
204 goto end; /* Bail out. */
205 }
206 pthread_mutex_unlock(&stream->metadata_timer_lock);
207 /* Try again. */
208 caa_cpu_relax();
209 continue;
210 default:
211 ERR("Unexpected pthread_mutex_trylock error %d", ret);
212 ret = -1;
213 goto end;
214 }
215 break;
216 }
217 ret = consumer_flush_kernel_index(stream);
218 pthread_mutex_unlock(&stream->lock);
219 end:
220 return ret;
221 }
222
223 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
224 {
225 uint64_t ts, stream_id;
226 int ret;
227
228 ret = cds_lfht_is_node_deleted(&stream->node.node);
229 if (ret) {
230 goto end;
231 }
232
233 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
234 if (ret < 0) {
235 ERR("Failed to get the current timestamp");
236 goto end;
237 }
238 lttng_ustconsumer_flush_buffer(stream, 1);
239 ret = lttng_ustconsumer_take_snapshot(stream);
240 if (ret < 0) {
241 if (ret != -EAGAIN) {
242 ERR("Taking UST snapshot");
243 ret = -1;
244 goto end;
245 }
246 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
247 if (ret < 0) {
248 PERROR("ustctl_get_stream_id");
249 goto end;
250 }
251 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
252 ret = send_empty_index(stream, ts, stream_id);
253 if (ret < 0) {
254 goto end;
255 }
256 }
257 ret = 0;
258 end:
259 return ret;
260 }
261
262 static int check_ust_stream(struct lttng_consumer_stream *stream)
263 {
264 int ret;
265
266 assert(stream);
267 assert(stream->ustream);
268 /*
269 * While holding the stream mutex, try to take a snapshot, if it
270 * succeeds, it means that data is ready to be sent, just let the data
271 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
272 * means that there is no data to read after the flush, so we can
273 * safely send the empty index.
274 *
275 * Doing a trylock and checking if waiting on metadata if
276 * trylock fails. Bail out of the stream is indeed waiting for
277 * metadata to be pushed. Busy wait on trylock otherwise.
278 */
279 for (;;) {
280 ret = pthread_mutex_trylock(&stream->lock);
281 switch (ret) {
282 case 0:
283 break; /* We have the lock. */
284 case EBUSY:
285 pthread_mutex_lock(&stream->metadata_timer_lock);
286 if (stream->waiting_on_metadata) {
287 ret = 0;
288 stream->missed_metadata_flush = true;
289 pthread_mutex_unlock(&stream->metadata_timer_lock);
290 goto end; /* Bail out. */
291 }
292 pthread_mutex_unlock(&stream->metadata_timer_lock);
293 /* Try again. */
294 caa_cpu_relax();
295 continue;
296 default:
297 ERR("Unexpected pthread_mutex_trylock error %d", ret);
298 ret = -1;
299 goto end;
300 }
301 break;
302 }
303 ret = consumer_flush_ust_index(stream);
304 pthread_mutex_unlock(&stream->lock);
305 end:
306 return ret;
307 }
308
309 /*
310 * Execute action on a live timer
311 */
312 static void live_timer(struct lttng_consumer_local_data *ctx,
313 siginfo_t *si)
314 {
315 int ret;
316 struct lttng_consumer_channel *channel;
317 struct lttng_consumer_stream *stream;
318 struct lttng_ht *ht;
319 struct lttng_ht_iter iter;
320
321 channel = si->si_value.sival_ptr;
322 assert(channel);
323
324 if (channel->switch_timer_error) {
325 goto error;
326 }
327 ht = consumer_data.stream_per_chan_id_ht;
328
329 DBG("Live timer for channel %" PRIu64, channel->key);
330
331 rcu_read_lock();
332 switch (ctx->type) {
333 case LTTNG_CONSUMER32_UST:
334 case LTTNG_CONSUMER64_UST:
335 cds_lfht_for_each_entry_duplicate(ht->ht,
336 ht->hash_fct(&channel->key, lttng_ht_seed),
337 ht->match_fct, &channel->key, &iter.iter,
338 stream, node_channel_id.node) {
339 ret = check_ust_stream(stream);
340 if (ret < 0) {
341 goto error_unlock;
342 }
343 }
344 break;
345 case LTTNG_CONSUMER_KERNEL:
346 cds_lfht_for_each_entry_duplicate(ht->ht,
347 ht->hash_fct(&channel->key, lttng_ht_seed),
348 ht->match_fct, &channel->key, &iter.iter,
349 stream, node_channel_id.node) {
350 ret = check_kernel_stream(stream);
351 if (ret < 0) {
352 goto error_unlock;
353 }
354 }
355 break;
356 case LTTNG_CONSUMER_UNKNOWN:
357 assert(0);
358 break;
359 }
360
361 error_unlock:
362 rcu_read_unlock();
363
364 error:
365 return;
366 }
367
368 static
369 void consumer_timer_signal_thread_qs(unsigned int signr)
370 {
371 sigset_t pending_set;
372 int ret;
373
374 /*
375 * We need to be the only thread interacting with the thread
376 * that manages signals for teardown synchronization.
377 */
378 pthread_mutex_lock(&timer_signal.lock);
379
380 /* Ensure we don't have any signal queued for this channel. */
381 for (;;) {
382 ret = sigemptyset(&pending_set);
383 if (ret == -1) {
384 PERROR("sigemptyset");
385 }
386 ret = sigpending(&pending_set);
387 if (ret == -1) {
388 PERROR("sigpending");
389 }
390 if (!sigismember(&pending_set, signr)) {
391 break;
392 }
393 caa_cpu_relax();
394 }
395
396 /*
397 * From this point, no new signal handler will be fired that would try to
398 * access "chan". However, we still need to wait for any currently
399 * executing handler to complete.
400 */
401 cmm_smp_mb();
402 CMM_STORE_SHARED(timer_signal.qs_done, 0);
403 cmm_smp_mb();
404
405 /*
406 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
407 * up.
408 */
409 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
410
411 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
412 caa_cpu_relax();
413 }
414 cmm_smp_mb();
415
416 pthread_mutex_unlock(&timer_signal.lock);
417 }
418
419 /*
420 * Start a timer channel timer which will fire at a given interval
421 * (timer_interval_us)and fire a given signal (signal).
422 *
423 * Returns a negative value on error, 0 if a timer was created, and
424 * a positive value if no timer was created (not an error).
425 */
426 static
427 int consumer_channel_timer_start(timer_t *timer_id,
428 struct lttng_consumer_channel *channel,
429 unsigned int timer_interval_us, int signal)
430 {
431 int ret = 0, delete_ret;
432 struct sigevent sev;
433 struct itimerspec its;
434
435 assert(channel);
436 assert(channel->key);
437
438 if (timer_interval_us == 0) {
439 /* No creation needed; not an error. */
440 ret = 1;
441 goto end;
442 }
443
444 sev.sigev_notify = SIGEV_SIGNAL;
445 sev.sigev_signo = signal;
446 sev.sigev_value.sival_ptr = channel;
447 ret = timer_create(CLOCKID, &sev, timer_id);
448 if (ret == -1) {
449 PERROR("timer_create");
450 goto end;
451 }
452
453 its.it_value.tv_sec = timer_interval_us / 1000000;
454 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
455 its.it_interval.tv_sec = its.it_value.tv_sec;
456 its.it_interval.tv_nsec = its.it_value.tv_nsec;
457
458 ret = timer_settime(*timer_id, 0, &its, NULL);
459 if (ret == -1) {
460 PERROR("timer_settime");
461 goto error_destroy_timer;
462 }
463 end:
464 return ret;
465 error_destroy_timer:
466 delete_ret = timer_delete(*timer_id);
467 if (delete_ret == -1) {
468 PERROR("timer_delete");
469 }
470 goto end;
471 }
472
473 static
474 int consumer_channel_timer_stop(timer_t *timer_id, int signal)
475 {
476 int ret = 0;
477
478 ret = timer_delete(*timer_id);
479 if (ret == -1) {
480 PERROR("timer_delete");
481 goto end;
482 }
483
484 consumer_timer_signal_thread_qs(signal);
485 *timer_id = 0;
486 end:
487 return ret;
488 }
489
490 /*
491 * Set the channel's switch timer.
492 */
493 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
494 unsigned int switch_timer_interval_us)
495 {
496 int ret;
497
498 assert(channel);
499 assert(channel->key);
500
501 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
502 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
503
504 channel->switch_timer_enabled = !!(ret == 0);
505 }
506
507 /*
508 * Stop and delete the channel's switch timer.
509 */
510 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
511 {
512 int ret;
513
514 assert(channel);
515
516 ret = consumer_channel_timer_stop(&channel->switch_timer,
517 LTTNG_CONSUMER_SIG_SWITCH);
518 if (ret == -1) {
519 ERR("Failed to stop switch timer");
520 }
521
522 channel->switch_timer_enabled = 0;
523 }
524
525 /*
526 * Set the channel's live timer.
527 */
528 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
529 unsigned int live_timer_interval_us)
530 {
531 int ret;
532
533 assert(channel);
534 assert(channel->key);
535
536 ret = consumer_channel_timer_start(&channel->live_timer, channel,
537 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
538
539 channel->live_timer_enabled = !!(ret == 0);
540 }
541
542 /*
543 * Stop and delete the channel's live timer.
544 */
545 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
546 {
547 int ret;
548
549 assert(channel);
550
551 ret = consumer_channel_timer_stop(&channel->live_timer,
552 LTTNG_CONSUMER_SIG_LIVE);
553 if (ret == -1) {
554 ERR("Failed to stop live timer");
555 }
556
557 channel->live_timer_enabled = 0;
558 }
559
560 /*
561 * Set the channel's monitoring timer.
562 *
563 * Returns a negative value on error, 0 if a timer was created, and
564 * a positive value if no timer was created (not an error).
565 */
566 int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
567 unsigned int monitor_timer_interval_us)
568 {
569 int ret;
570
571 assert(channel);
572 assert(channel->key);
573 assert(!channel->monitor_timer_enabled);
574
575 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
576 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
577 channel->monitor_timer_enabled = !!(ret == 0);
578 return ret;
579 }
580
581 /*
582 * Stop and delete the channel's monitoring timer.
583 */
584 int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
585 {
586 int ret;
587
588 assert(channel);
589 assert(channel->monitor_timer_enabled);
590
591 ret = consumer_channel_timer_stop(&channel->monitor_timer,
592 LTTNG_CONSUMER_SIG_MONITOR);
593 if (ret == -1) {
594 ERR("Failed to stop live timer");
595 goto end;
596 }
597
598 channel->monitor_timer_enabled = 0;
599 end:
600 return ret;
601 }
602
603 /*
604 * Block the RT signals for the entire process. It must be called from the
605 * consumer main before creating the threads
606 */
607 int consumer_signal_init(void)
608 {
609 int ret;
610 sigset_t mask;
611
612 /* Block signal for entire process, so only our thread processes it. */
613 setmask(&mask);
614 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
615 if (ret) {
616 errno = ret;
617 PERROR("pthread_sigmask");
618 return -1;
619 }
620 return 0;
621 }
622
623 static
624 int sample_channel_positions(struct lttng_consumer_channel *channel,
625 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
626 sample_positions_cb sample, get_consumed_cb get_consumed,
627 get_produced_cb get_produced)
628 {
629 int ret = 0;
630 struct lttng_ht_iter iter;
631 struct lttng_consumer_stream *stream;
632 bool empty_channel = true;
633 uint64_t high = 0, low = UINT64_MAX;
634 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
635
636 *_total_consumed = 0;
637
638 rcu_read_lock();
639
640 cds_lfht_for_each_entry_duplicate(ht->ht,
641 ht->hash_fct(&channel->key, lttng_ht_seed),
642 ht->match_fct, &channel->key,
643 &iter.iter, stream, node_channel_id.node) {
644 unsigned long produced, consumed, usage;
645
646 empty_channel = false;
647
648 pthread_mutex_lock(&stream->lock);
649 if (cds_lfht_is_node_deleted(&stream->node.node)) {
650 goto next;
651 }
652
653 ret = sample(stream);
654 if (ret) {
655 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
656 pthread_mutex_unlock(&stream->lock);
657 goto end;
658 }
659 ret = get_consumed(stream, &consumed);
660 if (ret) {
661 ERR("Failed to get buffer consumed position in monitor timer");
662 pthread_mutex_unlock(&stream->lock);
663 goto end;
664 }
665 ret = get_produced(stream, &produced);
666 if (ret) {
667 ERR("Failed to get buffer produced position in monitor timer");
668 pthread_mutex_unlock(&stream->lock);
669 goto end;
670 }
671
672 usage = produced - consumed;
673 high = (usage > high) ? usage : high;
674 low = (usage < low) ? usage : low;
675
676 /*
677 * We don't use consumed here for 2 reasons:
678 * - output_written takes into account the padding written in the
679 * tracefiles when we stop the session;
680 * - the consumed position is not the accurate representation of what
681 * was extracted from a buffer in overwrite mode.
682 */
683 *_total_consumed += stream->output_written;
684 next:
685 pthread_mutex_unlock(&stream->lock);
686 }
687
688 *_highest_use = high;
689 *_lowest_use = low;
690 end:
691 rcu_read_unlock();
692 if (empty_channel) {
693 ret = -1;
694 }
695 return ret;
696 }
697
698 /*
699 * Execute action on a monitor timer.
700 */
701 static
702 void monitor_timer(struct lttng_consumer_channel *channel)
703 {
704 int ret;
705 int channel_monitor_pipe =
706 consumer_timer_thread_get_channel_monitor_pipe();
707 struct lttcomm_consumer_channel_monitor_msg msg = {
708 .key = channel->key,
709 };
710 sample_positions_cb sample;
711 get_consumed_cb get_consumed;
712 get_produced_cb get_produced;
713 uint64_t lowest = 0, highest = 0, total_consumed = 0;
714
715 assert(channel);
716
717 if (channel_monitor_pipe < 0) {
718 return;
719 }
720
721 switch (consumer_data.type) {
722 case LTTNG_CONSUMER_KERNEL:
723 sample = lttng_kconsumer_sample_snapshot_positions;
724 get_consumed = lttng_kconsumer_get_consumed_snapshot;
725 get_produced = lttng_kconsumer_get_produced_snapshot;
726 break;
727 case LTTNG_CONSUMER32_UST:
728 case LTTNG_CONSUMER64_UST:
729 sample = lttng_ustconsumer_sample_snapshot_positions;
730 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
731 get_produced = lttng_ustconsumer_get_produced_snapshot;
732 break;
733 default:
734 abort();
735 }
736
737 ret = sample_channel_positions(channel, &highest, &lowest,
738 &total_consumed, sample, get_consumed, get_produced);
739 if (ret) {
740 return;
741 }
742 msg.highest = highest;
743 msg.lowest = lowest;
744 msg.total_consumed = total_consumed;
745
746 /*
747 * Writes performed here are assumed to be atomic which is only
748 * guaranteed for sizes < than PIPE_BUF.
749 */
750 assert(sizeof(msg) <= PIPE_BUF);
751
752 do {
753 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
754 } while (ret == -1 && errno == EINTR);
755 if (ret == -1) {
756 if (errno == EAGAIN) {
757 /* Not an error, the sample is merely dropped. */
758 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
759 channel->key);
760 } else {
761 PERROR("write to the channel monitor pipe");
762 }
763 } else {
764 DBG("Sent channel monitoring sample for channel key %" PRIu64
765 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
766 channel->key, msg.highest, msg.lowest);
767 }
768 }
769
770 int consumer_timer_thread_get_channel_monitor_pipe(void)
771 {
772 return uatomic_read(&channel_monitor_pipe);
773 }
774
775 int consumer_timer_thread_set_channel_monitor_pipe(int fd)
776 {
777 int ret;
778
779 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
780 if (ret != -1) {
781 ret = -1;
782 goto end;
783 }
784 ret = 0;
785 end:
786 return ret;
787 }
788
789 /*
790 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
791 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
792 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
793 */
794 void *consumer_timer_thread(void *data)
795 {
796 int signr;
797 sigset_t mask;
798 siginfo_t info;
799 struct lttng_consumer_local_data *ctx = data;
800
801 rcu_register_thread();
802
803 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
804
805 if (testpoint(consumerd_thread_metadata_timer)) {
806 goto error_testpoint;
807 }
808
809 health_code_update();
810
811 /* Only self thread will receive signal mask. */
812 setmask(&mask);
813 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
814
815 while (1) {
816 health_code_update();
817
818 health_poll_entry();
819 signr = sigwaitinfo(&mask, &info);
820 health_poll_exit();
821
822 /*
823 * NOTE: cascading conditions are used instead of a switch case
824 * since the use of SIGRTMIN in the definition of the signals'
825 * values prevents the reduction to an integer constant.
826 */
827 if (signr == -1) {
828 if (errno != EINTR) {
829 PERROR("sigwaitinfo");
830 }
831 continue;
832 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
833 metadata_switch_timer(ctx, &info);
834 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
835 cmm_smp_mb();
836 CMM_STORE_SHARED(timer_signal.qs_done, 1);
837 cmm_smp_mb();
838 DBG("Signal timer metadata thread teardown");
839 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
840 live_timer(ctx, &info);
841 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
842 struct lttng_consumer_channel *channel;
843
844 channel = info.si_value.sival_ptr;
845 monitor_timer(channel);
846 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
847 assert(CMM_LOAD_SHARED(consumer_quit));
848 goto end;
849 } else {
850 ERR("Unexpected signal %d\n", info.si_signo);
851 }
852 }
853
854 error_testpoint:
855 /* Only reached in testpoint error */
856 health_error();
857 end:
858 health_unregister(health_consumerd);
859 rcu_unregister_thread();
860 return NULL;
861 }
This page took 0.045918 seconds and 4 git commands to generate.