Implement consumer ring buffer position sampling
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
e9404c27 24#include <lttng/ust-ctl.h>
51a9e1c7 25#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 26#include <common/common.h>
f263b7fd 27#include <common/compat/endian.h>
d3e2ba59
JD
28#include <common/kernel-ctl/kernel-ctl.h>
29#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
30#include <common/consumer/consumer-stream.h>
31#include <common/consumer/consumer-timer.h>
32#include <common/consumer/consumer-testpoint.h>
33#include <common/ust-consumer/ust-consumer.h>
331744e3 34
e9404c27
JG
35typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
36typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
37 unsigned long *consumed);
38typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
39 unsigned long *produced);
40
2b8f8754
MD
41static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46};
331744e3
JD
47
48/*
49 * Set custom signal mask to current thread.
50 */
51static void setmask(sigset_t *mask)
52{
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
d3e2ba59 61 PERROR("sigaddset switch");
331744e3
JD
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
d3e2ba59
JD
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
331744e3 70 }
e9404c27
JG
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
72 if (ret) {
73 PERROR("sigaddset monitor");
74 }
331744e3
JD
75}
76
e9404c27
JG
77static int channel_monitor_pipe = -1;
78
331744e3
JD
79/*
80 * Execute action on a timer switch.
d98a47c7
MD
81 *
82 * Beware: metadata_switch_timer() should *never* take a mutex also held
83 * while consumer_timer_switch_stop() is called. It would result in
84 * deadlocks.
331744e3
JD
85 */
86static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
e9404c27 87 int sig, siginfo_t *si)
331744e3
JD
88{
89 int ret;
90 struct lttng_consumer_channel *channel;
91
92 channel = si->si_value.sival_ptr;
93 assert(channel);
94
4419b4fb
MD
95 if (channel->switch_timer_error) {
96 return;
97 }
98
331744e3
JD
99 DBG("Switch timer for channel %" PRIu64, channel->key);
100 switch (ctx->type) {
101 case LTTNG_CONSUMER32_UST:
102 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
103 /*
104 * Locks taken by lttng_ustconsumer_request_metadata():
105 * - metadata_socket_lock
106 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 107 * - channel->metadata_cache->lock
4fa3dc0e 108 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
109 * - channel->timer_lock
110 * - channel->metadata_cache->lock
4fa3dc0e 111 *
5e41ebe1
MD
112 * Ensure that neither consumer_data.lock nor
113 * channel->lock are taken within this function, since
114 * they are held while consumer_timer_switch_stop() is
115 * called.
4fa3dc0e 116 */
94d49140 117 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 118 if (ret < 0) {
4419b4fb 119 channel->switch_timer_error = 1;
331744e3
JD
120 }
121 break;
122 case LTTNG_CONSUMER_KERNEL:
123 case LTTNG_CONSUMER_UNKNOWN:
124 assert(0);
125 break;
126 }
127}
128
528f2ffa
JD
129static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
130 uint64_t stream_id)
d3e2ba59
JD
131{
132 int ret;
50adc264 133 struct ctf_packet_index index;
d3e2ba59
JD
134
135 memset(&index, 0, sizeof(index));
528f2ffa 136 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
137 index.timestamp_end = htobe64(ts);
138 ret = consumer_stream_write_index(stream, &index);
139 if (ret < 0) {
140 goto error;
141 }
142
143error:
144 return ret;
145}
146
c585821b 147int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 148{
528f2ffa 149 uint64_t ts, stream_id;
d3e2ba59
JD
150 int ret;
151
d3e2ba59
JD
152 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
153 if (ret < 0) {
154 ERR("Failed to get the current timestamp");
c585821b 155 goto end;
d3e2ba59
JD
156 }
157 ret = kernctl_buffer_flush(stream->wait_fd);
158 if (ret < 0) {
159 ERR("Failed to flush kernel stream");
c585821b 160 goto end;
d3e2ba59
JD
161 }
162 ret = kernctl_snapshot(stream->wait_fd);
163 if (ret < 0) {
32af2c95 164 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 165 PERROR("live timer kernel snapshot");
d3e2ba59 166 ret = -1;
c585821b 167 goto end;
d3e2ba59 168 }
528f2ffa
JD
169 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
170 if (ret < 0) {
171 PERROR("kernctl_get_stream_id");
c585821b 172 goto end;
528f2ffa 173 }
d3e2ba59 174 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 175 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 176 if (ret < 0) {
c585821b 177 goto end;
d3e2ba59
JD
178 }
179 }
180 ret = 0;
c585821b 181end:
d3e2ba59
JD
182 return ret;
183}
184
c585821b 185static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 186{
d3e2ba59
JD
187 int ret;
188
d3e2ba59
JD
189 /*
190 * While holding the stream mutex, try to take a snapshot, if it
191 * succeeds, it means that data is ready to be sent, just let the data
192 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
193 * means that there is no data to read after the flush, so we can
194 * safely send the empty index.
c585821b
MD
195 *
196 * Doing a trylock and checking if waiting on metadata if
197 * trylock fails. Bail out of the stream is indeed waiting for
198 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 199 */
c585821b
MD
200 for (;;) {
201 ret = pthread_mutex_trylock(&stream->lock);
202 switch (ret) {
203 case 0:
204 break; /* We have the lock. */
205 case EBUSY:
206 pthread_mutex_lock(&stream->metadata_timer_lock);
207 if (stream->waiting_on_metadata) {
208 ret = 0;
209 stream->missed_metadata_flush = true;
210 pthread_mutex_unlock(&stream->metadata_timer_lock);
211 goto end; /* Bail out. */
212 }
213 pthread_mutex_unlock(&stream->metadata_timer_lock);
214 /* Try again. */
215 caa_cpu_relax();
216 continue;
217 default:
218 ERR("Unexpected pthread_mutex_trylock error %d", ret);
219 ret = -1;
220 goto end;
221 }
222 break;
223 }
224 ret = consumer_flush_kernel_index(stream);
225 pthread_mutex_unlock(&stream->lock);
226end:
227 return ret;
228}
229
230int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
231{
232 uint64_t ts, stream_id;
233 int ret;
234
94d49140
JD
235 ret = cds_lfht_is_node_deleted(&stream->node.node);
236 if (ret) {
c585821b 237 goto end;
94d49140
JD
238 }
239
84a182ce 240 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
241 if (ret < 0) {
242 ERR("Failed to get the current timestamp");
c585821b 243 goto end;
d3e2ba59 244 }
84a182ce
DG
245 lttng_ustconsumer_flush_buffer(stream, 1);
246 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 247 if (ret < 0) {
94d49140 248 if (ret != -EAGAIN) {
d3e2ba59
JD
249 ERR("Taking UST snapshot");
250 ret = -1;
c585821b 251 goto end;
d3e2ba59 252 }
70190e1c 253 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
254 if (ret < 0) {
255 PERROR("ustctl_get_stream_id");
c585821b 256 goto end;
528f2ffa 257 }
d3e2ba59 258 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 259 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 260 if (ret < 0) {
c585821b 261 goto end;
d3e2ba59
JD
262 }
263 }
264 ret = 0;
c585821b
MD
265end:
266 return ret;
267}
d3e2ba59 268
c585821b
MD
269static int check_ust_stream(struct lttng_consumer_stream *stream)
270{
271 int ret;
272
273 assert(stream);
274 assert(stream->ustream);
275 /*
276 * While holding the stream mutex, try to take a snapshot, if it
277 * succeeds, it means that data is ready to be sent, just let the data
278 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
279 * means that there is no data to read after the flush, so we can
280 * safely send the empty index.
281 *
282 * Doing a trylock and checking if waiting on metadata if
283 * trylock fails. Bail out of the stream is indeed waiting for
284 * metadata to be pushed. Busy wait on trylock otherwise.
285 */
286 for (;;) {
287 ret = pthread_mutex_trylock(&stream->lock);
288 switch (ret) {
289 case 0:
290 break; /* We have the lock. */
291 case EBUSY:
292 pthread_mutex_lock(&stream->metadata_timer_lock);
293 if (stream->waiting_on_metadata) {
294 ret = 0;
295 stream->missed_metadata_flush = true;
296 pthread_mutex_unlock(&stream->metadata_timer_lock);
297 goto end; /* Bail out. */
298 }
299 pthread_mutex_unlock(&stream->metadata_timer_lock);
300 /* Try again. */
301 caa_cpu_relax();
302 continue;
303 default:
304 ERR("Unexpected pthread_mutex_trylock error %d", ret);
305 ret = -1;
306 goto end;
307 }
308 break;
309 }
310 ret = consumer_flush_ust_index(stream);
d3e2ba59 311 pthread_mutex_unlock(&stream->lock);
c585821b 312end:
d3e2ba59
JD
313 return ret;
314}
315
316/*
317 * Execute action on a live timer
318 */
319static void live_timer(struct lttng_consumer_local_data *ctx,
e9404c27 320 int sig, siginfo_t *si)
d3e2ba59
JD
321{
322 int ret;
323 struct lttng_consumer_channel *channel;
324 struct lttng_consumer_stream *stream;
325 struct lttng_ht *ht;
326 struct lttng_ht_iter iter;
327
328 channel = si->si_value.sival_ptr;
329 assert(channel);
330
331 if (channel->switch_timer_error) {
332 goto error;
333 }
334 ht = consumer_data.stream_per_chan_id_ht;
335
336 DBG("Live timer for channel %" PRIu64, channel->key);
337
338 rcu_read_lock();
339 switch (ctx->type) {
340 case LTTNG_CONSUMER32_UST:
341 case LTTNG_CONSUMER64_UST:
342 cds_lfht_for_each_entry_duplicate(ht->ht,
343 ht->hash_fct(&channel->key, lttng_ht_seed),
344 ht->match_fct, &channel->key, &iter.iter,
345 stream, node_channel_id.node) {
346 ret = check_ust_stream(stream);
347 if (ret < 0) {
348 goto error_unlock;
349 }
350 }
351 break;
352 case LTTNG_CONSUMER_KERNEL:
353 cds_lfht_for_each_entry_duplicate(ht->ht,
354 ht->hash_fct(&channel->key, lttng_ht_seed),
355 ht->match_fct, &channel->key, &iter.iter,
356 stream, node_channel_id.node) {
357 ret = check_kernel_stream(stream);
358 if (ret < 0) {
359 goto error_unlock;
360 }
361 }
362 break;
363 case LTTNG_CONSUMER_UNKNOWN:
364 assert(0);
365 break;
366 }
367
368error_unlock:
369 rcu_read_unlock();
370
371error:
372 return;
373}
374
2b8f8754
MD
375static
376void consumer_timer_signal_thread_qs(unsigned int signr)
377{
378 sigset_t pending_set;
379 int ret;
380
381 /*
382 * We need to be the only thread interacting with the thread
383 * that manages signals for teardown synchronization.
384 */
385 pthread_mutex_lock(&timer_signal.lock);
386
387 /* Ensure we don't have any signal queued for this channel. */
388 for (;;) {
389 ret = sigemptyset(&pending_set);
390 if (ret == -1) {
391 PERROR("sigemptyset");
392 }
393 ret = sigpending(&pending_set);
394 if (ret == -1) {
395 PERROR("sigpending");
396 }
397 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
398 break;
399 }
400 caa_cpu_relax();
401 }
402
403 /*
404 * From this point, no new signal handler will be fired that would try to
405 * access "chan". However, we still need to wait for any currently
406 * executing handler to complete.
407 */
408 cmm_smp_mb();
409 CMM_STORE_SHARED(timer_signal.qs_done, 0);
410 cmm_smp_mb();
411
412 /*
413 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
414 * up.
415 */
416 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
417
418 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
419 caa_cpu_relax();
420 }
421 cmm_smp_mb();
422
423 pthread_mutex_unlock(&timer_signal.lock);
424}
425
331744e3 426/*
e9404c27
JG
427 * Start a timer channel timer which will fire at a given interval
428 * (timer_interval_us)and fire a given signal (signal).
429 *
430 * Returns a negative value on error, 0 if a timer was created, and
431 * a positive value if no timer was created (not an error).
331744e3 432 */
e9404c27
JG
433static
434int consumer_channel_timer_start(timer_t *timer_id,
435 struct lttng_consumer_channel *channel,
436 unsigned int timer_interval_us, int signal)
331744e3 437{
e9404c27 438 int ret = 0, delete_ret;
331744e3
JD
439 struct sigevent sev;
440 struct itimerspec its;
441
442 assert(channel);
443 assert(channel->key);
444
e9404c27
JG
445 if (timer_interval_us == 0) {
446 /* No creation needed; not an error. */
447 ret = 1;
448 goto end;
331744e3
JD
449 }
450
451 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 452 sev.sigev_signo = signal;
331744e3 453 sev.sigev_value.sival_ptr = channel;
e9404c27 454 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
455 if (ret == -1) {
456 PERROR("timer_create");
e9404c27 457 goto end;
331744e3 458 }
331744e3 459
e9404c27
JG
460 its.it_value.tv_sec = timer_interval_us / 1000000;
461 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
462 its.it_interval.tv_sec = its.it_value.tv_sec;
463 its.it_interval.tv_nsec = its.it_value.tv_nsec;
464
e9404c27 465 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
466 if (ret == -1) {
467 PERROR("timer_settime");
e9404c27
JG
468 goto error_destroy_timer;
469 }
470end:
471 return ret;
472error_destroy_timer:
473 delete_ret = timer_delete(*timer_id);
474 if (delete_ret == -1) {
475 PERROR("timer_delete");
476 }
477 goto end;
478}
479
480static
481int consumer_channel_timer_stop(timer_t *timer_id, int signal)
482{
483 int ret = 0;
484
485 ret = timer_delete(*timer_id);
486 if (ret == -1) {
487 PERROR("timer_delete");
488 goto end;
331744e3 489 }
e9404c27
JG
490
491 consumer_timer_signal_thread_qs(signal);
492 *timer_id = 0;
493end:
494 return ret;
495}
496
497/*
498 * Set the channel's switch timer.
499 */
500void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
501 unsigned int switch_timer_interval_us)
502{
503 int ret;
504
505 assert(channel);
506 assert(channel->key);
507
508 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
509 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
510
511 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
512}
513
514/*
e9404c27 515 * Stop and delete the channel's switch timer.
331744e3
JD
516 */
517void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
518{
519 int ret;
331744e3
JD
520
521 assert(channel);
522
e9404c27
JG
523 ret = consumer_channel_timer_stop(&channel->switch_timer,
524 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 525 if (ret == -1) {
e9404c27 526 ERR("Failed to stop switch timer");
331744e3
JD
527 }
528
2b8f8754 529 channel->switch_timer_enabled = 0;
331744e3
JD
530}
531
d3e2ba59 532/*
e9404c27 533 * Set the channel's live timer.
d3e2ba59
JD
534 */
535void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 536 unsigned int live_timer_interval_us)
d3e2ba59
JD
537{
538 int ret;
d3e2ba59
JD
539
540 assert(channel);
541 assert(channel->key);
542
e9404c27
JG
543 ret = consumer_channel_timer_start(&channel->live_timer, channel,
544 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 545
e9404c27
JG
546 channel->live_timer_enabled = !!(ret == 0);
547}
d3e2ba59 548
e9404c27
JG
549/*
550 * Stop and delete the channel's live timer.
551 */
552void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
553{
554 int ret;
555
556 assert(channel);
d3e2ba59 557
e9404c27
JG
558 ret = consumer_channel_timer_stop(&channel->live_timer,
559 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 560 if (ret == -1) {
e9404c27 561 ERR("Failed to stop live timer");
d3e2ba59 562 }
e9404c27
JG
563
564 channel->live_timer_enabled = 0;
d3e2ba59
JD
565}
566
567/*
e9404c27
JG
568 * Set the channel's monitoring timer.
569 *
570 * Returns a negative value on error, 0 if a timer was created, and
571 * a positive value if no timer was created (not an error).
d3e2ba59 572 */
e9404c27
JG
573int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
574 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
575{
576 int ret;
577
578 assert(channel);
e9404c27
JG
579 assert(channel->key);
580 assert(!channel->monitor_timer_enabled);
d3e2ba59 581
e9404c27
JG
582 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
583 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
584 channel->monitor_timer_enabled = !!(ret == 0);
585 return ret;
586}
587
588/*
589 * Stop and delete the channel's monitoring timer.
590 */
591int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
592{
593 int ret;
594
595 assert(channel);
596 assert(channel->monitor_timer_enabled);
597
598 ret = consumer_channel_timer_stop(&channel->monitor_timer,
599 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 600 if (ret == -1) {
e9404c27
JG
601 ERR("Failed to stop live timer");
602 goto end;
d3e2ba59
JD
603 }
604
e9404c27
JG
605 channel->monitor_timer_enabled = 0;
606end:
607 return ret;
d3e2ba59
JD
608}
609
331744e3
JD
610/*
611 * Block the RT signals for the entire process. It must be called from the
612 * consumer main before creating the threads
613 */
73664f81 614int consumer_signal_init(void)
331744e3
JD
615{
616 int ret;
617 sigset_t mask;
618
619 /* Block signal for entire process, so only our thread processes it. */
620 setmask(&mask);
621 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
622 if (ret) {
623 errno = ret;
624 PERROR("pthread_sigmask");
73664f81 625 return -1;
331744e3 626 }
73664f81 627 return 0;
331744e3
JD
628}
629
e9404c27
JG
630static
631int sample_channel_positions(struct lttng_consumer_channel *channel,
632 uint64_t *_highest_use, uint64_t *_lowest_use,
633 sample_positions_cb sample, get_consumed_cb get_consumed,
634 get_produced_cb get_produced)
635{
636 int ret;
637 struct lttng_ht_iter iter;
638 struct lttng_consumer_stream *stream;
639 bool empty_channel = true;
640 uint64_t high = 0, low = UINT64_MAX;
641 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
642
643 rcu_read_lock();
644
645 cds_lfht_for_each_entry_duplicate(ht->ht,
646 ht->hash_fct(&channel->key, lttng_ht_seed),
647 ht->match_fct, &channel->key,
648 &iter.iter, stream, node_channel_id.node) {
649 unsigned long produced, consumed, usage;
650
651 empty_channel = false;
652
653 pthread_mutex_lock(&stream->lock);
654 if (cds_lfht_is_node_deleted(&stream->node.node)) {
655 goto next;
656 }
657
658 ret = sample(stream);
659 if (ret) {
660 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
661 pthread_mutex_unlock(&stream->lock);
662 goto end;
663 }
664 ret = get_consumed(stream, &consumed);
665 if (ret) {
666 ERR("Failed to get buffer consumed position in monitor timer");
667 pthread_mutex_unlock(&stream->lock);
668 goto end;
669 }
670 ret = get_produced(stream, &produced);
671 if (ret) {
672 ERR("Failed to get buffer produced position in monitor timer");
673 pthread_mutex_unlock(&stream->lock);
674 goto end;
675 }
676
677 usage = produced - consumed;
678 high = (usage > high) ? usage : high;
679 low = (usage < low) ? usage : low;
680 next:
681 pthread_mutex_unlock(&stream->lock);
682 }
683
684 *_highest_use = high;
685 *_lowest_use = low;
686end:
687 rcu_read_unlock();
688 if (empty_channel) {
689 ret = -1;
690 }
691 return ret;
692}
693
694/*
695 * Execute action on a monitor timer.
696 */
697static
698void monitor_timer(struct lttng_consumer_local_data *ctx,
699 struct lttng_consumer_channel *channel)
700{
701 int ret;
702 int channel_monitor_pipe =
703 consumer_timer_thread_get_channel_monitor_pipe();
704 struct lttcomm_consumer_channel_monitor_msg msg = {
705 .key = channel->key,
706 };
707 sample_positions_cb sample;
708 get_consumed_cb get_consumed;
709 get_produced_cb get_produced;
710
711 assert(channel);
712 pthread_mutex_lock(&consumer_data.lock);
713
714 if (channel_monitor_pipe < 0) {
715 goto end;
716 }
717
718 switch (consumer_data.type) {
719 case LTTNG_CONSUMER_KERNEL:
720 sample = lttng_kconsumer_sample_snapshot_positions;
721 get_consumed = lttng_kconsumer_get_consumed_snapshot;
722 get_produced = lttng_kconsumer_get_produced_snapshot;
723 break;
724 case LTTNG_CONSUMER32_UST:
725 case LTTNG_CONSUMER64_UST:
726 sample = lttng_ustconsumer_sample_snapshot_positions;
727 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
728 get_produced = lttng_ustconsumer_get_produced_snapshot;
729 break;
730 default:
731 abort();
732 }
733
734 ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
735 sample, get_consumed, get_produced);
736 if (ret) {
737 goto end;
738 }
739
740 /*
741 * Writes performed here are assumed to be atomic which is only
742 * guaranteed for sizes < than PIPE_BUF.
743 */
744 assert(sizeof(msg) <= PIPE_BUF);
745
746 do {
747 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
748 } while (ret == -1 && errno == EINTR);
749 if (ret == -1) {
750 if (errno == EAGAIN) {
751 /* Not an error, the sample is merely dropped. */
752 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
753 channel->key);
754 } else {
755 PERROR("write to the channel monitor pipe");
756 }
757 } else {
758 DBG("Sent channel monitoring sample for channel key %" PRIu64
759 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
760 channel->key, msg.highest, msg.lowest);
761 }
762end:
763 pthread_mutex_unlock(&consumer_data.lock);
764}
765
766int consumer_timer_thread_get_channel_monitor_pipe(void)
767{
768 return uatomic_read(&channel_monitor_pipe);
769}
770
771int consumer_timer_thread_set_channel_monitor_pipe(int fd)
772{
773 int ret;
774
775 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
776 if (ret != -1) {
777 ret = -1;
778 goto end;
779 }
780 ret = 0;
781end:
782 return ret;
783}
784
331744e3 785/*
d3e2ba59 786 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27
JG
787 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
788 * LTTNG_CONSUMER_SIG_MONITOR.
331744e3 789 */
d3e2ba59 790void *consumer_timer_thread(void *data)
331744e3
JD
791{
792 int signr;
793 sigset_t mask;
794 siginfo_t info;
795 struct lttng_consumer_local_data *ctx = data;
796
8a9acb74
MD
797 rcu_register_thread();
798
1fc79fb4
MD
799 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
800
2d57de81
MD
801 if (testpoint(consumerd_thread_metadata_timer)) {
802 goto error_testpoint;
803 }
804
9ce5646a
MD
805 health_code_update();
806
331744e3
JD
807 /* Only self thread will receive signal mask. */
808 setmask(&mask);
809 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
810
811 while (1) {
9ce5646a
MD
812 health_code_update();
813
814 health_poll_entry();
331744e3 815 signr = sigwaitinfo(&mask, &info);
9ce5646a 816 health_poll_exit();
e9404c27
JG
817
818 /*
819 * NOTE: cascading conditions are used instead of a switch case
820 * since the use of SIGRTMIN in the definition of the signals'
821 * values prevents the reduction to an integer constant.
822 */
331744e3
JD
823 if (signr == -1) {
824 if (errno != EINTR) {
825 PERROR("sigwaitinfo");
826 }
827 continue;
828 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
e9404c27 829 metadata_switch_timer(ctx, info.si_signo, &info);
331744e3
JD
830 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
831 cmm_smp_mb();
832 CMM_STORE_SHARED(timer_signal.qs_done, 1);
833 cmm_smp_mb();
834 DBG("Signal timer metadata thread teardown");
d3e2ba59 835 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
e9404c27
JG
836 live_timer(ctx, info.si_signo, &info);
837 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
838 struct lttng_consumer_channel *channel;
839
840 channel = info.si_value.sival_ptr;
841 monitor_timer(ctx, channel);
331744e3
JD
842 } else {
843 ERR("Unexpected signal %d\n", info.si_signo);
844 }
845 }
846
2d57de81
MD
847error_testpoint:
848 /* Only reached in testpoint error */
849 health_error();
1fc79fb4
MD
850 health_unregister(health_consumerd);
851
8a9acb74
MD
852 rcu_unregister_thread();
853
1fc79fb4 854 /* Never return */
331744e3
JD
855 return NULL;
856}
This page took 0.118472 seconds and 4 git commands to generate.