Fix: statements with side-effects in assert statements
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
331744e3 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
331744e3 6 *
331744e3
JD
7 */
8
6c1c0768 9#define _LGPL_SOURCE
331744e3
JD
10#include <assert.h>
11#include <inttypes.h>
12#include <signal.h>
13
51a9e1c7 14#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 15#include <common/common.h>
f263b7fd 16#include <common/compat/endian.h>
d3e2ba59
JD
17#include <common/kernel-ctl/kernel-ctl.h>
18#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
19#include <common/consumer/consumer-stream.h>
20#include <common/consumer/consumer-timer.h>
21#include <common/consumer/consumer-testpoint.h>
22#include <common/ust-consumer/ust-consumer.h>
331744e3 23
e9404c27
JG
24typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
25typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
26 unsigned long *consumed);
27typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
28 unsigned long *produced);
fad4b619 29typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
e9404c27 30
2b8f8754
MD
31static struct timer_signal_data timer_signal = {
32 .tid = 0,
33 .setup_done = 0,
34 .qs_done = 0,
35 .lock = PTHREAD_MUTEX_INITIALIZER,
36};
331744e3
JD
37
38/*
39 * Set custom signal mask to current thread.
40 */
41static void setmask(sigset_t *mask)
42{
43 int ret;
44
45 ret = sigemptyset(mask);
46 if (ret) {
47 PERROR("sigemptyset");
48 }
49 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
50 if (ret) {
d3e2ba59 51 PERROR("sigaddset switch");
331744e3
JD
52 }
53 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
54 if (ret) {
d3e2ba59
JD
55 PERROR("sigaddset teardown");
56 }
57 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
58 if (ret) {
59 PERROR("sigaddset live");
331744e3 60 }
e9404c27
JG
61 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
62 if (ret) {
63 PERROR("sigaddset monitor");
64 }
13675d0e
MD
65 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
66 if (ret) {
67 PERROR("sigaddset exit");
68 }
331744e3
JD
69}
70
fa29bfbf 71static int the_channel_monitor_pipe = -1;
e9404c27 72
331744e3
JD
73/*
74 * Execute action on a timer switch.
d98a47c7
MD
75 *
76 * Beware: metadata_switch_timer() should *never* take a mutex also held
77 * while consumer_timer_switch_stop() is called. It would result in
78 * deadlocks.
331744e3
JD
79 */
80static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 81 siginfo_t *si)
331744e3
JD
82{
83 int ret;
84 struct lttng_consumer_channel *channel;
85
86 channel = si->si_value.sival_ptr;
87 assert(channel);
88
4419b4fb
MD
89 if (channel->switch_timer_error) {
90 return;
91 }
92
331744e3
JD
93 DBG("Switch timer for channel %" PRIu64, channel->key);
94 switch (ctx->type) {
95 case LTTNG_CONSUMER32_UST:
96 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
97 /*
98 * Locks taken by lttng_ustconsumer_request_metadata():
99 * - metadata_socket_lock
100 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 101 * - channel->metadata_cache->lock
4fa3dc0e 102 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
103 * - channel->timer_lock
104 * - channel->metadata_cache->lock
4fa3dc0e 105 *
5e41ebe1
MD
106 * Ensure that neither consumer_data.lock nor
107 * channel->lock are taken within this function, since
108 * they are held while consumer_timer_switch_stop() is
109 * called.
4fa3dc0e 110 */
94d49140 111 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 112 if (ret < 0) {
4419b4fb 113 channel->switch_timer_error = 1;
331744e3
JD
114 }
115 break;
116 case LTTNG_CONSUMER_KERNEL:
117 case LTTNG_CONSUMER_UNKNOWN:
118 assert(0);
119 break;
120 }
121}
122
528f2ffa
JD
123static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
124 uint64_t stream_id)
d3e2ba59
JD
125{
126 int ret;
50adc264 127 struct ctf_packet_index index;
d3e2ba59
JD
128
129 memset(&index, 0, sizeof(index));
528f2ffa 130 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
131 index.timestamp_end = htobe64(ts);
132 ret = consumer_stream_write_index(stream, &index);
133 if (ret < 0) {
134 goto error;
135 }
136
137error:
138 return ret;
139}
140
c585821b 141int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 142{
528f2ffa 143 uint64_t ts, stream_id;
d3e2ba59
JD
144 int ret;
145
d3e2ba59
JD
146 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
147 if (ret < 0) {
148 ERR("Failed to get the current timestamp");
c585821b 149 goto end;
d3e2ba59
JD
150 }
151 ret = kernctl_buffer_flush(stream->wait_fd);
152 if (ret < 0) {
153 ERR("Failed to flush kernel stream");
c585821b 154 goto end;
d3e2ba59
JD
155 }
156 ret = kernctl_snapshot(stream->wait_fd);
157 if (ret < 0) {
32af2c95 158 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 159 PERROR("live timer kernel snapshot");
d3e2ba59 160 ret = -1;
c585821b 161 goto end;
d3e2ba59 162 }
528f2ffa
JD
163 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
164 if (ret < 0) {
165 PERROR("kernctl_get_stream_id");
c585821b 166 goto end;
528f2ffa 167 }
d3e2ba59 168 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 169 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 170 if (ret < 0) {
c585821b 171 goto end;
d3e2ba59
JD
172 }
173 }
174 ret = 0;
c585821b 175end:
d3e2ba59
JD
176 return ret;
177}
178
fad4b619
JG
179static int check_stream(struct lttng_consumer_stream *stream,
180 flush_index_cb flush_index)
d3e2ba59 181{
d3e2ba59
JD
182 int ret;
183
d3e2ba59
JD
184 /*
185 * While holding the stream mutex, try to take a snapshot, if it
186 * succeeds, it means that data is ready to be sent, just let the data
187 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
188 * means that there is no data to read after the flush, so we can
189 * safely send the empty index.
c585821b
MD
190 *
191 * Doing a trylock and checking if waiting on metadata if
192 * trylock fails. Bail out of the stream is indeed waiting for
193 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 194 */
c585821b
MD
195 for (;;) {
196 ret = pthread_mutex_trylock(&stream->lock);
197 switch (ret) {
198 case 0:
199 break; /* We have the lock. */
200 case EBUSY:
201 pthread_mutex_lock(&stream->metadata_timer_lock);
202 if (stream->waiting_on_metadata) {
203 ret = 0;
204 stream->missed_metadata_flush = true;
205 pthread_mutex_unlock(&stream->metadata_timer_lock);
206 goto end; /* Bail out. */
207 }
208 pthread_mutex_unlock(&stream->metadata_timer_lock);
209 /* Try again. */
210 caa_cpu_relax();
211 continue;
212 default:
213 ERR("Unexpected pthread_mutex_trylock error %d", ret);
214 ret = -1;
215 goto end;
216 }
217 break;
218 }
fad4b619 219 ret = flush_index(stream);
c585821b
MD
220 pthread_mutex_unlock(&stream->lock);
221end:
222 return ret;
223}
224
225int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
226{
227 uint64_t ts, stream_id;
228 int ret;
229
94d49140
JD
230 ret = cds_lfht_is_node_deleted(&stream->node.node);
231 if (ret) {
c585821b 232 goto end;
94d49140
JD
233 }
234
84a182ce 235 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
236 if (ret < 0) {
237 ERR("Failed to get the current timestamp");
c585821b 238 goto end;
d3e2ba59 239 }
881fc67f
MD
240 ret = lttng_ustconsumer_flush_buffer(stream, 1);
241 if (ret < 0) {
242 ERR("Failed to flush buffer while flushing index");
243 goto end;
244 }
84a182ce 245 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 246 if (ret < 0) {
94d49140 247 if (ret != -EAGAIN) {
d3e2ba59
JD
248 ERR("Taking UST snapshot");
249 ret = -1;
c585821b 250 goto end;
d3e2ba59 251 }
70190e1c 252 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa 253 if (ret < 0) {
b623cb6a 254 PERROR("lttng_ust_ctl_get_stream_id");
c585821b 255 goto end;
528f2ffa 256 }
d3e2ba59 257 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 258 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 259 if (ret < 0) {
c585821b 260 goto end;
d3e2ba59
JD
261 }
262 }
263 ret = 0;
c585821b
MD
264end:
265 return ret;
266}
d3e2ba59 267
d3e2ba59
JD
268/*
269 * Execute action on a live timer
270 */
271static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 272 siginfo_t *si)
d3e2ba59
JD
273{
274 int ret;
275 struct lttng_consumer_channel *channel;
276 struct lttng_consumer_stream *stream;
d3e2ba59 277 struct lttng_ht_iter iter;
fa29bfbf 278 const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
fad4b619
JG
279 const flush_index_cb flush_index =
280 ctx->type == LTTNG_CONSUMER_KERNEL ?
281 consumer_flush_kernel_index :
282 consumer_flush_ust_index;
d3e2ba59
JD
283
284 channel = si->si_value.sival_ptr;
285 assert(channel);
286
287 if (channel->switch_timer_error) {
288 goto error;
289 }
d3e2ba59
JD
290
291 DBG("Live timer for channel %" PRIu64, channel->key);
292
293 rcu_read_lock();
fad4b619
JG
294 cds_lfht_for_each_entry_duplicate(ht->ht,
295 ht->hash_fct(&channel->key, lttng_ht_seed),
296 ht->match_fct, &channel->key, &iter.iter,
297 stream, node_channel_id.node) {
298 ret = check_stream(stream, flush_index);
299 if (ret < 0) {
300 goto error_unlock;
d3e2ba59 301 }
d3e2ba59
JD
302 }
303
304error_unlock:
305 rcu_read_unlock();
306
307error:
308 return;
309}
310
2b8f8754
MD
311static
312void consumer_timer_signal_thread_qs(unsigned int signr)
313{
314 sigset_t pending_set;
315 int ret;
316
317 /*
318 * We need to be the only thread interacting with the thread
319 * that manages signals for teardown synchronization.
320 */
321 pthread_mutex_lock(&timer_signal.lock);
322
323 /* Ensure we don't have any signal queued for this channel. */
324 for (;;) {
325 ret = sigemptyset(&pending_set);
326 if (ret == -1) {
327 PERROR("sigemptyset");
328 }
329 ret = sigpending(&pending_set);
330 if (ret == -1) {
331 PERROR("sigpending");
332 }
f05a6b6d 333 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
334 break;
335 }
336 caa_cpu_relax();
337 }
338
339 /*
340 * From this point, no new signal handler will be fired that would try to
341 * access "chan". However, we still need to wait for any currently
342 * executing handler to complete.
343 */
344 cmm_smp_mb();
345 CMM_STORE_SHARED(timer_signal.qs_done, 0);
346 cmm_smp_mb();
347
348 /*
349 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
350 * up.
351 */
352 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
353
354 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
355 caa_cpu_relax();
356 }
357 cmm_smp_mb();
358
359 pthread_mutex_unlock(&timer_signal.lock);
360}
361
331744e3 362/*
e9404c27
JG
363 * Start a timer channel timer which will fire at a given interval
364 * (timer_interval_us)and fire a given signal (signal).
365 *
366 * Returns a negative value on error, 0 if a timer was created, and
367 * a positive value if no timer was created (not an error).
331744e3 368 */
e9404c27
JG
369static
370int consumer_channel_timer_start(timer_t *timer_id,
371 struct lttng_consumer_channel *channel,
372 unsigned int timer_interval_us, int signal)
331744e3 373{
e9404c27 374 int ret = 0, delete_ret;
389b8e8f 375 struct sigevent sev = {};
331744e3
JD
376 struct itimerspec its;
377
378 assert(channel);
379 assert(channel->key);
380
e9404c27
JG
381 if (timer_interval_us == 0) {
382 /* No creation needed; not an error. */
383 ret = 1;
384 goto end;
331744e3
JD
385 }
386
387 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 388 sev.sigev_signo = signal;
331744e3 389 sev.sigev_value.sival_ptr = channel;
e9404c27 390 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
391 if (ret == -1) {
392 PERROR("timer_create");
e9404c27 393 goto end;
331744e3 394 }
331744e3 395
e9404c27
JG
396 its.it_value.tv_sec = timer_interval_us / 1000000;
397 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
398 its.it_interval.tv_sec = its.it_value.tv_sec;
399 its.it_interval.tv_nsec = its.it_value.tv_nsec;
400
e9404c27 401 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
402 if (ret == -1) {
403 PERROR("timer_settime");
e9404c27
JG
404 goto error_destroy_timer;
405 }
406end:
407 return ret;
408error_destroy_timer:
409 delete_ret = timer_delete(*timer_id);
410 if (delete_ret == -1) {
411 PERROR("timer_delete");
412 }
413 goto end;
414}
415
416static
417int consumer_channel_timer_stop(timer_t *timer_id, int signal)
418{
419 int ret = 0;
420
421 ret = timer_delete(*timer_id);
422 if (ret == -1) {
423 PERROR("timer_delete");
424 goto end;
331744e3 425 }
e9404c27
JG
426
427 consumer_timer_signal_thread_qs(signal);
428 *timer_id = 0;
429end:
430 return ret;
431}
432
433/*
434 * Set the channel's switch timer.
435 */
436void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
437 unsigned int switch_timer_interval_us)
438{
439 int ret;
440
441 assert(channel);
442 assert(channel->key);
443
444 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
445 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
446
447 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
448}
449
450/*
e9404c27 451 * Stop and delete the channel's switch timer.
331744e3
JD
452 */
453void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
454{
455 int ret;
331744e3
JD
456
457 assert(channel);
458
e9404c27
JG
459 ret = consumer_channel_timer_stop(&channel->switch_timer,
460 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 461 if (ret == -1) {
e9404c27 462 ERR("Failed to stop switch timer");
331744e3
JD
463 }
464
2b8f8754 465 channel->switch_timer_enabled = 0;
331744e3
JD
466}
467
d3e2ba59 468/*
e9404c27 469 * Set the channel's live timer.
d3e2ba59
JD
470 */
471void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 472 unsigned int live_timer_interval_us)
d3e2ba59
JD
473{
474 int ret;
d3e2ba59
JD
475
476 assert(channel);
477 assert(channel->key);
478
e9404c27
JG
479 ret = consumer_channel_timer_start(&channel->live_timer, channel,
480 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 481
e9404c27
JG
482 channel->live_timer_enabled = !!(ret == 0);
483}
d3e2ba59 484
e9404c27
JG
485/*
486 * Stop and delete the channel's live timer.
487 */
488void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
489{
490 int ret;
491
492 assert(channel);
d3e2ba59 493
e9404c27
JG
494 ret = consumer_channel_timer_stop(&channel->live_timer,
495 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 496 if (ret == -1) {
e9404c27 497 ERR("Failed to stop live timer");
d3e2ba59 498 }
e9404c27
JG
499
500 channel->live_timer_enabled = 0;
d3e2ba59
JD
501}
502
503/*
e9404c27
JG
504 * Set the channel's monitoring timer.
505 *
506 * Returns a negative value on error, 0 if a timer was created, and
507 * a positive value if no timer was created (not an error).
d3e2ba59 508 */
e9404c27
JG
509int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
510 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
511{
512 int ret;
513
514 assert(channel);
e9404c27
JG
515 assert(channel->key);
516 assert(!channel->monitor_timer_enabled);
d3e2ba59 517
e9404c27
JG
518 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
519 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
520 channel->monitor_timer_enabled = !!(ret == 0);
521 return ret;
522}
523
524/*
525 * Stop and delete the channel's monitoring timer.
526 */
527int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
528{
529 int ret;
530
531 assert(channel);
532 assert(channel->monitor_timer_enabled);
533
534 ret = consumer_channel_timer_stop(&channel->monitor_timer,
535 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 536 if (ret == -1) {
e9404c27
JG
537 ERR("Failed to stop live timer");
538 goto end;
d3e2ba59
JD
539 }
540
e9404c27
JG
541 channel->monitor_timer_enabled = 0;
542end:
543 return ret;
d3e2ba59
JD
544}
545
331744e3
JD
546/*
547 * Block the RT signals for the entire process. It must be called from the
548 * consumer main before creating the threads
549 */
73664f81 550int consumer_signal_init(void)
331744e3
JD
551{
552 int ret;
553 sigset_t mask;
554
555 /* Block signal for entire process, so only our thread processes it. */
556 setmask(&mask);
557 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
558 if (ret) {
559 errno = ret;
560 PERROR("pthread_sigmask");
73664f81 561 return -1;
331744e3 562 }
73664f81 563 return 0;
331744e3
JD
564}
565
e9404c27
JG
566static
567int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 568 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
569 sample_positions_cb sample, get_consumed_cb get_consumed,
570 get_produced_cb get_produced)
571{
23bc9bb5 572 int ret = 0;
e9404c27
JG
573 struct lttng_ht_iter iter;
574 struct lttng_consumer_stream *stream;
575 bool empty_channel = true;
576 uint64_t high = 0, low = UINT64_MAX;
fa29bfbf 577 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
e9404c27 578
e8360425
JD
579 *_total_consumed = 0;
580
e9404c27
JG
581 rcu_read_lock();
582
583 cds_lfht_for_each_entry_duplicate(ht->ht,
584 ht->hash_fct(&channel->key, lttng_ht_seed),
585 ht->match_fct, &channel->key,
586 &iter.iter, stream, node_channel_id.node) {
587 unsigned long produced, consumed, usage;
588
589 empty_channel = false;
590
591 pthread_mutex_lock(&stream->lock);
592 if (cds_lfht_is_node_deleted(&stream->node.node)) {
593 goto next;
594 }
595
596 ret = sample(stream);
597 if (ret) {
598 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
599 pthread_mutex_unlock(&stream->lock);
600 goto end;
601 }
602 ret = get_consumed(stream, &consumed);
603 if (ret) {
604 ERR("Failed to get buffer consumed position in monitor timer");
605 pthread_mutex_unlock(&stream->lock);
606 goto end;
607 }
608 ret = get_produced(stream, &produced);
609 if (ret) {
610 ERR("Failed to get buffer produced position in monitor timer");
611 pthread_mutex_unlock(&stream->lock);
612 goto end;
613 }
614
615 usage = produced - consumed;
616 high = (usage > high) ? usage : high;
617 low = (usage < low) ? usage : low;
e8360425
JD
618
619 /*
620 * We don't use consumed here for 2 reasons:
621 * - output_written takes into account the padding written in the
622 * tracefiles when we stop the session;
623 * - the consumed position is not the accurate representation of what
624 * was extracted from a buffer in overwrite mode.
625 */
626 *_total_consumed += stream->output_written;
e9404c27
JG
627 next:
628 pthread_mutex_unlock(&stream->lock);
629 }
630
631 *_highest_use = high;
632 *_lowest_use = low;
633end:
634 rcu_read_unlock();
635 if (empty_channel) {
636 ret = -1;
637 }
638 return ret;
639}
640
641/*
642 * Execute action on a monitor timer.
643 */
644static
5704917f 645void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
646{
647 int ret;
648 int channel_monitor_pipe =
649 consumer_timer_thread_get_channel_monitor_pipe();
650 struct lttcomm_consumer_channel_monitor_msg msg = {
651 .key = channel->key,
652 };
653 sample_positions_cb sample;
654 get_consumed_cb get_consumed;
655 get_produced_cb get_produced;
ef4b086e 656 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27
JG
657
658 assert(channel);
e9404c27
JG
659
660 if (channel_monitor_pipe < 0) {
873dda4e 661 return;
e9404c27
JG
662 }
663
fa29bfbf 664 switch (the_consumer_data.type) {
e9404c27
JG
665 case LTTNG_CONSUMER_KERNEL:
666 sample = lttng_kconsumer_sample_snapshot_positions;
667 get_consumed = lttng_kconsumer_get_consumed_snapshot;
668 get_produced = lttng_kconsumer_get_produced_snapshot;
669 break;
670 case LTTNG_CONSUMER32_UST:
671 case LTTNG_CONSUMER64_UST:
672 sample = lttng_ustconsumer_sample_snapshot_positions;
673 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
674 get_produced = lttng_ustconsumer_get_produced_snapshot;
675 break;
676 default:
677 abort();
678 }
679
ef4b086e
JG
680 ret = sample_channel_positions(channel, &highest, &lowest,
681 &total_consumed, sample, get_consumed, get_produced);
e9404c27 682 if (ret) {
873dda4e 683 return;
e9404c27 684 }
ef4b086e
JG
685 msg.highest = highest;
686 msg.lowest = lowest;
687 msg.total_consumed = total_consumed;
e9404c27
JG
688
689 /*
690 * Writes performed here are assumed to be atomic which is only
691 * guaranteed for sizes < than PIPE_BUF.
692 */
693 assert(sizeof(msg) <= PIPE_BUF);
694
695 do {
696 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
697 } while (ret == -1 && errno == EINTR);
698 if (ret == -1) {
699 if (errno == EAGAIN) {
700 /* Not an error, the sample is merely dropped. */
701 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
702 channel->key);
703 } else {
704 PERROR("write to the channel monitor pipe");
705 }
706 } else {
707 DBG("Sent channel monitoring sample for channel key %" PRIu64
708 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
709 channel->key, msg.highest, msg.lowest);
710 }
e9404c27
JG
711}
712
713int consumer_timer_thread_get_channel_monitor_pipe(void)
714{
fa29bfbf 715 return uatomic_read(&the_channel_monitor_pipe);
e9404c27
JG
716}
717
718int consumer_timer_thread_set_channel_monitor_pipe(int fd)
719{
720 int ret;
721
fa29bfbf 722 ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
e9404c27
JG
723 if (ret != -1) {
724 ret = -1;
725 goto end;
726 }
727 ret = 0;
728end:
729 return ret;
730}
731
331744e3 732/*
d3e2ba59 733 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 734 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 735 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 736 */
d3e2ba59 737void *consumer_timer_thread(void *data)
331744e3
JD
738{
739 int signr;
740 sigset_t mask;
741 siginfo_t info;
742 struct lttng_consumer_local_data *ctx = data;
743
8a9acb74
MD
744 rcu_register_thread();
745
1fc79fb4
MD
746 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
747
2d57de81
MD
748 if (testpoint(consumerd_thread_metadata_timer)) {
749 goto error_testpoint;
750 }
751
9ce5646a
MD
752 health_code_update();
753
331744e3
JD
754 /* Only self thread will receive signal mask. */
755 setmask(&mask);
756 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
757
758 while (1) {
9ce5646a
MD
759 health_code_update();
760
761 health_poll_entry();
331744e3 762 signr = sigwaitinfo(&mask, &info);
9ce5646a 763 health_poll_exit();
e9404c27
JG
764
765 /*
766 * NOTE: cascading conditions are used instead of a switch case
767 * since the use of SIGRTMIN in the definition of the signals'
768 * values prevents the reduction to an integer constant.
769 */
331744e3
JD
770 if (signr == -1) {
771 if (errno != EINTR) {
772 PERROR("sigwaitinfo");
773 }
774 continue;
775 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 776 metadata_switch_timer(ctx, &info);
331744e3
JD
777 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
778 cmm_smp_mb();
779 CMM_STORE_SHARED(timer_signal.qs_done, 1);
780 cmm_smp_mb();
781 DBG("Signal timer metadata thread teardown");
d3e2ba59 782 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 783 live_timer(ctx, &info);
e9404c27
JG
784 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
785 struct lttng_consumer_channel *channel;
786
787 channel = info.si_value.sival_ptr;
5704917f 788 monitor_timer(channel);
13675d0e
MD
789 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
790 assert(CMM_LOAD_SHARED(consumer_quit));
791 goto end;
331744e3
JD
792 } else {
793 ERR("Unexpected signal %d\n", info.si_signo);
794 }
795 }
796
2d57de81
MD
797error_testpoint:
798 /* Only reached in testpoint error */
799 health_error();
13675d0e 800end:
1fc79fb4 801 health_unregister(health_consumerd);
8a9acb74 802 rcu_unregister_thread();
331744e3
JD
803 return NULL;
804}
This page took 0.087874 seconds and 4 git commands to generate.