Fix: consumerd: leak of tracing buffers on relayd connectivity issue
[lttng-tools.git] / src / common / consumer / consumer-timer.cpp
CommitLineData
331744e3 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
331744e3 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
331744e3 6 *
331744e3
JD
7 */
8
6c1c0768 9#define _LGPL_SOURCE
c9e313bc
SM
10#include <common/common.hpp>
11#include <common/compat/endian.hpp>
c9e313bc 12#include <common/consumer/consumer-stream.hpp>
c9e313bc 13#include <common/consumer/consumer-testpoint.hpp>
28ab034a
JG
14#include <common/consumer/consumer-timer.hpp>
15#include <common/kernel-consumer/kernel-consumer.hpp>
16#include <common/kernel-ctl/kernel-ctl.hpp>
56047f5a 17#include <common/urcu.hpp>
c9e313bc 18#include <common/ust-consumer/ust-consumer.hpp>
331744e3 19
28ab034a
JG
20#include <bin/lttng-consumerd/health-consumerd.hpp>
21#include <inttypes.h>
22#include <signal.h>
23
e665dfbc
JG
/*
 * Callback types used to select, at run time, between the kernel and user
 * space tracer implementations of a stream operation. Each callback receives
 * the stream to operate on; the position getters store their result through
 * their second argument. All of them return an int status.
 */
24using sample_positions_cb = int (*)(struct lttng_consumer_stream *);
25using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
26using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
27using flush_index_cb = int (*)(struct lttng_consumer_stream *);
e9404c27 28
2b8f8754
MD
/*
 * State shared between the timer thread and the threads stopping a timer:
 *   - tid is stored by consumer_timer_thread() when it starts,
 *   - qs_done is cleared by consumer_timer_signal_thread_qs() and set by the
 *     timer thread when it processes LTTNG_CONSUMER_SIG_TEARDOWN,
 *   - lock is held by consumer_timer_signal_thread_qs() for the whole
 *     teardown handshake.
 */
29static struct timer_signal_data timer_signal = {
30 .tid = 0,
31 .setup_done = 0,
32 .qs_done = 0,
33 .lock = PTHREAD_MUTEX_INITIALIZER,
34};
331744e3
JD
35
36/*
37 * Set custom signal mask to current thread.
38 */
39static void setmask(sigset_t *mask)
40{
41 int ret;
42
43 ret = sigemptyset(mask);
44 if (ret) {
45 PERROR("sigemptyset");
46 }
47 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
48 if (ret) {
d3e2ba59 49 PERROR("sigaddset switch");
331744e3
JD
50 }
51 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
52 if (ret) {
d3e2ba59
JD
53 PERROR("sigaddset teardown");
54 }
55 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
56 if (ret) {
57 PERROR("sigaddset live");
331744e3 58 }
e9404c27
JG
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
60 if (ret) {
61 PERROR("sigaddset monitor");
62 }
13675d0e
MD
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
64 if (ret) {
65 PERROR("sigaddset exit");
66 }
331744e3
JD
67}
68
/*
 * File descriptor on which channel monitoring samples are written.
 * Holds -1 until consumer_timer_thread_set_channel_monitor_pipe() installs a
 * value; it is read and updated through uatomic operations only.
 */
