clang-tidy: add Chrome-inspired checks
[lttng-tools.git] / src / common / consumer / consumer-timer.cpp
CommitLineData
331744e3 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
331744e3 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
331744e3 6 *
331744e3
JD
7 */
8
6c1c0768 9#define _LGPL_SOURCE
c9e313bc
SM
10#include <common/common.hpp>
11#include <common/compat/endian.hpp>
c9e313bc 12#include <common/consumer/consumer-stream.hpp>
c9e313bc 13#include <common/consumer/consumer-testpoint.hpp>
28ab034a
JG
14#include <common/consumer/consumer-timer.hpp>
15#include <common/kernel-consumer/kernel-consumer.hpp>
16#include <common/kernel-ctl/kernel-ctl.hpp>
c9e313bc 17#include <common/ust-consumer/ust-consumer.hpp>
331744e3 18
28ab034a
JG
19#include <bin/lttng-consumerd/health-consumerd.hpp>
20#include <inttypes.h>
21#include <signal.h>
22
e665dfbc
JG
23using sample_positions_cb = int (*)(struct lttng_consumer_stream *);
24using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
25using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
26using flush_index_cb = int (*)(struct lttng_consumer_stream *);
e9404c27 27
2b8f8754
MD
28static struct timer_signal_data timer_signal = {
29 .tid = 0,
30 .setup_done = 0,
31 .qs_done = 0,
32 .lock = PTHREAD_MUTEX_INITIALIZER,
33};
331744e3
JD
34
35/*
36 * Set custom signal mask to current thread.
37 */
38static void setmask(sigset_t *mask)
39{
40 int ret;
41
42 ret = sigemptyset(mask);
43 if (ret) {
44 PERROR("sigemptyset");
45 }
46 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
47 if (ret) {
d3e2ba59 48 PERROR("sigaddset switch");
331744e3
JD
49 }
50 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
51 if (ret) {
d3e2ba59
JD
52 PERROR("sigaddset teardown");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
55 if (ret) {
56 PERROR("sigaddset live");
331744e3 57 }
e9404c27
JG
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
59 if (ret) {
60 PERROR("sigaddset monitor");
61 }
13675d0e
MD
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
63 if (ret) {
64 PERROR("sigaddset exit");
65 }
331744e3
JD
66}
67
fa29bfbf 68static int the_channel_monitor_pipe = -1;
e9404c27 69
331744e3
JD
70/*
71 * Execute action on a timer switch.
d98a47c7
MD
72 *
73 * Beware: metadata_switch_timer() should *never* take a mutex also held
74 * while consumer_timer_switch_stop() is called. It would result in
75 * deadlocks.
331744e3 76 */
28ab034a 77static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
331744e3
JD
78{
79 int ret;
80 struct lttng_consumer_channel *channel;
81
97535efa 82 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 83 LTTNG_ASSERT(channel);
331744e3 84
4419b4fb
MD
85 if (channel->switch_timer_error) {
86 return;
87 }
88
331744e3
JD
89 DBG("Switch timer for channel %" PRIu64, channel->key);
90 switch (ctx->type) {
91 case LTTNG_CONSUMER32_UST:
92 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
93 /*
94 * Locks taken by lttng_ustconsumer_request_metadata():
95 * - metadata_socket_lock
96 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 97 * - channel->metadata_cache->lock
4fa3dc0e 98 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
99 * - channel->timer_lock
100 * - channel->metadata_cache->lock
4fa3dc0e 101 *
5e41ebe1
MD
102 * Ensure that neither consumer_data.lock nor
103 * channel->lock are taken within this function, since
104 * they are held while consumer_timer_switch_stop() is
105 * called.
4fa3dc0e 106 */
94d49140 107 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 108 if (ret < 0) {
4419b4fb 109 channel->switch_timer_error = 1;
331744e3
JD
110 }
111 break;
112 case LTTNG_CONSUMER_KERNEL:
113 case LTTNG_CONSUMER_UNKNOWN:
a0377dfe 114 abort();
331744e3
JD
115 break;
116 }
117}
118
28ab034a 119static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id)
d3e2ba59
JD
120{
121 int ret;
50adc264 122 struct ctf_packet_index index;
d3e2ba59
JD
123
124 memset(&index, 0, sizeof(index));
528f2ffa 125 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
126 index.timestamp_end = htobe64(ts);
127 ret = consumer_stream_write_index(stream, &index);
128 if (ret < 0) {
129 goto error;
130 }
131
132error:
133 return ret;
134}
135
c585821b 136int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 137{
528f2ffa 138 uint64_t ts, stream_id;
d3e2ba59
JD
139 int ret;
140
d3e2ba59
JD
141 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
142 if (ret < 0) {
143 ERR("Failed to get the current timestamp");
c585821b 144 goto end;
d3e2ba59
JD
145 }
146 ret = kernctl_buffer_flush(stream->wait_fd);
147 if (ret < 0) {
148 ERR("Failed to flush kernel stream");
c585821b 149 goto end;
d3e2ba59
JD
150 }
151 ret = kernctl_snapshot(stream->wait_fd);
152 if (ret < 0) {
32af2c95 153 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 154 PERROR("live timer kernel snapshot");
d3e2ba59 155 ret = -1;
c585821b 156 goto end;
d3e2ba59 157 }
528f2ffa
JD
158 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
159 if (ret < 0) {
160 PERROR("kernctl_get_stream_id");
c585821b 161 goto end;
528f2ffa 162 }
d3e2ba59 163 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 164 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 165 if (ret < 0) {
c585821b 166 goto end;
d3e2ba59
JD
167 }
168 }
169 ret = 0;
c585821b 170end:
d3e2ba59
JD
171 return ret;
172}
173
28ab034a 174static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index)
d3e2ba59 175{
d3e2ba59
JD
176 int ret;
177
d3e2ba59
JD
178 /*
179 * While holding the stream mutex, try to take a snapshot, if it
180 * succeeds, it means that data is ready to be sent, just let the data
181 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
182 * means that there is no data to read after the flush, so we can
183 * safely send the empty index.
c585821b
MD
184 *
185 * Doing a trylock and checking if waiting on metadata if
186 * trylock fails. Bail out of the stream is indeed waiting for
187 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 188 */
c585821b
MD
189 for (;;) {
190 ret = pthread_mutex_trylock(&stream->lock);
191 switch (ret) {
192 case 0:
28ab034a 193 break; /* We have the lock. */
c585821b
MD
194 case EBUSY:
195 pthread_mutex_lock(&stream->metadata_timer_lock);
196 if (stream->waiting_on_metadata) {
197 ret = 0;
198 stream->missed_metadata_flush = true;
199 pthread_mutex_unlock(&stream->metadata_timer_lock);
28ab034a 200 goto end; /* Bail out. */
c585821b
MD
201 }
202 pthread_mutex_unlock(&stream->metadata_timer_lock);
203 /* Try again. */
204 caa_cpu_relax();
205 continue;
206 default:
207 ERR("Unexpected pthread_mutex_trylock error %d", ret);
208 ret = -1;
209 goto end;
210 }
211 break;
212 }
fad4b619 213 ret = flush_index(stream);
c585821b
MD
214 pthread_mutex_unlock(&stream->lock);
215end:
216 return ret;
217}
218
219int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
220{
221 uint64_t ts, stream_id;
222 int ret;
223
94d49140
JD
224 ret = cds_lfht_is_node_deleted(&stream->node.node);
225 if (ret) {
c585821b 226 goto end;
94d49140
JD
227 }
228
84a182ce 229 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
230 if (ret < 0) {
231 ERR("Failed to get the current timestamp");
c585821b 232 goto end;
d3e2ba59 233 }
881fc67f
MD
234 ret = lttng_ustconsumer_flush_buffer(stream, 1);
235 if (ret < 0) {
236 ERR("Failed to flush buffer while flushing index");
237 goto end;
238 }
84a182ce 239 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 240 if (ret < 0) {
94d49140 241 if (ret != -EAGAIN) {
d3e2ba59
JD
242 ERR("Taking UST snapshot");
243 ret = -1;
c585821b 244 goto end;
d3e2ba59 245 }
70190e1c 246 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa 247 if (ret < 0) {
b623cb6a 248 PERROR("lttng_ust_ctl_get_stream_id");
c585821b 249 goto end;
528f2ffa 250 }
d3e2ba59 251 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 252 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 253 if (ret < 0) {
c585821b 254 goto end;
d3e2ba59
JD
255 }
256 }
257 ret = 0;
c585821b
MD
258end:
259 return ret;
260}
d3e2ba59 261
d3e2ba59
JD
262/*
263 * Execute action on a live timer
264 */
28ab034a 265static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
d3e2ba59
JD
266{
267 int ret;
268 struct lttng_consumer_channel *channel;
269 struct lttng_consumer_stream *stream;
d3e2ba59 270 struct lttng_ht_iter iter;
fa29bfbf 271 const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
28ab034a
JG
272 const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ?
273 consumer_flush_kernel_index :
274 consumer_flush_ust_index;
d3e2ba59 275
97535efa 276 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 277 LTTNG_ASSERT(channel);
d3e2ba59
JD
278
279 if (channel->switch_timer_error) {
280 goto error;
281 }
d3e2ba59
JD
282
283 DBG("Live timer for channel %" PRIu64, channel->key);
284
285 rcu_read_lock();
fad4b619 286 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
287 ht->hash_fct(&channel->key, lttng_ht_seed),
288 ht->match_fct,
289 &channel->key,
290 &iter.iter,
291 stream,
292 node_channel_id.node)
293 {
fad4b619
JG
294 ret = check_stream(stream, flush_index);
295 if (ret < 0) {
296 goto error_unlock;
d3e2ba59 297 }
d3e2ba59
JD
298 }
299
300error_unlock:
301 rcu_read_unlock();
302
303error:
304 return;
305}
306
28ab034a 307static void consumer_timer_signal_thread_qs(unsigned int signr)
2b8f8754
MD
308{
309 sigset_t pending_set;
310 int ret;
311
312 /*
313 * We need to be the only thread interacting with the thread
314 * that manages signals for teardown synchronization.
315 */
316 pthread_mutex_lock(&timer_signal.lock);
317
318 /* Ensure we don't have any signal queued for this channel. */
319 for (;;) {
320 ret = sigemptyset(&pending_set);
321 if (ret == -1) {
322 PERROR("sigemptyset");
323 }
324 ret = sigpending(&pending_set);
325 if (ret == -1) {
326 PERROR("sigpending");
327 }
f05a6b6d 328 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
329 break;
330 }
331 caa_cpu_relax();
332 }
333
334 /*
335 * From this point, no new signal handler will be fired that would try to
336 * access "chan". However, we still need to wait for any currently
337 * executing handler to complete.
338 */
339 cmm_smp_mb();
340 CMM_STORE_SHARED(timer_signal.qs_done, 0);
341 cmm_smp_mb();
342
343 /*
344 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
345 * up.
346 */
347 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
348
349 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
350 caa_cpu_relax();
351 }
352 cmm_smp_mb();
353
354 pthread_mutex_unlock(&timer_signal.lock);
355}
356
331744e3 357/*
e9404c27
JG
358 * Start a timer channel timer which will fire at a given interval
359 * (timer_interval_us)and fire a given signal (signal).
360 *
361 * Returns a negative value on error, 0 if a timer was created, and
362 * a positive value if no timer was created (not an error).
331744e3 363 */
28ab034a
JG
364static int consumer_channel_timer_start(timer_t *timer_id,
365 struct lttng_consumer_channel *channel,
366 unsigned int timer_interval_us,
367 int signal)
331744e3 368{
e9404c27 369 int ret = 0, delete_ret;
389b8e8f 370 struct sigevent sev = {};
331744e3
JD
371 struct itimerspec its;
372
a0377dfe
FD
373 LTTNG_ASSERT(channel);
374 LTTNG_ASSERT(channel->key);
331744e3 375
e9404c27
JG
376 if (timer_interval_us == 0) {
377 /* No creation needed; not an error. */
378 ret = 1;
379 goto end;
331744e3
JD
380 }
381
382 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 383 sev.sigev_signo = signal;
331744e3 384 sev.sigev_value.sival_ptr = channel;
e9404c27 385 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
386 if (ret == -1) {
387 PERROR("timer_create");
e9404c27 388 goto end;
331744e3 389 }
331744e3 390
e9404c27
JG
391 its.it_value.tv_sec = timer_interval_us / 1000000;
392 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
393 its.it_interval.tv_sec = its.it_value.tv_sec;
394 its.it_interval.tv_nsec = its.it_value.tv_nsec;
395
cd9adb8b 396 ret = timer_settime(*timer_id, 0, &its, nullptr);
331744e3
JD
397 if (ret == -1) {
398 PERROR("timer_settime");
e9404c27
JG
399 goto error_destroy_timer;
400 }
401end:
402 return ret;
403error_destroy_timer:
404 delete_ret = timer_delete(*timer_id);
405 if (delete_ret == -1) {
406 PERROR("timer_delete");
407 }
408 goto end;
409}
410
28ab034a 411static int consumer_channel_timer_stop(timer_t *timer_id, int signal)
e9404c27
JG
412{
413 int ret = 0;
414
415 ret = timer_delete(*timer_id);
416 if (ret == -1) {
417 PERROR("timer_delete");
418 goto end;
331744e3 419 }
e9404c27
JG
420
421 consumer_timer_signal_thread_qs(signal);
cd9adb8b 422 *timer_id = nullptr;
e9404c27
JG
423end:
424 return ret;
425}
426
427/*
428 * Set the channel's switch timer.
429 */
430void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
28ab034a 431 unsigned int switch_timer_interval_us)
e9404c27
JG
432{
433 int ret;
434
a0377dfe
FD
435 LTTNG_ASSERT(channel);
436 LTTNG_ASSERT(channel->key);
e9404c27 437
28ab034a
JG
438 ret = consumer_channel_timer_start(&channel->switch_timer,
439 channel,
440 switch_timer_interval_us,
441 LTTNG_CONSUMER_SIG_SWITCH);
e9404c27
JG
442
443 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
444}
445
446/*
e9404c27 447 * Stop and delete the channel's switch timer.
331744e3
JD
448 */
449void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
450{
451 int ret;
331744e3 452
a0377dfe 453 LTTNG_ASSERT(channel);
331744e3 454
28ab034a 455 ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH);
331744e3 456 if (ret == -1) {
e9404c27 457 ERR("Failed to stop switch timer");
331744e3
JD
458 }
459
2b8f8754 460 channel->switch_timer_enabled = 0;
331744e3
JD
461}
462
d3e2ba59 463/*
e9404c27 464 * Set the channel's live timer.
d3e2ba59
JD
465 */
466void consumer_timer_live_start(struct lttng_consumer_channel *channel,
28ab034a 467 unsigned int live_timer_interval_us)
d3e2ba59
JD
468{
469 int ret;
d3e2ba59 470
a0377dfe
FD
471 LTTNG_ASSERT(channel);
472 LTTNG_ASSERT(channel->key);
d3e2ba59 473
28ab034a
JG
474 ret = consumer_channel_timer_start(
475 &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 476
e9404c27
JG
477 channel->live_timer_enabled = !!(ret == 0);
478}
d3e2ba59 479
e9404c27
JG
480/*
481 * Stop and delete the channel's live timer.
482 */
483void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
484{
485 int ret;
486
a0377dfe 487 LTTNG_ASSERT(channel);
d3e2ba59 488
28ab034a 489 ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 490 if (ret == -1) {
e9404c27 491 ERR("Failed to stop live timer");
d3e2ba59 492 }
e9404c27
JG
493
494 channel->live_timer_enabled = 0;
d3e2ba59
JD
495}
496
497/*
e9404c27
JG
498 * Set the channel's monitoring timer.
499 *
500 * Returns a negative value on error, 0 if a timer was created, and
501 * a positive value if no timer was created (not an error).
d3e2ba59 502 */
e9404c27 503int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
28ab034a 504 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
505{
506 int ret;
507
a0377dfe
FD
508 LTTNG_ASSERT(channel);
509 LTTNG_ASSERT(channel->key);
510 LTTNG_ASSERT(!channel->monitor_timer_enabled);
d3e2ba59 511
28ab034a
JG
512 ret = consumer_channel_timer_start(&channel->monitor_timer,
513 channel,
514 monitor_timer_interval_us,
515 LTTNG_CONSUMER_SIG_MONITOR);
e9404c27
JG
516 channel->monitor_timer_enabled = !!(ret == 0);
517 return ret;
518}
519
520/*
521 * Stop and delete the channel's monitoring timer.
522 */
523int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
524{
525 int ret;
526
a0377dfe
FD
527 LTTNG_ASSERT(channel);
528 LTTNG_ASSERT(channel->monitor_timer_enabled);
e9404c27 529
28ab034a 530 ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 531 if (ret == -1) {
e9404c27
JG
532 ERR("Failed to stop live timer");
533 goto end;
d3e2ba59
JD
534 }
535
e9404c27
JG
536 channel->monitor_timer_enabled = 0;
537end:
538 return ret;
d3e2ba59
JD
539}
540
331744e3
JD
541/*
542 * Block the RT signals for the entire process. It must be called from the
543 * consumer main before creating the threads
544 */
cd9adb8b 545int consumer_signal_init()
331744e3
JD
546{
547 int ret;
548 sigset_t mask;
549
550 /* Block signal for entire process, so only our thread processes it. */
551 setmask(&mask);
cd9adb8b 552 ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr);
331744e3
JD
553 if (ret) {
554 errno = ret;
555 PERROR("pthread_sigmask");
73664f81 556 return -1;
331744e3 557 }
73664f81 558 return 0;
331744e3
JD
559}
560
28ab034a
JG
561static int sample_channel_positions(struct lttng_consumer_channel *channel,
562 uint64_t *_highest_use,
563 uint64_t *_lowest_use,
564 uint64_t *_total_consumed,
565 sample_positions_cb sample,
566 get_consumed_cb get_consumed,
567 get_produced_cb get_produced)
e9404c27 568{
23bc9bb5 569 int ret = 0;
e9404c27
JG
570 struct lttng_ht_iter iter;
571 struct lttng_consumer_stream *stream;
572 bool empty_channel = true;
573 uint64_t high = 0, low = UINT64_MAX;
fa29bfbf 574 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
e9404c27 575
e8360425
JD
576 *_total_consumed = 0;
577
e9404c27
JG
578 rcu_read_lock();
579
580 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
581 ht->hash_fct(&channel->key, lttng_ht_seed),
582 ht->match_fct,
583 &channel->key,
584 &iter.iter,
585 stream,
586 node_channel_id.node)
587 {
e9404c27
JG
588 unsigned long produced, consumed, usage;
589
590 empty_channel = false;
591
592 pthread_mutex_lock(&stream->lock);
593 if (cds_lfht_is_node_deleted(&stream->node.node)) {
594 goto next;
595 }
596
597 ret = sample(stream);
598 if (ret) {
28ab034a
JG
599 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
600 ret);
e9404c27
JG
601 pthread_mutex_unlock(&stream->lock);
602 goto end;
603 }
604 ret = get_consumed(stream, &consumed);
605 if (ret) {
606 ERR("Failed to get buffer consumed position in monitor timer");
607 pthread_mutex_unlock(&stream->lock);
608 goto end;
609 }
610 ret = get_produced(stream, &produced);
611 if (ret) {
612 ERR("Failed to get buffer produced position in monitor timer");
613 pthread_mutex_unlock(&stream->lock);
614 goto end;
615 }
616
617 usage = produced - consumed;
618 high = (usage > high) ? usage : high;
619 low = (usage < low) ? usage : low;
e8360425
JD
620
621 /*
622 * We don't use consumed here for 2 reasons:
623 * - output_written takes into account the padding written in the
624 * tracefiles when we stop the session;
625 * - the consumed position is not the accurate representation of what
626 * was extracted from a buffer in overwrite mode.
627 */
628 *_total_consumed += stream->output_written;
e9404c27
JG
629 next:
630 pthread_mutex_unlock(&stream->lock);
631 }
632
633 *_highest_use = high;
634 *_lowest_use = low;
635end:
636 rcu_read_unlock();
637 if (empty_channel) {
638 ret = -1;
639 }
640 return ret;
641}
642
9cc4ae91
JG
643/* Sample and send channel buffering statistics to the session daemon. */
644void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel)
e9404c27
JG
645{
646 int ret;
28ab034a 647 int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe();
e9404c27
JG
648 struct lttcomm_consumer_channel_monitor_msg msg = {
649 .key = channel->key,
319dcddc 650 .session_id = channel->session_id,
1c9a0b0e
MJ
651 .lowest = 0,
652 .highest = 0,
319dcddc 653 .consumed_since_last_sample = 0,
e9404c27
JG
654 };
655 sample_positions_cb sample;
656 get_consumed_cb get_consumed;
657 get_produced_cb get_produced;
ef4b086e 658 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27 659
a0377dfe 660 LTTNG_ASSERT(channel);
e9404c27
JG
661
662 if (channel_monitor_pipe < 0) {
873dda4e 663 return;
e9404c27
JG
664 }
665
fa29bfbf 666 switch (the_consumer_data.type) {
e9404c27
JG
667 case LTTNG_CONSUMER_KERNEL:
668 sample = lttng_kconsumer_sample_snapshot_positions;
669 get_consumed = lttng_kconsumer_get_consumed_snapshot;
670 get_produced = lttng_kconsumer_get_produced_snapshot;
671 break;
672 case LTTNG_CONSUMER32_UST:
673 case LTTNG_CONSUMER64_UST:
674 sample = lttng_ustconsumer_sample_snapshot_positions;
675 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
676 get_produced = lttng_ustconsumer_get_produced_snapshot;
677 break;
678 default:
679 abort();
680 }
681
28ab034a
JG
682 ret = sample_channel_positions(
683 channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced);
e9404c27 684 if (ret) {
873dda4e 685 return;
e9404c27 686 }
319dcddc 687
ef4b086e
JG
688 msg.highest = highest;
689 msg.lowest = lowest;
319dcddc 690 msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
e9404c27
JG
691
692 /*
693 * Writes performed here are assumed to be atomic which is only
694 * guaranteed for sizes < than PIPE_BUF.
695 */
a0377dfe 696 LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
e9404c27
JG
697
698 do {
699 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
700 } while (ret == -1 && errno == EINTR);
701 if (ret == -1) {
702 if (errno == EAGAIN) {
703 /* Not an error, the sample is merely dropped. */
97535efa 704 DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
28ab034a 705 channel->key);
e9404c27
JG
706 } else {
707 PERROR("write to the channel monitor pipe");
708 }
709 } else {
710 DBG("Sent channel monitoring sample for channel key %" PRIu64
28ab034a
JG
711 ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
712 channel->key,
713 msg.highest,
714 msg.lowest);
319dcddc 715 channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
e9404c27 716 }
e9404c27
JG
717}
718
cd9adb8b 719int consumer_timer_thread_get_channel_monitor_pipe()
e9404c27 720{
fa29bfbf 721 return uatomic_read(&the_channel_monitor_pipe);
e9404c27
JG
722}
723
724int consumer_timer_thread_set_channel_monitor_pipe(int fd)
725{
726 int ret;
727
fa29bfbf 728 ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
e9404c27
JG
729 if (ret != -1) {
730 ret = -1;
731 goto end;
732 }
733 ret = 0;
734end:
735 return ret;
736}
737
331744e3 738/*
d3e2ba59 739 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 740 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 741 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 742 */
d3e2ba59 743void *consumer_timer_thread(void *data)
331744e3
JD
744{
745 int signr;
746 sigset_t mask;
747 siginfo_t info;
97535efa 748 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
331744e3 749
8a9acb74
MD
750 rcu_register_thread();
751
1fc79fb4
MD
752 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
753
2d57de81
MD
754 if (testpoint(consumerd_thread_metadata_timer)) {
755 goto error_testpoint;
756 }
757
9ce5646a
MD
758 health_code_update();
759
331744e3
JD
760 /* Only self thread will receive signal mask. */
761 setmask(&mask);
762 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
763
cd9adb8b 764 while (true) {
9ce5646a
MD
765 health_code_update();
766
767 health_poll_entry();
331744e3 768 signr = sigwaitinfo(&mask, &info);
9ce5646a 769 health_poll_exit();
e9404c27
JG
770
771 /*
772 * NOTE: cascading conditions are used instead of a switch case
773 * since the use of SIGRTMIN in the definition of the signals'
774 * values prevents the reduction to an integer constant.
775 */
331744e3
JD
776 if (signr == -1) {
777 if (errno != EINTR) {
778 PERROR("sigwaitinfo");
779 }
780 continue;
781 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 782 metadata_switch_timer(ctx, &info);
331744e3
JD
783 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
784 cmm_smp_mb();
785 CMM_STORE_SHARED(timer_signal.qs_done, 1);
786 cmm_smp_mb();
787 DBG("Signal timer metadata thread teardown");
d3e2ba59 788 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 789 live_timer(ctx, &info);
e9404c27
JG
790 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
791 struct lttng_consumer_channel *channel;
792
97535efa 793 channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
9cc4ae91 794 sample_and_send_channel_buffer_stats(channel);
13675d0e 795 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
a0377dfe 796 LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
13675d0e 797 goto end;
331744e3
JD
798 } else {
799 ERR("Unexpected signal %d\n", info.si_signo);
800 }
801 }
802
2d57de81
MD
803error_testpoint:
804 /* Only reached in testpoint error */
805 health_error();
13675d0e 806end:
1fc79fb4 807 health_unregister(health_consumerd);
8a9acb74 808 rcu_unregister_thread();
cd9adb8b 809 return nullptr;
331744e3 810}
This page took 0.104541 seconds and 4 git commands to generate.