Fix: sessiond consumer thread should register as RCU thread
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
6c1c0768 20#define _LGPL_SOURCE
331744e3
JD
21#include <assert.h>
22#include <inttypes.h>
23#include <signal.h>
24
51a9e1c7 25#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 26#include <common/common.h>
f263b7fd 27#include <common/compat/endian.h>
d3e2ba59
JD
28#include <common/kernel-ctl/kernel-ctl.h>
29#include <common/kernel-consumer/kernel-consumer.h>
30#include <common/consumer-stream.h>
331744e3
JD
31
32#include "consumer-timer.h"
2d57de81 33#include "consumer-testpoint.h"
331744e3
JD
34#include "ust-consumer/ust-consumer.h"
35
2b8f8754
MD
36static struct timer_signal_data timer_signal = {
37 .tid = 0,
38 .setup_done = 0,
39 .qs_done = 0,
40 .lock = PTHREAD_MUTEX_INITIALIZER,
41};
331744e3
JD
42
43/*
44 * Set custom signal mask to current thread.
45 */
46static void setmask(sigset_t *mask)
47{
48 int ret;
49
50 ret = sigemptyset(mask);
51 if (ret) {
52 PERROR("sigemptyset");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
55 if (ret) {
d3e2ba59 56 PERROR("sigaddset switch");
331744e3
JD
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
59 if (ret) {
d3e2ba59
JD
60 PERROR("sigaddset teardown");
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
63 if (ret) {
64 PERROR("sigaddset live");
331744e3
JD
65 }
66}
67
68/*
69 * Execute action on a timer switch.
d98a47c7
MD
70 *
71 * Beware: metadata_switch_timer() should *never* take a mutex also held
72 * while consumer_timer_switch_stop() is called. It would result in
73 * deadlocks.
331744e3
JD
74 */
75static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
76 int sig, siginfo_t *si, void *uc)
77{
78 int ret;
79 struct lttng_consumer_channel *channel;
80
81 channel = si->si_value.sival_ptr;
82 assert(channel);
83
4419b4fb
MD
84 if (channel->switch_timer_error) {
85 return;
86 }
87
331744e3
JD
88 DBG("Switch timer for channel %" PRIu64, channel->key);
89 switch (ctx->type) {
90 case LTTNG_CONSUMER32_UST:
91 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
92 /*
93 * Locks taken by lttng_ustconsumer_request_metadata():
94 * - metadata_socket_lock
95 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 96 * - channel->metadata_cache->lock
4fa3dc0e 97 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
98 * - channel->timer_lock
99 * - channel->metadata_cache->lock
4fa3dc0e 100 *
5e41ebe1
MD
101 * Ensure that neither consumer_data.lock nor
102 * channel->lock are taken within this function, since
103 * they are held while consumer_timer_switch_stop() is
104 * called.
4fa3dc0e 105 */
94d49140 106 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 107 if (ret < 0) {
4419b4fb 108 channel->switch_timer_error = 1;
331744e3
JD
109 }
110 break;
111 case LTTNG_CONSUMER_KERNEL:
112 case LTTNG_CONSUMER_UNKNOWN:
113 assert(0);
114 break;
115 }
116}
117
528f2ffa
JD
118static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
119 uint64_t stream_id)
d3e2ba59
JD
120{
121 int ret;
50adc264 122 struct ctf_packet_index index;
d3e2ba59
JD
123
124 memset(&index, 0, sizeof(index));
528f2ffa 125 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
126 index.timestamp_end = htobe64(ts);
127 ret = consumer_stream_write_index(stream, &index);
128 if (ret < 0) {
129 goto error;
130 }
131
132error:
133 return ret;
134}
135
c585821b 136int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 137{
528f2ffa 138 uint64_t ts, stream_id;
d3e2ba59
JD
139 int ret;
140
d3e2ba59
JD
141 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
142 if (ret < 0) {
143 ERR("Failed to get the current timestamp");
c585821b 144 goto end;
d3e2ba59
JD
145 }
146 ret = kernctl_buffer_flush(stream->wait_fd);
147 if (ret < 0) {
148 ERR("Failed to flush kernel stream");
c585821b 149 goto end;
d3e2ba59
JD
150 }
151 ret = kernctl_snapshot(stream->wait_fd);
152 if (ret < 0) {
08b1dcd3
DG
153 if (errno != EAGAIN && errno != ENODATA) {
154 PERROR("live timer kernel snapshot");
d3e2ba59 155 ret = -1;
c585821b 156 goto end;
d3e2ba59 157 }
528f2ffa
JD
158 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
159 if (ret < 0) {
160 PERROR("kernctl_get_stream_id");
c585821b 161 goto end;
528f2ffa 162 }
d3e2ba59 163 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 164 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 165 if (ret < 0) {
c585821b 166 goto end;
d3e2ba59
JD
167 }
168 }
169 ret = 0;
c585821b 170end:
d3e2ba59
JD
171 return ret;
172}
173
c585821b 174static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 175{
d3e2ba59
JD
176 int ret;
177
d3e2ba59
JD
178 /*
179 * While holding the stream mutex, try to take a snapshot, if it
180 * succeeds, it means that data is ready to be sent, just let the data
181 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
182 * means that there is no data to read after the flush, so we can
183 * safely send the empty index.
c585821b
MD
184 *
185 * Doing a trylock and checking if waiting on metadata if
186 * trylock fails. Bail out of the stream is indeed waiting for
187 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 188 */
c585821b
MD
189 for (;;) {
190 ret = pthread_mutex_trylock(&stream->lock);
191 switch (ret) {
192 case 0:
193 break; /* We have the lock. */
194 case EBUSY:
195 pthread_mutex_lock(&stream->metadata_timer_lock);
196 if (stream->waiting_on_metadata) {
197 ret = 0;
198 stream->missed_metadata_flush = true;
199 pthread_mutex_unlock(&stream->metadata_timer_lock);
200 goto end; /* Bail out. */
201 }
202 pthread_mutex_unlock(&stream->metadata_timer_lock);
203 /* Try again. */
204 caa_cpu_relax();
205 continue;
206 default:
207 ERR("Unexpected pthread_mutex_trylock error %d", ret);
208 ret = -1;
209 goto end;
210 }
211 break;
212 }
213 ret = consumer_flush_kernel_index(stream);
214 pthread_mutex_unlock(&stream->lock);
215end:
216 return ret;
217}
218
219int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
220{
221 uint64_t ts, stream_id;
222 int ret;
223
94d49140
JD
224 ret = cds_lfht_is_node_deleted(&stream->node.node);
225 if (ret) {
c585821b 226 goto end;
94d49140
JD
227 }
228
84a182ce 229 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
230 if (ret < 0) {
231 ERR("Failed to get the current timestamp");
c585821b 232 goto end;
d3e2ba59 233 }
84a182ce
DG
234 lttng_ustconsumer_flush_buffer(stream, 1);
235 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 236 if (ret < 0) {
94d49140 237 if (ret != -EAGAIN) {
d3e2ba59
JD
238 ERR("Taking UST snapshot");
239 ret = -1;
c585821b 240 goto end;
d3e2ba59 241 }
70190e1c 242 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
243 if (ret < 0) {
244 PERROR("ustctl_get_stream_id");
c585821b 245 goto end;
528f2ffa 246 }
d3e2ba59 247 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 248 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 249 if (ret < 0) {
c585821b 250 goto end;
d3e2ba59
JD
251 }
252 }
253 ret = 0;
c585821b
MD
254end:
255 return ret;
256}
d3e2ba59 257
c585821b
MD
258static int check_ust_stream(struct lttng_consumer_stream *stream)
259{
260 int ret;
261
262 assert(stream);
263 assert(stream->ustream);
264 /*
265 * While holding the stream mutex, try to take a snapshot, if it
266 * succeeds, it means that data is ready to be sent, just let the data
267 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
268 * means that there is no data to read after the flush, so we can
269 * safely send the empty index.
270 *
271 * Doing a trylock and checking if waiting on metadata if
272 * trylock fails. Bail out of the stream is indeed waiting for
273 * metadata to be pushed. Busy wait on trylock otherwise.
274 */
275 for (;;) {
276 ret = pthread_mutex_trylock(&stream->lock);
277 switch (ret) {
278 case 0:
279 break; /* We have the lock. */
280 case EBUSY:
281 pthread_mutex_lock(&stream->metadata_timer_lock);
282 if (stream->waiting_on_metadata) {
283 ret = 0;
284 stream->missed_metadata_flush = true;
285 pthread_mutex_unlock(&stream->metadata_timer_lock);
286 goto end; /* Bail out. */
287 }
288 pthread_mutex_unlock(&stream->metadata_timer_lock);
289 /* Try again. */
290 caa_cpu_relax();
291 continue;
292 default:
293 ERR("Unexpected pthread_mutex_trylock error %d", ret);
294 ret = -1;
295 goto end;
296 }
297 break;
298 }
299 ret = consumer_flush_ust_index(stream);
d3e2ba59 300 pthread_mutex_unlock(&stream->lock);
c585821b 301end:
d3e2ba59
JD
302 return ret;
303}
304
305/*
306 * Execute action on a live timer
307 */
308static void live_timer(struct lttng_consumer_local_data *ctx,
309 int sig, siginfo_t *si, void *uc)
310{
311 int ret;
312 struct lttng_consumer_channel *channel;
313 struct lttng_consumer_stream *stream;
314 struct lttng_ht *ht;
315 struct lttng_ht_iter iter;
316
317 channel = si->si_value.sival_ptr;
318 assert(channel);
319
320 if (channel->switch_timer_error) {
321 goto error;
322 }
323 ht = consumer_data.stream_per_chan_id_ht;
324
325 DBG("Live timer for channel %" PRIu64, channel->key);
326
327 rcu_read_lock();
328 switch (ctx->type) {
329 case LTTNG_CONSUMER32_UST:
330 case LTTNG_CONSUMER64_UST:
331 cds_lfht_for_each_entry_duplicate(ht->ht,
332 ht->hash_fct(&channel->key, lttng_ht_seed),
333 ht->match_fct, &channel->key, &iter.iter,
334 stream, node_channel_id.node) {
335 ret = check_ust_stream(stream);
336 if (ret < 0) {
337 goto error_unlock;
338 }
339 }
340 break;
341 case LTTNG_CONSUMER_KERNEL:
342 cds_lfht_for_each_entry_duplicate(ht->ht,
343 ht->hash_fct(&channel->key, lttng_ht_seed),
344 ht->match_fct, &channel->key, &iter.iter,
345 stream, node_channel_id.node) {
346 ret = check_kernel_stream(stream);
347 if (ret < 0) {
348 goto error_unlock;
349 }
350 }
351 break;
352 case LTTNG_CONSUMER_UNKNOWN:
353 assert(0);
354 break;
355 }
356
357error_unlock:
358 rcu_read_unlock();
359
360error:
361 return;
362}
363
2b8f8754
MD
364static
365void consumer_timer_signal_thread_qs(unsigned int signr)
366{
367 sigset_t pending_set;
368 int ret;
369
370 /*
371 * We need to be the only thread interacting with the thread
372 * that manages signals for teardown synchronization.
373 */
374 pthread_mutex_lock(&timer_signal.lock);
375
376 /* Ensure we don't have any signal queued for this channel. */
377 for (;;) {
378 ret = sigemptyset(&pending_set);
379 if (ret == -1) {
380 PERROR("sigemptyset");
381 }
382 ret = sigpending(&pending_set);
383 if (ret == -1) {
384 PERROR("sigpending");
385 }
386 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
387 break;
388 }
389 caa_cpu_relax();
390 }
391
392 /*
393 * From this point, no new signal handler will be fired that would try to
394 * access "chan". However, we still need to wait for any currently
395 * executing handler to complete.
396 */
397 cmm_smp_mb();
398 CMM_STORE_SHARED(timer_signal.qs_done, 0);
399 cmm_smp_mb();
400
401 /*
402 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
403 * up.
404 */
405 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
406
407 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
408 caa_cpu_relax();
409 }
410 cmm_smp_mb();
411
412 pthread_mutex_unlock(&timer_signal.lock);
413}
414
331744e3
JD
415/*
416 * Set the timer for periodical metadata flush.
417 */
418void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
419 unsigned int switch_timer_interval)
420{
421 int ret;
422 struct sigevent sev;
423 struct itimerspec its;
424
425 assert(channel);
426 assert(channel->key);
427
428 if (switch_timer_interval == 0) {
429 return;
430 }
431
432 sev.sigev_notify = SIGEV_SIGNAL;
433 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
434 sev.sigev_value.sival_ptr = channel;
435 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
436 if (ret == -1) {
437 PERROR("timer_create");
438 }
439 channel->switch_timer_enabled = 1;
440
441 its.it_value.tv_sec = switch_timer_interval / 1000000;
442 its.it_value.tv_nsec = switch_timer_interval % 1000000;
443 its.it_interval.tv_sec = its.it_value.tv_sec;
444 its.it_interval.tv_nsec = its.it_value.tv_nsec;
445
446 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
447 if (ret == -1) {
448 PERROR("timer_settime");
449 }
450}
451
452/*
453 * Stop and delete timer.
454 */
455void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
456{
457 int ret;
331744e3
JD
458
459 assert(channel);
460
461 ret = timer_delete(channel->switch_timer);
462 if (ret == -1) {
463 PERROR("timer_delete");
464 }
465
2b8f8754 466 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 467
2b8f8754
MD
468 channel->switch_timer = 0;
469 channel->switch_timer_enabled = 0;
331744e3
JD
470}
471
d3e2ba59
JD
472/*
473 * Set the timer for the live mode.
474 */
475void consumer_timer_live_start(struct lttng_consumer_channel *channel,
476 int live_timer_interval)
477{
478 int ret;
479 struct sigevent sev;
480 struct itimerspec its;
481
482 assert(channel);
483 assert(channel->key);
484
fac41e72 485 if (live_timer_interval <= 0) {
d3e2ba59
JD
486 return;
487 }
488
489 sev.sigev_notify = SIGEV_SIGNAL;
490 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
491 sev.sigev_value.sival_ptr = channel;
492 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
493 if (ret == -1) {
494 PERROR("timer_create");
495 }
496 channel->live_timer_enabled = 1;
497
498 its.it_value.tv_sec = live_timer_interval / 1000000;
499 its.it_value.tv_nsec = live_timer_interval % 1000000;
500 its.it_interval.tv_sec = its.it_value.tv_sec;
501 its.it_interval.tv_nsec = its.it_value.tv_nsec;
502
503 ret = timer_settime(channel->live_timer, 0, &its, NULL);
504 if (ret == -1) {
505 PERROR("timer_settime");
506 }
507}
508
509/*
510 * Stop and delete timer.
511 */
512void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
513{
514 int ret;
515
516 assert(channel);
517
518 ret = timer_delete(channel->live_timer);
519 if (ret == -1) {
520 PERROR("timer_delete");
521 }
522
523 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
524
525 channel->live_timer = 0;
526 channel->live_timer_enabled = 0;
527}
528
331744e3
JD
529/*
530 * Block the RT signals for the entire process. It must be called from the
531 * consumer main before creating the threads
532 */
73664f81 533int consumer_signal_init(void)
331744e3
JD
534{
535 int ret;
536 sigset_t mask;
537
538 /* Block signal for entire process, so only our thread processes it. */
539 setmask(&mask);
540 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
541 if (ret) {
542 errno = ret;
543 PERROR("pthread_sigmask");
73664f81 544 return -1;
331744e3 545 }
73664f81 546 return 0;
331744e3
JD
547}
548
549/*
d3e2ba59
JD
550 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
551 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 552 */
d3e2ba59 553void *consumer_timer_thread(void *data)
331744e3
JD
554{
555 int signr;
556 sigset_t mask;
557 siginfo_t info;
558 struct lttng_consumer_local_data *ctx = data;
559
1fc79fb4
MD
560 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
561
2d57de81
MD
562 if (testpoint(consumerd_thread_metadata_timer)) {
563 goto error_testpoint;
564 }
565
9ce5646a
MD
566 health_code_update();
567
331744e3
JD
568 /* Only self thread will receive signal mask. */
569 setmask(&mask);
570 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
571
572 while (1) {
9ce5646a
MD
573 health_code_update();
574
575 health_poll_entry();
331744e3 576 signr = sigwaitinfo(&mask, &info);
9ce5646a 577 health_poll_exit();
331744e3
JD
578 if (signr == -1) {
579 if (errno != EINTR) {
580 PERROR("sigwaitinfo");
581 }
582 continue;
583 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
584 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
585 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
586 cmm_smp_mb();
587 CMM_STORE_SHARED(timer_signal.qs_done, 1);
588 cmm_smp_mb();
589 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
590 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
591 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
592 } else {
593 ERR("Unexpected signal %d\n", info.si_signo);
594 }
595 }
596
2d57de81
MD
597error_testpoint:
598 /* Only reached in testpoint error */
599 health_error();
1fc79fb4
MD
600 health_unregister(health_consumerd);
601
602 /* Never return */
331744e3
JD
603 return NULL;
604}
This page took 0.05544 seconds and 4 git commands to generate.