Fix: sessiond: use system LTTng-UST headers when available
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
e9404c27
JG
34typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
35typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
36 unsigned long *consumed);
37typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
38 unsigned long *produced);
39
2b8f8754
MD
40static struct timer_signal_data timer_signal = {
41 .tid = 0,
42 .setup_done = 0,
43 .qs_done = 0,
44 .lock = PTHREAD_MUTEX_INITIALIZER,
45};
331744e3
JD
46
47/*
48 * Set custom signal mask to current thread.
49 */
50static void setmask(sigset_t *mask)
51{
52 int ret;
53
54 ret = sigemptyset(mask);
55 if (ret) {
56 PERROR("sigemptyset");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
59 if (ret) {
d3e2ba59 60 PERROR("sigaddset switch");
331744e3
JD
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
63 if (ret) {
d3e2ba59
JD
64 PERROR("sigaddset teardown");
65 }
66 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
67 if (ret) {
68 PERROR("sigaddset live");
331744e3 69 }
e9404c27
JG
70 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
71 if (ret) {
72 PERROR("sigaddset monitor");
73 }
02fffb7c
MD
74 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
75 if (ret) {
76 PERROR("sigaddset exit");
77 }
331744e3
JD
78}
79
e9404c27
JG
80static int channel_monitor_pipe = -1;
81
331744e3
JD
82/*
83 * Execute action on a timer switch.
d98a47c7
MD
84 *
85 * Beware: metadata_switch_timer() should *never* take a mutex also held
86 * while consumer_timer_switch_stop() is called. It would result in
87 * deadlocks.
331744e3
JD
88 */
89static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
e9404c27 90 int sig, siginfo_t *si)
331744e3
JD
91{
92 int ret;
93 struct lttng_consumer_channel *channel;
94
95 channel = si->si_value.sival_ptr;
96 assert(channel);
97
4419b4fb
MD
98 if (channel->switch_timer_error) {
99 return;
100 }
101
331744e3
JD
102 DBG("Switch timer for channel %" PRIu64, channel->key);
103 switch (ctx->type) {
104 case LTTNG_CONSUMER32_UST:
105 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
106 /*
107 * Locks taken by lttng_ustconsumer_request_metadata():
108 * - metadata_socket_lock
109 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 110 * - channel->metadata_cache->lock
4fa3dc0e 111 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
112 * - channel->timer_lock
113 * - channel->metadata_cache->lock
4fa3dc0e 114 *
5e41ebe1
MD
115 * Ensure that neither consumer_data.lock nor
116 * channel->lock are taken within this function, since
117 * they are held while consumer_timer_switch_stop() is
118 * called.
4fa3dc0e 119 */
94d49140 120 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 121 if (ret < 0) {
4419b4fb 122 channel->switch_timer_error = 1;
331744e3
JD
123 }
124 break;
125 case LTTNG_CONSUMER_KERNEL:
126 case LTTNG_CONSUMER_UNKNOWN:
127 assert(0);
128 break;
129 }
130}
131
528f2ffa
JD
132static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
133 uint64_t stream_id)
d3e2ba59
JD
134{
135 int ret;
50adc264 136 struct ctf_packet_index index;
d3e2ba59
JD
137
138 memset(&index, 0, sizeof(index));
528f2ffa 139 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
140 index.timestamp_end = htobe64(ts);
141 ret = consumer_stream_write_index(stream, &index);
142 if (ret < 0) {
143 goto error;
144 }
145
146error:
147 return ret;
148}
149
c585821b 150int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 151{
528f2ffa 152 uint64_t ts, stream_id;
d3e2ba59
JD
153 int ret;
154
d3e2ba59
JD
155 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
156 if (ret < 0) {
157 ERR("Failed to get the current timestamp");
c585821b 158 goto end;
d3e2ba59
JD
159 }
160 ret = kernctl_buffer_flush(stream->wait_fd);
161 if (ret < 0) {
162 ERR("Failed to flush kernel stream");
c585821b 163 goto end;
d3e2ba59
JD
164 }
165 ret = kernctl_snapshot(stream->wait_fd);
166 if (ret < 0) {
32af2c95 167 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 168 PERROR("live timer kernel snapshot");
d3e2ba59 169 ret = -1;
c585821b 170 goto end;
d3e2ba59 171 }
528f2ffa
JD
172 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
173 if (ret < 0) {
174 PERROR("kernctl_get_stream_id");
c585821b 175 goto end;
528f2ffa 176 }
d3e2ba59 177 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 178 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 179 if (ret < 0) {
c585821b 180 goto end;
d3e2ba59
JD
181 }
182 }
183 ret = 0;
c585821b 184end:
d3e2ba59
JD
185 return ret;
186}
187
c585821b 188static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 189{
d3e2ba59
JD
190 int ret;
191
d3e2ba59
JD
192 /*
193 * While holding the stream mutex, try to take a snapshot, if it
194 * succeeds, it means that data is ready to be sent, just let the data
195 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
196 * means that there is no data to read after the flush, so we can
197 * safely send the empty index.
c585821b
MD
198 *
199 * Doing a trylock and checking if waiting on metadata if
200 * trylock fails. Bail out of the stream is indeed waiting for
201 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 202 */
c585821b
MD
203 for (;;) {
204 ret = pthread_mutex_trylock(&stream->lock);
205 switch (ret) {
206 case 0:
207 break; /* We have the lock. */
208 case EBUSY:
209 pthread_mutex_lock(&stream->metadata_timer_lock);
210 if (stream->waiting_on_metadata) {
211 ret = 0;
212 stream->missed_metadata_flush = true;
213 pthread_mutex_unlock(&stream->metadata_timer_lock);
214 goto end; /* Bail out. */
215 }
216 pthread_mutex_unlock(&stream->metadata_timer_lock);
217 /* Try again. */
218 caa_cpu_relax();
219 continue;
220 default:
221 ERR("Unexpected pthread_mutex_trylock error %d", ret);
222 ret = -1;
223 goto end;
224 }
225 break;
226 }
227 ret = consumer_flush_kernel_index(stream);
228 pthread_mutex_unlock(&stream->lock);
229end:
230 return ret;
231}
232
233int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
234{
235 uint64_t ts, stream_id;
236 int ret;
237
94d49140
JD
238 ret = cds_lfht_is_node_deleted(&stream->node.node);
239 if (ret) {
c585821b 240 goto end;
94d49140
JD
241 }
242
84a182ce 243 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
244 if (ret < 0) {
245 ERR("Failed to get the current timestamp");
c585821b 246 goto end;
d3e2ba59 247 }
84a182ce
DG
248 lttng_ustconsumer_flush_buffer(stream, 1);
249 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 250 if (ret < 0) {
94d49140 251 if (ret != -EAGAIN) {
d3e2ba59
JD
252 ERR("Taking UST snapshot");
253 ret = -1;
c585821b 254 goto end;
d3e2ba59 255 }
70190e1c 256 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
257 if (ret < 0) {
258 PERROR("ustctl_get_stream_id");
c585821b 259 goto end;
528f2ffa 260 }
d3e2ba59 261 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 262 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 263 if (ret < 0) {
c585821b 264 goto end;
d3e2ba59
JD
265 }
266 }
267 ret = 0;
c585821b
MD
268end:
269 return ret;
270}
d3e2ba59 271
c585821b
MD
272static int check_ust_stream(struct lttng_consumer_stream *stream)
273{
274 int ret;
275
276 assert(stream);
277 assert(stream->ustream);
278 /*
279 * While holding the stream mutex, try to take a snapshot, if it
280 * succeeds, it means that data is ready to be sent, just let the data
281 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
282 * means that there is no data to read after the flush, so we can
283 * safely send the empty index.
284 *
285 * Doing a trylock and checking if waiting on metadata if
286 * trylock fails. Bail out of the stream is indeed waiting for
287 * metadata to be pushed. Busy wait on trylock otherwise.
288 */
289 for (;;) {
290 ret = pthread_mutex_trylock(&stream->lock);
291 switch (ret) {
292 case 0:
293 break; /* We have the lock. */
294 case EBUSY:
295 pthread_mutex_lock(&stream->metadata_timer_lock);
296 if (stream->waiting_on_metadata) {
297 ret = 0;
298 stream->missed_metadata_flush = true;
299 pthread_mutex_unlock(&stream->metadata_timer_lock);
300 goto end; /* Bail out. */
301 }
302 pthread_mutex_unlock(&stream->metadata_timer_lock);
303 /* Try again. */
304 caa_cpu_relax();
305 continue;
306 default:
307 ERR("Unexpected pthread_mutex_trylock error %d", ret);
308 ret = -1;
309 goto end;
310 }
311 break;
312 }
313 ret = consumer_flush_ust_index(stream);
d3e2ba59 314 pthread_mutex_unlock(&stream->lock);
c585821b 315end:
d3e2ba59
JD
316 return ret;
317}
318
319/*
320 * Execute action on a live timer
321 */
322static void live_timer(struct lttng_consumer_local_data *ctx,
e9404c27 323 int sig, siginfo_t *si)
d3e2ba59
JD
324{
325 int ret;
326 struct lttng_consumer_channel *channel;
327 struct lttng_consumer_stream *stream;
328 struct lttng_ht *ht;
329 struct lttng_ht_iter iter;
330
331 channel = si->si_value.sival_ptr;
332 assert(channel);
333
334 if (channel->switch_timer_error) {
335 goto error;
336 }
337 ht = consumer_data.stream_per_chan_id_ht;
338
339 DBG("Live timer for channel %" PRIu64, channel->key);
340
341 rcu_read_lock();
342 switch (ctx->type) {
343 case LTTNG_CONSUMER32_UST:
344 case LTTNG_CONSUMER64_UST:
345 cds_lfht_for_each_entry_duplicate(ht->ht,
346 ht->hash_fct(&channel->key, lttng_ht_seed),
347 ht->match_fct, &channel->key, &iter.iter,
348 stream, node_channel_id.node) {
349 ret = check_ust_stream(stream);
350 if (ret < 0) {
351 goto error_unlock;
352 }
353 }
354 break;
355 case LTTNG_CONSUMER_KERNEL:
356 cds_lfht_for_each_entry_duplicate(ht->ht,
357 ht->hash_fct(&channel->key, lttng_ht_seed),
358 ht->match_fct, &channel->key, &iter.iter,
359 stream, node_channel_id.node) {
360 ret = check_kernel_stream(stream);
361 if (ret < 0) {
362 goto error_unlock;
363 }
364 }
365 break;
366 case LTTNG_CONSUMER_UNKNOWN:
367 assert(0);
368 break;
369 }
370
371error_unlock:
372 rcu_read_unlock();
373
374error:
375 return;
376}
377
2b8f8754
MD
378static
379void consumer_timer_signal_thread_qs(unsigned int signr)
380{
381 sigset_t pending_set;
382 int ret;
383
384 /*
385 * We need to be the only thread interacting with the thread
386 * that manages signals for teardown synchronization.
387 */
388 pthread_mutex_lock(&timer_signal.lock);
389
390 /* Ensure we don't have any signal queued for this channel. */
391 for (;;) {
392 ret = sigemptyset(&pending_set);
393 if (ret == -1) {
394 PERROR("sigemptyset");
395 }
396 ret = sigpending(&pending_set);
397 if (ret == -1) {
398 PERROR("sigpending");
399 }
cecd89cf 400 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
401 break;
402 }
403 caa_cpu_relax();
404 }
405
406 /*
407 * From this point, no new signal handler will be fired that would try to
408 * access "chan". However, we still need to wait for any currently
409 * executing handler to complete.
410 */
411 cmm_smp_mb();
412 CMM_STORE_SHARED(timer_signal.qs_done, 0);
413 cmm_smp_mb();
414
415 /*
416 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
417 * up.
418 */
419 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
420
421 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
422 caa_cpu_relax();
423 }
424 cmm_smp_mb();
425
426 pthread_mutex_unlock(&timer_signal.lock);
427}
428
331744e3 429/*
e9404c27
JG
430 * Start a timer channel timer which will fire at a given interval
431 * (timer_interval_us)and fire a given signal (signal).
432 *
433 * Returns a negative value on error, 0 if a timer was created, and
434 * a positive value if no timer was created (not an error).
331744e3 435 */
e9404c27
JG
436static
437int consumer_channel_timer_start(timer_t *timer_id,
438 struct lttng_consumer_channel *channel,
439 unsigned int timer_interval_us, int signal)
331744e3 440{
e9404c27 441 int ret = 0, delete_ret;
331744e3
JD
442 struct sigevent sev;
443 struct itimerspec its;
444
445 assert(channel);
446 assert(channel->key);
447
e9404c27
JG
448 if (timer_interval_us == 0) {
449 /* No creation needed; not an error. */
450 ret = 1;
451 goto end;
331744e3
JD
452 }
453
454 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 455 sev.sigev_signo = signal;
331744e3 456 sev.sigev_value.sival_ptr = channel;
e9404c27 457 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
458 if (ret == -1) {
459 PERROR("timer_create");
e9404c27 460 goto end;
331744e3 461 }
331744e3 462
e9404c27
JG
463 its.it_value.tv_sec = timer_interval_us / 1000000;
464 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
465 its.it_interval.tv_sec = its.it_value.tv_sec;
466 its.it_interval.tv_nsec = its.it_value.tv_nsec;
467
e9404c27 468 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
469 if (ret == -1) {
470 PERROR("timer_settime");
e9404c27
JG
471 goto error_destroy_timer;
472 }
473end:
474 return ret;
475error_destroy_timer:
476 delete_ret = timer_delete(*timer_id);
477 if (delete_ret == -1) {
478 PERROR("timer_delete");
479 }
480 goto end;
481}
482
483static
484int consumer_channel_timer_stop(timer_t *timer_id, int signal)
485{
486 int ret = 0;
487
488 ret = timer_delete(*timer_id);
489 if (ret == -1) {
490 PERROR("timer_delete");
491 goto end;
331744e3 492 }
e9404c27
JG
493
494 consumer_timer_signal_thread_qs(signal);
495 *timer_id = 0;
496end:
497 return ret;
498}
499
500/*
501 * Set the channel's switch timer.
502 */
503void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
504 unsigned int switch_timer_interval_us)
505{
506 int ret;
507
508 assert(channel);
509 assert(channel->key);
510
511 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
512 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
513
514 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
515}
516
517/*
e9404c27 518 * Stop and delete the channel's switch timer.
331744e3
JD
519 */
520void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
521{
522 int ret;
331744e3
JD
523
524 assert(channel);
525
e9404c27
JG
526 ret = consumer_channel_timer_stop(&channel->switch_timer,
527 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 528 if (ret == -1) {
e9404c27 529 ERR("Failed to stop switch timer");
331744e3
JD
530 }
531
2b8f8754 532 channel->switch_timer_enabled = 0;
331744e3
JD
533}
534
d3e2ba59 535/*
e9404c27 536 * Set the channel's live timer.
d3e2ba59
JD
537 */
538void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 539 unsigned int live_timer_interval_us)
d3e2ba59
JD
540{
541 int ret;
d3e2ba59
JD
542
543 assert(channel);
544 assert(channel->key);
545
e9404c27
JG
546 ret = consumer_channel_timer_start(&channel->live_timer, channel,
547 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 548
e9404c27
JG
549 channel->live_timer_enabled = !!(ret == 0);
550}
d3e2ba59 551
e9404c27
JG
552/*
553 * Stop and delete the channel's live timer.
554 */
555void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
556{
557 int ret;
558
559 assert(channel);
d3e2ba59 560
e9404c27
JG
561 ret = consumer_channel_timer_stop(&channel->live_timer,
562 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 563 if (ret == -1) {
e9404c27 564 ERR("Failed to stop live timer");
d3e2ba59 565 }
e9404c27
JG
566
567 channel->live_timer_enabled = 0;
d3e2ba59
JD
568}
569
570/*
e9404c27
JG
571 * Set the channel's monitoring timer.
572 *
573 * Returns a negative value on error, 0 if a timer was created, and
574 * a positive value if no timer was created (not an error).
d3e2ba59 575 */
e9404c27
JG
576int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
577 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
578{
579 int ret;
580
581 assert(channel);
e9404c27
JG
582 assert(channel->key);
583 assert(!channel->monitor_timer_enabled);
d3e2ba59 584
e9404c27
JG
585 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
586 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
587 channel->monitor_timer_enabled = !!(ret == 0);
588 return ret;
589}
590
591/*
592 * Stop and delete the channel's monitoring timer.
593 */
594int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
595{
596 int ret;
597
598 assert(channel);
599 assert(channel->monitor_timer_enabled);
600
601 ret = consumer_channel_timer_stop(&channel->monitor_timer,
602 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 603 if (ret == -1) {
e9404c27
JG
604 ERR("Failed to stop live timer");
605 goto end;
d3e2ba59
JD
606 }
607
e9404c27
JG
608 channel->monitor_timer_enabled = 0;
609end:
610 return ret;
d3e2ba59
JD
611}
612
331744e3
JD
613/*
614 * Block the RT signals for the entire process. It must be called from the
615 * consumer main before creating the threads
616 */
73664f81 617int consumer_signal_init(void)
331744e3
JD
618{
619 int ret;
620 sigset_t mask;
621
622 /* Block signal for entire process, so only our thread processes it. */
623 setmask(&mask);
624 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
625 if (ret) {
626 errno = ret;
627 PERROR("pthread_sigmask");
73664f81 628 return -1;
331744e3 629 }
73664f81 630 return 0;
331744e3
JD
631}
632
e9404c27
JG
633static
634int sample_channel_positions(struct lttng_consumer_channel *channel,
635 uint64_t *_highest_use, uint64_t *_lowest_use,
636 sample_positions_cb sample, get_consumed_cb get_consumed,
637 get_produced_cb get_produced)
638{
e96fdefc 639 int ret = 0;
e9404c27
JG
640 struct lttng_ht_iter iter;
641 struct lttng_consumer_stream *stream;
642 bool empty_channel = true;
643 uint64_t high = 0, low = UINT64_MAX;
644 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
645
646 rcu_read_lock();
647
648 cds_lfht_for_each_entry_duplicate(ht->ht,
649 ht->hash_fct(&channel->key, lttng_ht_seed),
650 ht->match_fct, &channel->key,
651 &iter.iter, stream, node_channel_id.node) {
652 unsigned long produced, consumed, usage;
653
654 empty_channel = false;
655
656 pthread_mutex_lock(&stream->lock);
657 if (cds_lfht_is_node_deleted(&stream->node.node)) {
658 goto next;
659 }
660
661 ret = sample(stream);
662 if (ret) {
663 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
664 pthread_mutex_unlock(&stream->lock);
665 goto end;
666 }
667 ret = get_consumed(stream, &consumed);
668 if (ret) {
669 ERR("Failed to get buffer consumed position in monitor timer");
670 pthread_mutex_unlock(&stream->lock);
671 goto end;
672 }
673 ret = get_produced(stream, &produced);
674 if (ret) {
675 ERR("Failed to get buffer produced position in monitor timer");
676 pthread_mutex_unlock(&stream->lock);
677 goto end;
678 }
679
680 usage = produced - consumed;
681 high = (usage > high) ? usage : high;
682 low = (usage < low) ? usage : low;
683 next:
684 pthread_mutex_unlock(&stream->lock);
685 }
686
687 *_highest_use = high;
688 *_lowest_use = low;
689end:
690 rcu_read_unlock();
691 if (empty_channel) {
692 ret = -1;
693 }
694 return ret;
695}
696
697/*
698 * Execute action on a monitor timer.
699 */
700static
701void monitor_timer(struct lttng_consumer_local_data *ctx,
702 struct lttng_consumer_channel *channel)
703{
704 int ret;
705 int channel_monitor_pipe =
706 consumer_timer_thread_get_channel_monitor_pipe();
707 struct lttcomm_consumer_channel_monitor_msg msg = {
708 .key = channel->key,
709 };
710 sample_positions_cb sample;
711 get_consumed_cb get_consumed;
712 get_produced_cb get_produced;
0b611245 713 uint64_t lowest = 0, highest = 0;
e9404c27
JG
714
715 assert(channel);
e9404c27
JG
716
717 if (channel_monitor_pipe < 0) {
fd9e9d30 718 return;
e9404c27
JG
719 }
720
721 switch (consumer_data.type) {
722 case LTTNG_CONSUMER_KERNEL:
723 sample = lttng_kconsumer_sample_snapshot_positions;
724 get_consumed = lttng_kconsumer_get_consumed_snapshot;
725 get_produced = lttng_kconsumer_get_produced_snapshot;
726 break;
727 case LTTNG_CONSUMER32_UST:
728 case LTTNG_CONSUMER64_UST:
729 sample = lttng_ustconsumer_sample_snapshot_positions;
730 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
731 get_produced = lttng_ustconsumer_get_produced_snapshot;
732 break;
733 default:
734 abort();
735 }
736
0b611245 737 ret = sample_channel_positions(channel, &highest, &lowest,
e9404c27
JG
738 sample, get_consumed, get_produced);
739 if (ret) {
fd9e9d30 740 return;
e9404c27 741 }
0b611245
JG
742 msg.highest = highest;
743 msg.lowest = lowest;
e9404c27
JG
744
745 /*
746 * Writes performed here are assumed to be atomic which is only
747 * guaranteed for sizes < than PIPE_BUF.
748 */
749 assert(sizeof(msg) <= PIPE_BUF);
750
751 do {
752 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
753 } while (ret == -1 && errno == EINTR);
754 if (ret == -1) {
755 if (errno == EAGAIN) {
756 /* Not an error, the sample is merely dropped. */
757 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
758 channel->key);
759 } else {
760 PERROR("write to the channel monitor pipe");
761 }
762 } else {
763 DBG("Sent channel monitoring sample for channel key %" PRIu64
764 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
765 channel->key, msg.highest, msg.lowest);
766 }
e9404c27
JG
767}
768
769int consumer_timer_thread_get_channel_monitor_pipe(void)
770{
771 return uatomic_read(&channel_monitor_pipe);
772}
773
774int consumer_timer_thread_set_channel_monitor_pipe(int fd)
775{
776 int ret;
777
778 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
779 if (ret != -1) {
780 ret = -1;
781 goto end;
782 }
783 ret = 0;
784end:
785 return ret;
786}
787
331744e3 788/*
d3e2ba59 789 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 790 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
02fffb7c 791 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 792 */
d3e2ba59 793void *consumer_timer_thread(void *data)
331744e3
JD
794{
795 int signr;
796 sigset_t mask;
797 siginfo_t info;
798 struct lttng_consumer_local_data *ctx = data;
799
8a9acb74
MD
800 rcu_register_thread();
801
1fc79fb4
MD
802 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
803
2d57de81
MD
804 if (testpoint(consumerd_thread_metadata_timer)) {
805 goto error_testpoint;
806 }
807
9ce5646a
MD
808 health_code_update();
809
331744e3
JD
810 /* Only self thread will receive signal mask. */
811 setmask(&mask);
812 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
813
814 while (1) {
9ce5646a
MD
815 health_code_update();
816
817 health_poll_entry();
331744e3 818 signr = sigwaitinfo(&mask, &info);
9ce5646a 819 health_poll_exit();
e9404c27
JG
820
821 /*
822 * NOTE: cascading conditions are used instead of a switch case
823 * since the use of SIGRTMIN in the definition of the signals'
824 * values prevents the reduction to an integer constant.
825 */
331744e3
JD
826 if (signr == -1) {
827 if (errno != EINTR) {
828 PERROR("sigwaitinfo");
829 }
830 continue;
831 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
e9404c27 832 metadata_switch_timer(ctx, info.si_signo, &info);
331744e3
JD
833 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
834 cmm_smp_mb();
835 CMM_STORE_SHARED(timer_signal.qs_done, 1);
836 cmm_smp_mb();
837 DBG("Signal timer metadata thread teardown");
d3e2ba59 838 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
e9404c27
JG
839 live_timer(ctx, info.si_signo, &info);
840 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
841 struct lttng_consumer_channel *channel;
842
843 channel = info.si_value.sival_ptr;
844 monitor_timer(ctx, channel);
02fffb7c
MD
845 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
846 assert(CMM_LOAD_SHARED(consumer_quit));
847 goto end;
331744e3
JD
848 } else {
849 ERR("Unexpected signal %d\n", info.si_signo);
850 }
851 }
852
2d57de81
MD
853error_testpoint:
854 /* Only reached in testpoint error */
855 health_error();
02fffb7c 856end:
1fc79fb4 857 health_unregister(health_consumerd);
8a9acb74 858 rcu_unregister_thread();
331744e3
JD
859 return NULL;
860}
This page took 0.0787 seconds and 4 git commands to generate.