fa29bfbf 69static int the_channel_monitor_pipe = -1;
e9404c27 70
331744e3
JD
71/*
72 * Execute action on a timer switch.
d98a47c7
MD
73 *
74 * Beware: metadata_switch_timer() should *never* take a mutex also held
75 * while consumer_timer_switch_stop() is called. It would result in
76 * deadlocks.
331744e3 77 */
28ab034a 78static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
331744e3
JD
79{
80 int ret;
81 struct lttng_consumer_channel *channel;
82
97535efa 83 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 84 LTTNG_ASSERT(channel);
331744e3 85
4419b4fb
MD
86 if (channel->switch_timer_error) {
87 return;
88 }
89
331744e3
JD
90 DBG("Switch timer for channel %" PRIu64, channel->key);
91 switch (ctx->type) {
92 case LTTNG_CONSUMER32_UST:
93 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
94 /*
95 * Locks taken by lttng_ustconsumer_request_metadata():
96 * - metadata_socket_lock
97 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 98 * - channel->metadata_cache->lock
f40b76ae 99 * - Calling consumer_wait_metadata_cache_flushed():
5e41ebe1
MD
100 * - channel->timer_lock
101 * - channel->metadata_cache->lock
4fa3dc0e 102 *
5e41ebe1
MD
103 * Ensure that neither consumer_data.lock nor
104 * channel->lock are taken within this function, since
105 * they are held while consumer_timer_switch_stop() is
106 * called.
4fa3dc0e 107 */
f40b76ae 108 ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
331744e3 109 if (ret < 0) {
4419b4fb 110 channel->switch_timer_error = 1;
331744e3
JD
111 }
112 break;
113 case LTTNG_CONSUMER_KERNEL:
114 case LTTNG_CONSUMER_UNKNOWN:
a0377dfe 115 abort();
331744e3
JD
116 break;
117 }
118}
119
28ab034a 120static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id)
d3e2ba59
JD
121{
122 int ret;
50adc264 123 struct ctf_packet_index index;
d3e2ba59
JD
124
125 memset(&index, 0, sizeof(index));
528f2ffa 126 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
127 index.timestamp_end = htobe64(ts);
128 ret = consumer_stream_write_index(stream, &index);
129 if (ret < 0) {
130 goto error;
131 }
132
133error:
134 return ret;
135}
136
c585821b 137int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 138{
528f2ffa 139 uint64_t ts, stream_id;
d3e2ba59
JD
140 int ret;
141
d3e2ba59
JD
142 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
143 if (ret < 0) {
144 ERR("Failed to get the current timestamp");
c585821b 145 goto end;
d3e2ba59
JD
146 }
147 ret = kernctl_buffer_flush(stream->wait_fd);
148 if (ret < 0) {
149 ERR("Failed to flush kernel stream");
c585821b 150 goto end;
d3e2ba59
JD
151 }
152 ret = kernctl_snapshot(stream->wait_fd);
153 if (ret < 0) {
32af2c95 154 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 155 PERROR("live timer kernel snapshot");
d3e2ba59 156 ret = -1;
c585821b 157 goto end;
d3e2ba59 158 }
528f2ffa
JD
159 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
160 if (ret < 0) {
161 PERROR("kernctl_get_stream_id");
c585821b 162 goto end;
528f2ffa 163 }
d3e2ba59 164 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 165 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 166 if (ret < 0) {
c585821b 167 goto end;
d3e2ba59
JD
168 }
169 }
170 ret = 0;
c585821b 171end:
d3e2ba59
JD
172 return ret;
173}
174
28ab034a 175static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index)
d3e2ba59 176{
d3e2ba59
JD
177 int ret;
178
d3e2ba59
JD
179 /*
180 * While holding the stream mutex, try to take a snapshot, if it
181 * succeeds, it means that data is ready to be sent, just let the data
182 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
183 * means that there is no data to read after the flush, so we can
184 * safely send the empty index.
c585821b
MD
185 *
186 * Doing a trylock and checking if waiting on metadata if
187 * trylock fails. Bail out of the stream is indeed waiting for
188 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 189 */
c585821b
MD
190 for (;;) {
191 ret = pthread_mutex_trylock(&stream->lock);
192 switch (ret) {
193 case 0:
28ab034a 194 break; /* We have the lock. */
c585821b
MD
195 case EBUSY:
196 pthread_mutex_lock(&stream->metadata_timer_lock);
197 if (stream->waiting_on_metadata) {
198 ret = 0;
199 stream->missed_metadata_flush = true;
200 pthread_mutex_unlock(&stream->metadata_timer_lock);
28ab034a 201 goto end; /* Bail out. */
c585821b
MD
202 }
203 pthread_mutex_unlock(&stream->metadata_timer_lock);
204 /* Try again. */
205 caa_cpu_relax();
206 continue;
207 default:
208 ERR("Unexpected pthread_mutex_trylock error %d", ret);
209 ret = -1;
210 goto end;
211 }
212 break;
213 }
fad4b619 214 ret = flush_index(stream);
c585821b
MD
215 pthread_mutex_unlock(&stream->lock);
216end:
217 return ret;
218}
219
220int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
221{
222 uint64_t ts, stream_id;
223 int ret;
224
94d49140
JD
225 ret = cds_lfht_is_node_deleted(&stream->node.node);
226 if (ret) {
c585821b 227 goto end;
94d49140
JD
228 }
229
84a182ce 230 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
231 if (ret < 0) {
232 ERR("Failed to get the current timestamp");
c585821b 233 goto end;
d3e2ba59 234 }
881fc67f
MD
235 ret = lttng_ustconsumer_flush_buffer(stream, 1);
236 if (ret < 0) {
237 ERR("Failed to flush buffer while flushing index");
238 goto end;
239 }
84a182ce 240 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 241 if (ret < 0) {
94d49140 242 if (ret != -EAGAIN) {
d3e2ba59
JD
243 ERR("Taking UST snapshot");
244 ret = -1;
c585821b 245 goto end;
d3e2ba59 246 }
70190e1c 247 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa 248 if (ret < 0) {
b623cb6a 249 PERROR("lttng_ust_ctl_get_stream_id");
c585821b 250 goto end;
528f2ffa 251 }
d3e2ba59 252 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 253 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 254 if (ret < 0) {
c585821b 255 goto end;
d3e2ba59
JD
256 }
257 }
258 ret = 0;
c585821b
MD
259end:
260 return ret;
261}
d3e2ba59 262
d3e2ba59
JD
263/*
264 * Execute action on a live timer
265 */
28ab034a 266static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
d3e2ba59
JD
267{
268 int ret;
269 struct lttng_consumer_channel *channel;
270 struct lttng_consumer_stream *stream;
d3e2ba59 271 struct lttng_ht_iter iter;
fa29bfbf 272 const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
28ab034a
JG
273 const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ?
274 consumer_flush_kernel_index :
275 consumer_flush_ust_index;
d3e2ba59 276
97535efa 277 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 278 LTTNG_ASSERT(channel);
d3e2ba59
JD
279
280 if (channel->switch_timer_error) {
281 goto error;
282 }
d3e2ba59
JD
283
284 DBG("Live timer for channel %" PRIu64, channel->key);
285
28ab034a 286 {
56047f5a
JG
287 lttng::urcu::read_lock_guard read_lock;
288 cds_lfht_for_each_entry_duplicate(ht->ht,
289 ht->hash_fct(&channel->key, lttng_ht_seed),
290 ht->match_fct,
291 &channel->key,
292 &iter.iter,
293 stream,
294 node_channel_id.node)
295 {
296 ret = check_stream(stream, flush_index);
297 if (ret < 0) {
298 goto error_unlock;
299 }
d3e2ba59 300 }
d3e2ba59 301 }
d3e2ba59 302error_unlock:
d3e2ba59
JD
303
304error:
305 return;
306}
307
28ab034a 308static void consumer_timer_signal_thread_qs(unsigned int signr)
2b8f8754
MD
309{
310 sigset_t pending_set;
311 int ret;
312
313 /*
314 * We need to be the only thread interacting with the thread
315 * that manages signals for teardown synchronization.
316 */
317 pthread_mutex_lock(&timer_signal.lock);
318
319 /* Ensure we don't have any signal queued for this channel. */
320 for (;;) {
321 ret = sigemptyset(&pending_set);
322 if (ret == -1) {
323 PERROR("sigemptyset");
324 }
325 ret = sigpending(&pending_set);
326 if (ret == -1) {
327 PERROR("sigpending");
328 }
f05a6b6d 329 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
330 break;
331 }
332 caa_cpu_relax();
333 }
334
335 /*
336 * From this point, no new signal handler will be fired that would try to
337 * access "chan". However, we still need to wait for any currently
338 * executing handler to complete.
339 */
340 cmm_smp_mb();
341 CMM_STORE_SHARED(timer_signal.qs_done, 0);
342 cmm_smp_mb();
343
344 /*
345 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
346 * up.
347 */
348 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
349
350 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
351 caa_cpu_relax();
352 }
353 cmm_smp_mb();
354
355 pthread_mutex_unlock(&timer_signal.lock);
356}
357
331744e3 358/*
e9404c27
JG
359 * Start a timer channel timer which will fire at a given interval
360 * (timer_interval_us)and fire a given signal (signal).
361 *
362 * Returns a negative value on error, 0 if a timer was created, and
363 * a positive value if no timer was created (not an error).
331744e3 364 */
28ab034a
JG
365static int consumer_channel_timer_start(timer_t *timer_id,
366 struct lttng_consumer_channel *channel,
367 unsigned int timer_interval_us,
368 int signal)
331744e3 369{
e9404c27 370 int ret = 0, delete_ret;
389b8e8f 371 struct sigevent sev = {};
331744e3
JD
372 struct itimerspec its;
373
a0377dfe
FD
374 LTTNG_ASSERT(channel);
375 LTTNG_ASSERT(channel->key);
331744e3 376
e9404c27
JG
377 if (timer_interval_us == 0) {
378 /* No creation needed; not an error. */
379 ret = 1;
380 goto end;
331744e3
JD
381 }
382
383 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 384 sev.sigev_signo = signal;
331744e3 385 sev.sigev_value.sival_ptr = channel;
e9404c27 386 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
387 if (ret == -1) {
388 PERROR("timer_create");
e9404c27 389 goto end;
331744e3 390 }
331744e3 391
e9404c27
JG
392 its.it_value.tv_sec = timer_interval_us / 1000000;
393 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
394 its.it_interval.tv_sec = its.it_value.tv_sec;
395 its.it_interval.tv_nsec = its.it_value.tv_nsec;
396
cd9adb8b 397 ret = timer_settime(*timer_id, 0, &its, nullptr);
331744e3
JD
398 if (ret == -1) {
399 PERROR("timer_settime");
e9404c27
JG
400 goto error_destroy_timer;
401 }
402end:
403 return ret;
404error_destroy_timer:
405 delete_ret = timer_delete(*timer_id);
406 if (delete_ret == -1) {
407 PERROR("timer_delete");
408 }
409 goto end;
410}
411
28ab034a 412static int consumer_channel_timer_stop(timer_t *timer_id, int signal)
e9404c27
JG
413{
414 int ret = 0;
415
416 ret = timer_delete(*timer_id);
417 if (ret == -1) {
418 PERROR("timer_delete");
419 goto end;
331744e3 420 }
e9404c27
JG
421
422 consumer_timer_signal_thread_qs(signal);
cd9adb8b 423 *timer_id = nullptr;
e9404c27
JG
424end:
425 return ret;
426}
427
428/*
429 * Set the channel's switch timer.
430 */
431void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
28ab034a 432 unsigned int switch_timer_interval_us)
e9404c27
JG
433{
434 int ret;
435
a0377dfe
FD
436 LTTNG_ASSERT(channel);
437 LTTNG_ASSERT(channel->key);
e9404c27 438
28ab034a
JG
439 ret = consumer_channel_timer_start(&channel->switch_timer,
440 channel,
441 switch_timer_interval_us,
442 LTTNG_CONSUMER_SIG_SWITCH);
e9404c27
JG
443
444 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
445}
446
447/*
e9404c27 448 * Stop and delete the channel's switch timer.
331744e3
JD
449 */
450void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
451{
452 int ret;
331744e3 453
a0377dfe 454 LTTNG_ASSERT(channel);
331744e3 455
28ab034a 456 ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH);
331744e3 457 if (ret == -1) {
e9404c27 458 ERR("Failed to stop switch timer");
331744e3
JD
459 }
460
2b8f8754 461 channel->switch_timer_enabled = 0;
331744e3
JD
462}
463
d3e2ba59 464/*
e9404c27 465 * Set the channel's live timer.
d3e2ba59
JD
466 */
467void consumer_timer_live_start(struct lttng_consumer_channel *channel,
28ab034a 468 unsigned int live_timer_interval_us)
d3e2ba59
JD
469{
470 int ret;
d3e2ba59 471
a0377dfe
FD
472 LTTNG_ASSERT(channel);
473 LTTNG_ASSERT(channel->key);
d3e2ba59 474
28ab034a
JG
475 ret = consumer_channel_timer_start(
476 &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 477
e9404c27
JG
478 channel->live_timer_enabled = !!(ret == 0);
479}
d3e2ba59 480
e9404c27
JG
481/*
482 * Stop and delete the channel's live timer.
483 */
484void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
485{
486 int ret;
487
a0377dfe 488 LTTNG_ASSERT(channel);
d3e2ba59 489
28ab034a 490 ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 491 if (ret == -1) {
e9404c27 492 ERR("Failed to stop live timer");
d3e2ba59 493 }
e9404c27
JG
494
495 channel->live_timer_enabled = 0;
d3e2ba59
JD
496}
497
498/*
e9404c27
JG
499 * Set the channel's monitoring timer.
500 *
501 * Returns a negative value on error, 0 if a timer was created, and
502 * a positive value if no timer was created (not an error).
d3e2ba59 503 */
e9404c27 504int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
28ab034a 505 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
506{
507 int ret;
508
a0377dfe
FD
509 LTTNG_ASSERT(channel);
510 LTTNG_ASSERT(channel->key);
511 LTTNG_ASSERT(!channel->monitor_timer_enabled);
d3e2ba59 512
28ab034a
JG
513 ret = consumer_channel_timer_start(&channel->monitor_timer,
514 channel,
515 monitor_timer_interval_us,
516 LTTNG_CONSUMER_SIG_MONITOR);
e9404c27
JG
517 channel->monitor_timer_enabled = !!(ret == 0);
518 return ret;
519}
520
521/*
522 * Stop and delete the channel's monitoring timer.
523 */
524int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
525{
526 int ret;
527
a0377dfe
FD
528 LTTNG_ASSERT(channel);
529 LTTNG_ASSERT(channel->monitor_timer_enabled);
e9404c27 530
28ab034a 531 ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 532 if (ret == -1) {
9fed015c 533 ERR("Failed to stop monitor timer");
e9404c27 534 goto end;
d3e2ba59
JD
535 }
536
e9404c27
JG
537 channel->monitor_timer_enabled = 0;
538end:
539 return ret;
d3e2ba59
JD
540}
541
331744e3
JD
542/*
543 * Block the RT signals for the entire process. It must be called from the
544 * consumer main before creating the threads
545 */
cd9adb8b 546int consumer_signal_init()
331744e3
JD
547{
548 int ret;
549 sigset_t mask;
550
551 /* Block signal for entire process, so only our thread processes it. */
552 setmask(&mask);
cd9adb8b 553 ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr);
331744e3
JD
554 if (ret) {
555 errno = ret;
556 PERROR("pthread_sigmask");
73664f81 557 return -1;
331744e3 558 }
73664f81 559 return 0;
331744e3
JD
560}
561
28ab034a
JG
562static int sample_channel_positions(struct lttng_consumer_channel *channel,
563 uint64_t *_highest_use,
564 uint64_t *_lowest_use,
565 uint64_t *_total_consumed,
566 sample_positions_cb sample,
567 get_consumed_cb get_consumed,
568 get_produced_cb get_produced)
e9404c27 569{
23bc9bb5 570 int ret = 0;
e9404c27
JG
571 struct lttng_ht_iter iter;
572 struct lttng_consumer_stream *stream;
573 bool empty_channel = true;
574 uint64_t high = 0, low = UINT64_MAX;
fa29bfbf 575 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
e9404c27 576
e8360425
JD
577 *_total_consumed = 0;
578
56047f5a 579 lttng::urcu::read_lock_guard read_lock;
e9404c27
JG
580
581 cds_lfht_for_each_entry_duplicate(ht->ht,
28ab034a
JG
582 ht->hash_fct(&channel->key, lttng_ht_seed),
583 ht->match_fct,
584 &channel->key,
585 &iter.iter,
586 stream,
587 node_channel_id.node)
588 {
e9404c27
JG
589 unsigned long produced, consumed, usage;
590
591 empty_channel = false;
592
593 pthread_mutex_lock(&stream->lock);
594 if (cds_lfht_is_node_deleted(&stream->node.node)) {
595 goto next;
596 }
597
598 ret = sample(stream);
599 if (ret) {
28ab034a
JG
600 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
601 ret);
e9404c27
JG
602 pthread_mutex_unlock(&stream->lock);
603 goto end;
604 }
605 ret = get_consumed(stream, &consumed);
606 if (ret) {
607 ERR("Failed to get buffer consumed position in monitor timer");
608 pthread_mutex_unlock(&stream->lock);
609 goto end;
610 }
611 ret = get_produced(stream, &produced);
612 if (ret) {
613 ERR("Failed to get buffer produced position in monitor timer");
614 pthread_mutex_unlock(&stream->lock);
615 goto end;
616 }
617
618 usage = produced - consumed;
619 high = (usage > high) ? usage : high;
620 low = (usage < low) ? usage : low;
e8360425
JD
621
622 /*
623 * We don't use consumed here for 2 reasons:
624 * - output_written takes into account the padding written in the
625 * tracefiles when we stop the session;
626 * - the consumed position is not the accurate representation of what
627 * was extracted from a buffer in overwrite mode.
628 */
629 *_total_consumed += stream->output_written;
e9404c27
JG
630 next:
631 pthread_mutex_unlock(&stream->lock);
632 }
633
634 *_highest_use = high;
635 *_lowest_use = low;
636end:
e9404c27
JG
637 if (empty_channel) {
638 ret = -1;
639 }
640 return ret;
641}
642
9cc4ae91
JG
643/* Sample and send channel buffering statistics to the session daemon. */
644void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel)
e9404c27
JG
645{
646 int ret;
28ab034a 647 int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe();
e9404c27
JG
648 struct lttcomm_consumer_channel_monitor_msg msg = {
649 .key = channel->key,
319dcddc 650 .session_id = channel->session_id,
1c9a0b0e
MJ
651 .lowest = 0,
652 .highest = 0,
319dcddc 653 .consumed_since_last_sample = 0,
e9404c27
JG
654 };
655 sample_positions_cb sample;
656 get_consumed_cb get_consumed;
657 get_produced_cb get_produced;
ef4b086e 658 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27 659
a0377dfe 660 LTTNG_ASSERT(channel);
e9404c27
JG
661
662 if (channel_monitor_pipe < 0) {
873dda4e 663 return;
e9404c27
JG
664 }
665
fa29bfbf 666 switch (the_consumer_data.type) {
e9404c27
JG
667 case LTTNG_CONSUMER_KERNEL:
668 sample = lttng_kconsumer_sample_snapshot_positions;
669 get_consumed = lttng_kconsumer_get_consumed_snapshot;
670 get_produced = lttng_kconsumer_get_produced_snapshot;
671 break;
672 case LTTNG_CONSUMER32_UST:
673 case LTTNG_CONSUMER64_UST:
674 sample = lttng_ustconsumer_sample_snapshot_positions;
675 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
676 get_produced = lttng_ustconsumer_get_produced_snapshot;
677 break;
678 default:
679 abort();
680 }
681
28ab034a
JG
682 ret = sample_channel_positions(
683 channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced);
e9404c27 684 if (ret) {
873dda4e 685 return;
e9404c27 686 }
319dcddc 687
ef4b086e
JG
688 msg.highest = highest;
689 msg.lowest = lowest;
319dcddc 690 msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
e9404c27
JG
691
692 /*
693 * Writes performed here are assumed to be atomic which is only
694 * guaranteed for sizes < than PIPE_BUF.
695 */
a0377dfe 696 LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
e9404c27
JG
697
698 do {
699 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
700 } while (ret == -1 && errno == EINTR);
701 if (ret == -1) {
702 if (errno == EAGAIN) {
703 /* Not an error, the sample is merely dropped. */
97535efa 704 DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
28ab034a 705 channel->key);
e9404c27
JG
706 } else {
707 PERROR("write to the channel monitor pipe");
708 }
709 } else {
710 DBG("Sent channel monitoring sample for channel key %" PRIu64
28ab034a
JG
711 ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
712 channel->key,
713 msg.highest,
714 msg.lowest);
319dcddc 715 channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
e9404c27 716 }
e9404c27
JG
717}
718
cd9adb8b 719int consumer_timer_thread_get_channel_monitor_pipe()
e9404c27 720{
fa29bfbf 721 return uatomic_read(&the_channel_monitor_pipe);
e9404c27
JG
722}
723
724int consumer_timer_thread_set_channel_monitor_pipe(int fd)
725{
726 int ret;
727
fa29bfbf 728 ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
e9404c27
JG
729 if (ret != -1) {
730 ret = -1;
731 goto end;
732 }
733 ret = 0;
734end:
735 return ret;
736}
737
331744e3 738/*
d3e2ba59 739 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 740 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 741 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 742 */
d3e2ba59 743void *consumer_timer_thread(void *data)
331744e3
JD
744{
745 int signr;
746 sigset_t mask;
747 siginfo_t info;
97535efa 748 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
331744e3 749
8a9acb74
MD
750 rcu_register_thread();
751
1fc79fb4
MD
752 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
753
2d57de81
MD
754 if (testpoint(consumerd_thread_metadata_timer)) {
755 goto error_testpoint;
756 }
757
9ce5646a
MD
758 health_code_update();
759
331744e3
JD
760 /* Only self thread will receive signal mask. */
761 setmask(&mask);
762 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
763
cd9adb8b 764 while (true) {
9ce5646a
MD
765 health_code_update();
766
767 health_poll_entry();
331744e3 768 signr = sigwaitinfo(&mask, &info);
9ce5646a 769 health_poll_exit();
e9404c27
JG
770
771 /*
772 * NOTE: cascading conditions are used instead of a switch case
773 * since the use of SIGRTMIN in the definition of the signals'
774 * values prevents the reduction to an integer constant.
775 */
331744e3
JD
776 if (signr == -1) {
777 if (errno != EINTR) {
778 PERROR("sigwaitinfo");
779 }
780 continue;
781 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 782 metadata_switch_timer(ctx, &info);
331744e3
JD
783 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
784 cmm_smp_mb();
785 CMM_STORE_SHARED(timer_signal.qs_done, 1);
786 cmm_smp_mb();
787 DBG("Signal timer metadata thread teardown");
d3e2ba59 788 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 789 live_timer(ctx, &info);
e9404c27
JG
790 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
791 struct lttng_consumer_channel *channel;
792
97535efa 793 channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
9cc4ae91 794 sample_and_send_channel_buffer_stats(channel);
13675d0e 795 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
a0377dfe 796 LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
13675d0e 797 goto end;
331744e3
JD
798 } else {
799 ERR("Unexpected signal %d\n", info.si_signo);
800 }
801 }
802
2d57de81
MD
803error_testpoint:
804 /* Only reached in testpoint error */
805 health_error();
13675d0e 806end:
1fc79fb4 807 health_unregister(health_consumerd);
8a9acb74 808 rcu_unregister_thread();
cd9adb8b 809 return nullptr;
331744e3 810}
This page took 0.143484 seconds and 4 git commands to generate.