consumerd: clean-up: stream attribute accessed without locking stream
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
e9404c27
JG
34typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
35typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
36 unsigned long *consumed);
37typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
38 unsigned long *produced);
39
2b8f8754
MD
40static struct timer_signal_data timer_signal = {
41 .tid = 0,
42 .setup_done = 0,
43 .qs_done = 0,
44 .lock = PTHREAD_MUTEX_INITIALIZER,
45};
331744e3
JD
46
47/*
48 * Set custom signal mask to current thread.
49 */
50static void setmask(sigset_t *mask)
51{
52 int ret;
53
54 ret = sigemptyset(mask);
55 if (ret) {
56 PERROR("sigemptyset");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
59 if (ret) {
d3e2ba59 60 PERROR("sigaddset switch");
331744e3
JD
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
63 if (ret) {
d3e2ba59
JD
64 PERROR("sigaddset teardown");
65 }
66 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
67 if (ret) {
68 PERROR("sigaddset live");
331744e3 69 }
e9404c27
JG
70 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
71 if (ret) {
72 PERROR("sigaddset monitor");
73 }
13675d0e
MD
74 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
75 if (ret) {
76 PERROR("sigaddset exit");
77 }
331744e3
JD
78}
79
e9404c27
JG
80static int channel_monitor_pipe = -1;
81
331744e3
JD
82/*
83 * Execute action on a timer switch.
d98a47c7
MD
84 *
85 * Beware: metadata_switch_timer() should *never* take a mutex also held
86 * while consumer_timer_switch_stop() is called. It would result in
87 * deadlocks.
331744e3
JD
88 */
89static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 90 siginfo_t *si)
331744e3
JD
91{
92 int ret;
93 struct lttng_consumer_channel *channel;
94
95 channel = si->si_value.sival_ptr;
96 assert(channel);
97
4419b4fb
MD
98 if (channel->switch_timer_error) {
99 return;
100 }
101
331744e3
JD
102 DBG("Switch timer for channel %" PRIu64, channel->key);
103 switch (ctx->type) {
104 case LTTNG_CONSUMER32_UST:
105 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
106 /*
107 * Locks taken by lttng_ustconsumer_request_metadata():
108 * - metadata_socket_lock
109 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 110 * - channel->metadata_cache->lock
4fa3dc0e 111 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
112 * - channel->timer_lock
113 * - channel->metadata_cache->lock
4fa3dc0e 114 *
5e41ebe1
MD
115 * Ensure that neither consumer_data.lock nor
116 * channel->lock are taken within this function, since
117 * they are held while consumer_timer_switch_stop() is
118 * called.
4fa3dc0e 119 */
94d49140 120 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 121 if (ret < 0) {
4419b4fb 122 channel->switch_timer_error = 1;
331744e3
JD
123 }
124 break;
125 case LTTNG_CONSUMER_KERNEL:
126 case LTTNG_CONSUMER_UNKNOWN:
127 assert(0);
128 break;
129 }
130}
131
528f2ffa
JD
132static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
133 uint64_t stream_id)
d3e2ba59
JD
134{
135 int ret;
50adc264 136 struct ctf_packet_index index;
d3e2ba59
JD
137
138 memset(&index, 0, sizeof(index));
528f2ffa 139 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
140 index.timestamp_end = htobe64(ts);
141 ret = consumer_stream_write_index(stream, &index);
142 if (ret < 0) {
143 goto error;
144 }
145
146error:
147 return ret;
148}
149
c585821b 150int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 151{
528f2ffa 152 uint64_t ts, stream_id;
d3e2ba59
JD
153 int ret;
154
d3e2ba59
JD
155 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
156 if (ret < 0) {
157 ERR("Failed to get the current timestamp");
c585821b 158 goto end;
d3e2ba59
JD
159 }
160 ret = kernctl_buffer_flush(stream->wait_fd);
161 if (ret < 0) {
162 ERR("Failed to flush kernel stream");
c585821b 163 goto end;
d3e2ba59
JD
164 }
165 ret = kernctl_snapshot(stream->wait_fd);
166 if (ret < 0) {
32af2c95 167 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 168 PERROR("live timer kernel snapshot");
d3e2ba59 169 ret = -1;
c585821b 170 goto end;
d3e2ba59 171 }
528f2ffa
JD
172 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
173 if (ret < 0) {
174 PERROR("kernctl_get_stream_id");
c585821b 175 goto end;
528f2ffa 176 }
d3e2ba59 177 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 178 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 179 if (ret < 0) {
c585821b 180 goto end;
d3e2ba59
JD
181 }
182 }
183 ret = 0;
c585821b 184end:
d3e2ba59
JD
185 return ret;
186}
187
c585821b 188static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 189{
d3e2ba59
JD
190 int ret;
191
d3e2ba59
JD
192 /*
193 * While holding the stream mutex, try to take a snapshot, if it
194 * succeeds, it means that data is ready to be sent, just let the data
195 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
196 * means that there is no data to read after the flush, so we can
197 * safely send the empty index.
c585821b
MD
198 *
199 * Doing a trylock and checking if waiting on metadata if
200 * trylock fails. Bail out of the stream is indeed waiting for
201 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 202 */
c585821b
MD
203 for (;;) {
204 ret = pthread_mutex_trylock(&stream->lock);
205 switch (ret) {
206 case 0:
207 break; /* We have the lock. */
208 case EBUSY:
209 pthread_mutex_lock(&stream->metadata_timer_lock);
210 if (stream->waiting_on_metadata) {
211 ret = 0;
212 stream->missed_metadata_flush = true;
213 pthread_mutex_unlock(&stream->metadata_timer_lock);
214 goto end; /* Bail out. */
215 }
216 pthread_mutex_unlock(&stream->metadata_timer_lock);
217 /* Try again. */
218 caa_cpu_relax();
219 continue;
220 default:
221 ERR("Unexpected pthread_mutex_trylock error %d", ret);
222 ret = -1;
223 goto end;
224 }
225 break;
226 }
227 ret = consumer_flush_kernel_index(stream);
228 pthread_mutex_unlock(&stream->lock);
229end:
230 return ret;
231}
232
233int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
234{
235 uint64_t ts, stream_id;
236 int ret;
237
94d49140
JD
238 ret = cds_lfht_is_node_deleted(&stream->node.node);
239 if (ret) {
c585821b 240 goto end;
94d49140
JD
241 }
242
84a182ce 243 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
244 if (ret < 0) {
245 ERR("Failed to get the current timestamp");
c585821b 246 goto end;
d3e2ba59 247 }
84a182ce
DG
248 lttng_ustconsumer_flush_buffer(stream, 1);
249 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 250 if (ret < 0) {
94d49140 251 if (ret != -EAGAIN) {
d3e2ba59
JD
252 ERR("Taking UST snapshot");
253 ret = -1;
c585821b 254 goto end;
d3e2ba59 255 }
70190e1c 256 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
257 if (ret < 0) {
258 PERROR("ustctl_get_stream_id");
c585821b 259 goto end;
528f2ffa 260 }
d3e2ba59 261 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 262 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 263 if (ret < 0) {
c585821b 264 goto end;
d3e2ba59
JD
265 }
266 }
267 ret = 0;
c585821b
MD
268end:
269 return ret;
270}
d3e2ba59 271
c585821b
MD
272static int check_ust_stream(struct lttng_consumer_stream *stream)
273{
274 int ret;
275
276 assert(stream);
277 assert(stream->ustream);
278 /*
279 * While holding the stream mutex, try to take a snapshot, if it
280 * succeeds, it means that data is ready to be sent, just let the data
281 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
282 * means that there is no data to read after the flush, so we can
283 * safely send the empty index.
284 *
285 * Doing a trylock and checking if waiting on metadata if
286 * trylock fails. Bail out of the stream is indeed waiting for
287 * metadata to be pushed. Busy wait on trylock otherwise.
288 */
289 for (;;) {
290 ret = pthread_mutex_trylock(&stream->lock);
291 switch (ret) {
292 case 0:
293 break; /* We have the lock. */
294 case EBUSY:
295 pthread_mutex_lock(&stream->metadata_timer_lock);
296 if (stream->waiting_on_metadata) {
297 ret = 0;
298 stream->missed_metadata_flush = true;
299 pthread_mutex_unlock(&stream->metadata_timer_lock);
300 goto end; /* Bail out. */
301 }
302 pthread_mutex_unlock(&stream->metadata_timer_lock);
303 /* Try again. */
304 caa_cpu_relax();
305 continue;
306 default:
307 ERR("Unexpected pthread_mutex_trylock error %d", ret);
308 ret = -1;
309 goto end;
310 }
311 break;
312 }
313 ret = consumer_flush_ust_index(stream);
d3e2ba59 314 pthread_mutex_unlock(&stream->lock);
c585821b 315end:
d3e2ba59
JD
316 return ret;
317}
318
319/*
320 * Execute action on a live timer
321 */
322static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 323 siginfo_t *si)
d3e2ba59
JD
324{
325 int ret;
326 struct lttng_consumer_channel *channel;
327 struct lttng_consumer_stream *stream;
328 struct lttng_ht *ht;
329 struct lttng_ht_iter iter;
330
331 channel = si->si_value.sival_ptr;
332 assert(channel);
333
334 if (channel->switch_timer_error) {
335 goto error;
336 }
337 ht = consumer_data.stream_per_chan_id_ht;
338
339 DBG("Live timer for channel %" PRIu64, channel->key);
340
341 rcu_read_lock();
342 switch (ctx->type) {
343 case LTTNG_CONSUMER32_UST:
344 case LTTNG_CONSUMER64_UST:
345 cds_lfht_for_each_entry_duplicate(ht->ht,
346 ht->hash_fct(&channel->key, lttng_ht_seed),
347 ht->match_fct, &channel->key, &iter.iter,
348 stream, node_channel_id.node) {
349 ret = check_ust_stream(stream);
350 if (ret < 0) {
351 goto error_unlock;
352 }
353 }
354 break;
355 case LTTNG_CONSUMER_KERNEL:
356 cds_lfht_for_each_entry_duplicate(ht->ht,
357 ht->hash_fct(&channel->key, lttng_ht_seed),
358 ht->match_fct, &channel->key, &iter.iter,
359 stream, node_channel_id.node) {
360 ret = check_kernel_stream(stream);
361 if (ret < 0) {
362 goto error_unlock;
363 }
364 }
365 break;
366 case LTTNG_CONSUMER_UNKNOWN:
367 assert(0);
368 break;
369 }
370
371error_unlock:
372 rcu_read_unlock();
373
374error:
375 return;
376}
377
2b8f8754
MD
378static
379void consumer_timer_signal_thread_qs(unsigned int signr)
380{
381 sigset_t pending_set;
382 int ret;
383
384 /*
385 * We need to be the only thread interacting with the thread
386 * that manages signals for teardown synchronization.
387 */
388 pthread_mutex_lock(&timer_signal.lock);
389
390 /* Ensure we don't have any signal queued for this channel. */
391 for (;;) {
392 ret = sigemptyset(&pending_set);
393 if (ret == -1) {
394 PERROR("sigemptyset");
395 }
396 ret = sigpending(&pending_set);
397 if (ret == -1) {
398 PERROR("sigpending");
399 }
f05a6b6d 400 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
401 break;
402 }
403 caa_cpu_relax();
404 }
405
406 /*
407 * From this point, no new signal handler will be fired that would try to
408 * access "chan". However, we still need to wait for any currently
409 * executing handler to complete.
410 */
411 cmm_smp_mb();
412 CMM_STORE_SHARED(timer_signal.qs_done, 0);
413 cmm_smp_mb();
414
415 /*
416 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
417 * up.
418 */
419 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
420
421 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
422 caa_cpu_relax();
423 }
424 cmm_smp_mb();
425
426 pthread_mutex_unlock(&timer_signal.lock);
427}
428
331744e3 429/*
e9404c27
JG
430 * Start a timer channel timer which will fire at a given interval
431 * (timer_interval_us)and fire a given signal (signal).
432 *
433 * Returns a negative value on error, 0 if a timer was created, and
434 * a positive value if no timer was created (not an error).
331744e3 435 */
e9404c27
JG
436static
437int consumer_channel_timer_start(timer_t *timer_id,
438 struct lttng_consumer_channel *channel,
439 unsigned int timer_interval_us, int signal)
331744e3 440{
e9404c27 441 int ret = 0, delete_ret;
331744e3
JD
442 struct sigevent sev;
443 struct itimerspec its;
444
445 assert(channel);
446 assert(channel->key);
447
e9404c27
JG
448 if (timer_interval_us == 0) {
449 /* No creation needed; not an error. */
450 ret = 1;
451 goto end;
331744e3
JD
452 }
453
454 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 455 sev.sigev_signo = signal;
331744e3 456 sev.sigev_value.sival_ptr = channel;
e9404c27 457 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
458 if (ret == -1) {
459 PERROR("timer_create");
e9404c27 460 goto end;
331744e3 461 }
331744e3 462
e9404c27
JG
463 its.it_value.tv_sec = timer_interval_us / 1000000;
464 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
465 its.it_interval.tv_sec = its.it_value.tv_sec;
466 its.it_interval.tv_nsec = its.it_value.tv_nsec;
467
e9404c27 468 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
469 if (ret == -1) {
470 PERROR("timer_settime");
e9404c27
JG
471 goto error_destroy_timer;
472 }
473end:
474 return ret;
475error_destroy_timer:
476 delete_ret = timer_delete(*timer_id);
477 if (delete_ret == -1) {
478 PERROR("timer_delete");
479 }
480 goto end;
481}
482
483static
484int consumer_channel_timer_stop(timer_t *timer_id, int signal)
485{
486 int ret = 0;
487
488 ret = timer_delete(*timer_id);
489 if (ret == -1) {
490 PERROR("timer_delete");
491 goto end;
331744e3 492 }
e9404c27
JG
493
494 consumer_timer_signal_thread_qs(signal);
495 *timer_id = 0;
496end:
497 return ret;
498}
499
500/*
501 * Set the channel's switch timer.
502 */
503void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
504 unsigned int switch_timer_interval_us)
505{
506 int ret;
507
508 assert(channel);
509 assert(channel->key);
510
511 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
512 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
513
514 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
515}
516
517/*
e9404c27 518 * Stop and delete the channel's switch timer.
331744e3
JD
519 */
520void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
521{
522 int ret;
331744e3
JD
523
524 assert(channel);
525
e9404c27
JG
526 ret = consumer_channel_timer_stop(&channel->switch_timer,
527 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 528 if (ret == -1) {
e9404c27 529 ERR("Failed to stop switch timer");
331744e3
JD
530 }
531
2b8f8754 532 channel->switch_timer_enabled = 0;
331744e3
JD
533}
534
d3e2ba59 535/*
e9404c27 536 * Set the channel's live timer.
d3e2ba59
JD
537 */
538void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 539 unsigned int live_timer_interval_us)
d3e2ba59
JD
540{
541 int ret;
d3e2ba59
JD
542
543 assert(channel);
544 assert(channel->key);
545
e9404c27
JG
546 ret = consumer_channel_timer_start(&channel->live_timer, channel,
547 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 548
e9404c27
JG
549 channel->live_timer_enabled = !!(ret == 0);
550}
d3e2ba59 551
e9404c27
JG
552/*
553 * Stop and delete the channel's live timer.
554 */
555void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
556{
557 int ret;
558
559 assert(channel);
d3e2ba59 560
e9404c27
JG
561 ret = consumer_channel_timer_stop(&channel->live_timer,
562 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 563 if (ret == -1) {
e9404c27 564 ERR("Failed to stop live timer");
d3e2ba59 565 }
e9404c27
JG
566
567 channel->live_timer_enabled = 0;
d3e2ba59
JD
568}
569
570/*
e9404c27
JG
571 * Set the channel's monitoring timer.
572 *
573 * Returns a negative value on error, 0 if a timer was created, and
574 * a positive value if no timer was created (not an error).
d3e2ba59 575 */
e9404c27
JG
576int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
577 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
578{
579 int ret;
580
581 assert(channel);
e9404c27
JG
582 assert(channel->key);
583 assert(!channel->monitor_timer_enabled);
d3e2ba59 584
e9404c27
JG
585 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
586 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
587 channel->monitor_timer_enabled = !!(ret == 0);
588 return ret;
589}
590
591/*
592 * Stop and delete the channel's monitoring timer.
593 */
594int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
595{
596 int ret;
597
598 assert(channel);
599 assert(channel->monitor_timer_enabled);
600
601 ret = consumer_channel_timer_stop(&channel->monitor_timer,
602 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 603 if (ret == -1) {
e9404c27
JG
604 ERR("Failed to stop live timer");
605 goto end;
d3e2ba59
JD
606 }
607
e9404c27
JG
608 channel->monitor_timer_enabled = 0;
609end:
610 return ret;
d3e2ba59
JD
611}
612
331744e3
JD
613/*
614 * Block the RT signals for the entire process. It must be called from the
615 * consumer main before creating the threads
616 */
73664f81 617int consumer_signal_init(void)
331744e3
JD
618{
619 int ret;
620 sigset_t mask;
621
622 /* Block signal for entire process, so only our thread processes it. */
623 setmask(&mask);
624 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
625 if (ret) {
626 errno = ret;
627 PERROR("pthread_sigmask");
73664f81 628 return -1;
331744e3 629 }
73664f81 630 return 0;
331744e3
JD
631}
632
e9404c27
JG
633static
634int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 635 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
636 sample_positions_cb sample, get_consumed_cb get_consumed,
637 get_produced_cb get_produced)
638{
23bc9bb5 639 int ret = 0;
e9404c27
JG
640 struct lttng_ht_iter iter;
641 struct lttng_consumer_stream *stream;
642 bool empty_channel = true;
643 uint64_t high = 0, low = UINT64_MAX;
644 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
645
e8360425
JD
646 *_total_consumed = 0;
647
e9404c27
JG
648 rcu_read_lock();
649
650 cds_lfht_for_each_entry_duplicate(ht->ht,
651 ht->hash_fct(&channel->key, lttng_ht_seed),
652 ht->match_fct, &channel->key,
653 &iter.iter, stream, node_channel_id.node) {
654 unsigned long produced, consumed, usage;
655
656 empty_channel = false;
657
658 pthread_mutex_lock(&stream->lock);
659 if (cds_lfht_is_node_deleted(&stream->node.node)) {
660 goto next;
661 }
662
663 ret = sample(stream);
664 if (ret) {
665 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
666 pthread_mutex_unlock(&stream->lock);
667 goto end;
668 }
669 ret = get_consumed(stream, &consumed);
670 if (ret) {
671 ERR("Failed to get buffer consumed position in monitor timer");
672 pthread_mutex_unlock(&stream->lock);
673 goto end;
674 }
675 ret = get_produced(stream, &produced);
676 if (ret) {
677 ERR("Failed to get buffer produced position in monitor timer");
678 pthread_mutex_unlock(&stream->lock);
679 goto end;
680 }
681
682 usage = produced - consumed;
683 high = (usage > high) ? usage : high;
684 low = (usage < low) ? usage : low;
e8360425
JD
685
686 /*
687 * We don't use consumed here for 2 reasons:
688 * - output_written takes into account the padding written in the
689 * tracefiles when we stop the session;
690 * - the consumed position is not the accurate representation of what
691 * was extracted from a buffer in overwrite mode.
692 */
693 *_total_consumed += stream->output_written;
e9404c27
JG
694 next:
695 pthread_mutex_unlock(&stream->lock);
696 }
697
698 *_highest_use = high;
699 *_lowest_use = low;
700end:
701 rcu_read_unlock();
702 if (empty_channel) {
703 ret = -1;
704 }
705 return ret;
706}
707
708/*
709 * Execute action on a monitor timer.
710 */
711static
5704917f 712void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
713{
714 int ret;
715 int channel_monitor_pipe =
716 consumer_timer_thread_get_channel_monitor_pipe();
717 struct lttcomm_consumer_channel_monitor_msg msg = {
718 .key = channel->key,
719 };
720 sample_positions_cb sample;
721 get_consumed_cb get_consumed;
722 get_produced_cb get_produced;
ef4b086e 723 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27
JG
724
725 assert(channel);
e9404c27
JG
726
727 if (channel_monitor_pipe < 0) {
873dda4e 728 return;
e9404c27
JG
729 }
730
731 switch (consumer_data.type) {
732 case LTTNG_CONSUMER_KERNEL:
733 sample = lttng_kconsumer_sample_snapshot_positions;
734 get_consumed = lttng_kconsumer_get_consumed_snapshot;
735 get_produced = lttng_kconsumer_get_produced_snapshot;
736 break;
737 case LTTNG_CONSUMER32_UST:
738 case LTTNG_CONSUMER64_UST:
739 sample = lttng_ustconsumer_sample_snapshot_positions;
740 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
741 get_produced = lttng_ustconsumer_get_produced_snapshot;
742 break;
743 default:
744 abort();
745 }
746
ef4b086e
JG
747 ret = sample_channel_positions(channel, &highest, &lowest,
748 &total_consumed, sample, get_consumed, get_produced);
e9404c27 749 if (ret) {
873dda4e 750 return;
e9404c27 751 }
ef4b086e
JG
752 msg.highest = highest;
753 msg.lowest = lowest;
754 msg.total_consumed = total_consumed;
e9404c27
JG
755
756 /*
757 * Writes performed here are assumed to be atomic which is only
758 * guaranteed for sizes < than PIPE_BUF.
759 */
760 assert(sizeof(msg) <= PIPE_BUF);
761
762 do {
763 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
764 } while (ret == -1 && errno == EINTR);
765 if (ret == -1) {
766 if (errno == EAGAIN) {
767 /* Not an error, the sample is merely dropped. */
768 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
769 channel->key);
770 } else {
771 PERROR("write to the channel monitor pipe");
772 }
773 } else {
774 DBG("Sent channel monitoring sample for channel key %" PRIu64
775 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
776 channel->key, msg.highest, msg.lowest);
777 }
e9404c27
JG
778}
779
780int consumer_timer_thread_get_channel_monitor_pipe(void)
781{
782 return uatomic_read(&channel_monitor_pipe);
783}
784
785int consumer_timer_thread_set_channel_monitor_pipe(int fd)
786{
787 int ret;
788
789 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
790 if (ret != -1) {
791 ret = -1;
792 goto end;
793 }
794 ret = 0;
795end:
796 return ret;
797}
798
331744e3 799/*
d3e2ba59 800 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 801 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 802 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 803 */
d3e2ba59 804void *consumer_timer_thread(void *data)
331744e3
JD
805{
806 int signr;
807 sigset_t mask;
808 siginfo_t info;
809 struct lttng_consumer_local_data *ctx = data;
810
8a9acb74
MD
811 rcu_register_thread();
812
1fc79fb4
MD
813 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
814
2d57de81
MD
815 if (testpoint(consumerd_thread_metadata_timer)) {
816 goto error_testpoint;
817 }
818
9ce5646a
MD
819 health_code_update();
820
331744e3
JD
821 /* Only self thread will receive signal mask. */
822 setmask(&mask);
823 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
824
825 while (1) {
9ce5646a
MD
826 health_code_update();
827
828 health_poll_entry();
331744e3 829 signr = sigwaitinfo(&mask, &info);
9ce5646a 830 health_poll_exit();
e9404c27
JG
831
832 /*
833 * NOTE: cascading conditions are used instead of a switch case
834 * since the use of SIGRTMIN in the definition of the signals'
835 * values prevents the reduction to an integer constant.
836 */
331744e3
JD
837 if (signr == -1) {
838 if (errno != EINTR) {
839 PERROR("sigwaitinfo");
840 }
841 continue;
842 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 843 metadata_switch_timer(ctx, &info);
331744e3
JD
844 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
845 cmm_smp_mb();
846 CMM_STORE_SHARED(timer_signal.qs_done, 1);
847 cmm_smp_mb();
848 DBG("Signal timer metadata thread teardown");
d3e2ba59 849 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 850 live_timer(ctx, &info);
e9404c27
JG
851 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
852 struct lttng_consumer_channel *channel;
853
854 channel = info.si_value.sival_ptr;
5704917f 855 monitor_timer(channel);
13675d0e
MD
856 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
857 assert(CMM_LOAD_SHARED(consumer_quit));
858 goto end;
331744e3
JD
859 } else {
860 ERR("Unexpected signal %d\n", info.si_signo);
861 }
862 }
863
2d57de81
MD
864error_testpoint:
865 /* Only reached in testpoint error */
866 health_error();
13675d0e 867end:
1fc79fb4 868 health_unregister(health_consumerd);
8a9acb74 869 rcu_unregister_thread();
331744e3
JD
870 return NULL;
871}
This page took 0.081893 seconds and 4 git commands to generate.