Move libconsumer under common/consumer/
[lttng-tools.git] / src / common / consumer / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _LGPL_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
33
34 static struct timer_signal_data timer_signal = {
35 .tid = 0,
36 .setup_done = 0,
37 .qs_done = 0,
38 .lock = PTHREAD_MUTEX_INITIALIZER,
39 };
40
41 /*
42 * Set custom signal mask to current thread.
43 */
44 static void setmask(sigset_t *mask)
45 {
46 int ret;
47
48 ret = sigemptyset(mask);
49 if (ret) {
50 PERROR("sigemptyset");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
53 if (ret) {
54 PERROR("sigaddset switch");
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
57 if (ret) {
58 PERROR("sigaddset teardown");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
61 if (ret) {
62 PERROR("sigaddset live");
63 }
64 }
65
66 /*
67 * Execute action on a timer switch.
68 *
69 * Beware: metadata_switch_timer() should *never* take a mutex also held
70 * while consumer_timer_switch_stop() is called. It would result in
71 * deadlocks.
72 */
73 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
74 int sig, siginfo_t *si, void *uc)
75 {
76 int ret;
77 struct lttng_consumer_channel *channel;
78
79 channel = si->si_value.sival_ptr;
80 assert(channel);
81
82 if (channel->switch_timer_error) {
83 return;
84 }
85
86 DBG("Switch timer for channel %" PRIu64, channel->key);
87 switch (ctx->type) {
88 case LTTNG_CONSUMER32_UST:
89 case LTTNG_CONSUMER64_UST:
90 /*
91 * Locks taken by lttng_ustconsumer_request_metadata():
92 * - metadata_socket_lock
93 * - Calling lttng_ustconsumer_recv_metadata():
94 * - channel->metadata_cache->lock
95 * - Calling consumer_metadata_cache_flushed():
96 * - channel->timer_lock
97 * - channel->metadata_cache->lock
98 *
99 * Ensure that neither consumer_data.lock nor
100 * channel->lock are taken within this function, since
101 * they are held while consumer_timer_switch_stop() is
102 * called.
103 */
104 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
105 if (ret < 0) {
106 channel->switch_timer_error = 1;
107 }
108 break;
109 case LTTNG_CONSUMER_KERNEL:
110 case LTTNG_CONSUMER_UNKNOWN:
111 assert(0);
112 break;
113 }
114 }
115
116 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
117 uint64_t stream_id)
118 {
119 int ret;
120 struct ctf_packet_index index;
121
122 memset(&index, 0, sizeof(index));
123 index.stream_id = htobe64(stream_id);
124 index.timestamp_end = htobe64(ts);
125 ret = consumer_stream_write_index(stream, &index);
126 if (ret < 0) {
127 goto error;
128 }
129
130 error:
131 return ret;
132 }
133
134 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
135 {
136 uint64_t ts, stream_id;
137 int ret;
138
139 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
140 if (ret < 0) {
141 ERR("Failed to get the current timestamp");
142 goto end;
143 }
144 ret = kernctl_buffer_flush(stream->wait_fd);
145 if (ret < 0) {
146 ERR("Failed to flush kernel stream");
147 goto end;
148 }
149 ret = kernctl_snapshot(stream->wait_fd);
150 if (ret < 0) {
151 if (errno != EAGAIN && errno != ENODATA) {
152 PERROR("live timer kernel snapshot");
153 ret = -1;
154 goto end;
155 }
156 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
157 if (ret < 0) {
158 PERROR("kernctl_get_stream_id");
159 goto end;
160 }
161 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
162 ret = send_empty_index(stream, ts, stream_id);
163 if (ret < 0) {
164 goto end;
165 }
166 }
167 ret = 0;
168 end:
169 return ret;
170 }
171
172 static int check_kernel_stream(struct lttng_consumer_stream *stream)
173 {
174 int ret;
175
176 /*
177 * While holding the stream mutex, try to take a snapshot, if it
178 * succeeds, it means that data is ready to be sent, just let the data
179 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
180 * means that there is no data to read after the flush, so we can
181 * safely send the empty index.
182 *
183 * Doing a trylock and checking if waiting on metadata if
184 * trylock fails. Bail out of the stream is indeed waiting for
185 * metadata to be pushed. Busy wait on trylock otherwise.
186 */
187 for (;;) {
188 ret = pthread_mutex_trylock(&stream->lock);
189 switch (ret) {
190 case 0:
191 break; /* We have the lock. */
192 case EBUSY:
193 pthread_mutex_lock(&stream->metadata_timer_lock);
194 if (stream->waiting_on_metadata) {
195 ret = 0;
196 stream->missed_metadata_flush = true;
197 pthread_mutex_unlock(&stream->metadata_timer_lock);
198 goto end; /* Bail out. */
199 }
200 pthread_mutex_unlock(&stream->metadata_timer_lock);
201 /* Try again. */
202 caa_cpu_relax();
203 continue;
204 default:
205 ERR("Unexpected pthread_mutex_trylock error %d", ret);
206 ret = -1;
207 goto end;
208 }
209 break;
210 }
211 ret = consumer_flush_kernel_index(stream);
212 pthread_mutex_unlock(&stream->lock);
213 end:
214 return ret;
215 }
216
217 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
218 {
219 uint64_t ts, stream_id;
220 int ret;
221
222 ret = cds_lfht_is_node_deleted(&stream->node.node);
223 if (ret) {
224 goto end;
225 }
226
227 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
228 if (ret < 0) {
229 ERR("Failed to get the current timestamp");
230 goto end;
231 }
232 lttng_ustconsumer_flush_buffer(stream, 1);
233 ret = lttng_ustconsumer_take_snapshot(stream);
234 if (ret < 0) {
235 if (ret != -EAGAIN) {
236 ERR("Taking UST snapshot");
237 ret = -1;
238 goto end;
239 }
240 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
241 if (ret < 0) {
242 PERROR("ustctl_get_stream_id");
243 goto end;
244 }
245 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
246 ret = send_empty_index(stream, ts, stream_id);
247 if (ret < 0) {
248 goto end;
249 }
250 }
251 ret = 0;
252 end:
253 return ret;
254 }
255
256 static int check_ust_stream(struct lttng_consumer_stream *stream)
257 {
258 int ret;
259
260 assert(stream);
261 assert(stream->ustream);
262 /*
263 * While holding the stream mutex, try to take a snapshot, if it
264 * succeeds, it means that data is ready to be sent, just let the data
265 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
266 * means that there is no data to read after the flush, so we can
267 * safely send the empty index.
268 *
269 * Doing a trylock and checking if waiting on metadata if
270 * trylock fails. Bail out of the stream is indeed waiting for
271 * metadata to be pushed. Busy wait on trylock otherwise.
272 */
273 for (;;) {
274 ret = pthread_mutex_trylock(&stream->lock);
275 switch (ret) {
276 case 0:
277 break; /* We have the lock. */
278 case EBUSY:
279 pthread_mutex_lock(&stream->metadata_timer_lock);
280 if (stream->waiting_on_metadata) {
281 ret = 0;
282 stream->missed_metadata_flush = true;
283 pthread_mutex_unlock(&stream->metadata_timer_lock);
284 goto end; /* Bail out. */
285 }
286 pthread_mutex_unlock(&stream->metadata_timer_lock);
287 /* Try again. */
288 caa_cpu_relax();
289 continue;
290 default:
291 ERR("Unexpected pthread_mutex_trylock error %d", ret);
292 ret = -1;
293 goto end;
294 }
295 break;
296 }
297 ret = consumer_flush_ust_index(stream);
298 pthread_mutex_unlock(&stream->lock);
299 end:
300 return ret;
301 }
302
303 /*
304 * Execute action on a live timer
305 */
306 static void live_timer(struct lttng_consumer_local_data *ctx,
307 int sig, siginfo_t *si, void *uc)
308 {
309 int ret;
310 struct lttng_consumer_channel *channel;
311 struct lttng_consumer_stream *stream;
312 struct lttng_ht *ht;
313 struct lttng_ht_iter iter;
314
315 channel = si->si_value.sival_ptr;
316 assert(channel);
317
318 if (channel->switch_timer_error) {
319 goto error;
320 }
321 ht = consumer_data.stream_per_chan_id_ht;
322
323 DBG("Live timer for channel %" PRIu64, channel->key);
324
325 rcu_read_lock();
326 switch (ctx->type) {
327 case LTTNG_CONSUMER32_UST:
328 case LTTNG_CONSUMER64_UST:
329 cds_lfht_for_each_entry_duplicate(ht->ht,
330 ht->hash_fct(&channel->key, lttng_ht_seed),
331 ht->match_fct, &channel->key, &iter.iter,
332 stream, node_channel_id.node) {
333 ret = check_ust_stream(stream);
334 if (ret < 0) {
335 goto error_unlock;
336 }
337 }
338 break;
339 case LTTNG_CONSUMER_KERNEL:
340 cds_lfht_for_each_entry_duplicate(ht->ht,
341 ht->hash_fct(&channel->key, lttng_ht_seed),
342 ht->match_fct, &channel->key, &iter.iter,
343 stream, node_channel_id.node) {
344 ret = check_kernel_stream(stream);
345 if (ret < 0) {
346 goto error_unlock;
347 }
348 }
349 break;
350 case LTTNG_CONSUMER_UNKNOWN:
351 assert(0);
352 break;
353 }
354
355 error_unlock:
356 rcu_read_unlock();
357
358 error:
359 return;
360 }
361
362 static
363 void consumer_timer_signal_thread_qs(unsigned int signr)
364 {
365 sigset_t pending_set;
366 int ret;
367
368 /*
369 * We need to be the only thread interacting with the thread
370 * that manages signals for teardown synchronization.
371 */
372 pthread_mutex_lock(&timer_signal.lock);
373
374 /* Ensure we don't have any signal queued for this channel. */
375 for (;;) {
376 ret = sigemptyset(&pending_set);
377 if (ret == -1) {
378 PERROR("sigemptyset");
379 }
380 ret = sigpending(&pending_set);
381 if (ret == -1) {
382 PERROR("sigpending");
383 }
384 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
385 break;
386 }
387 caa_cpu_relax();
388 }
389
390 /*
391 * From this point, no new signal handler will be fired that would try to
392 * access "chan". However, we still need to wait for any currently
393 * executing handler to complete.
394 */
395 cmm_smp_mb();
396 CMM_STORE_SHARED(timer_signal.qs_done, 0);
397 cmm_smp_mb();
398
399 /*
400 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
401 * up.
402 */
403 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
404
405 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
406 caa_cpu_relax();
407 }
408 cmm_smp_mb();
409
410 pthread_mutex_unlock(&timer_signal.lock);
411 }
412
413 /*
414 * Set the timer for periodical metadata flush.
415 */
416 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
417 unsigned int switch_timer_interval)
418 {
419 int ret;
420 struct sigevent sev;
421 struct itimerspec its;
422
423 assert(channel);
424 assert(channel->key);
425
426 if (switch_timer_interval == 0) {
427 return;
428 }
429
430 sev.sigev_notify = SIGEV_SIGNAL;
431 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
432 sev.sigev_value.sival_ptr = channel;
433 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
434 if (ret == -1) {
435 PERROR("timer_create");
436 }
437 channel->switch_timer_enabled = 1;
438
439 its.it_value.tv_sec = switch_timer_interval / 1000000;
440 its.it_value.tv_nsec = switch_timer_interval % 1000000;
441 its.it_interval.tv_sec = its.it_value.tv_sec;
442 its.it_interval.tv_nsec = its.it_value.tv_nsec;
443
444 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
445 if (ret == -1) {
446 PERROR("timer_settime");
447 }
448 }
449
450 /*
451 * Stop and delete timer.
452 */
453 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
454 {
455 int ret;
456
457 assert(channel);
458
459 ret = timer_delete(channel->switch_timer);
460 if (ret == -1) {
461 PERROR("timer_delete");
462 }
463
464 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
465
466 channel->switch_timer = 0;
467 channel->switch_timer_enabled = 0;
468 }
469
470 /*
471 * Set the timer for the live mode.
472 */
473 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
474 int live_timer_interval)
475 {
476 int ret;
477 struct sigevent sev;
478 struct itimerspec its;
479
480 assert(channel);
481 assert(channel->key);
482
483 if (live_timer_interval <= 0) {
484 return;
485 }
486
487 sev.sigev_notify = SIGEV_SIGNAL;
488 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
489 sev.sigev_value.sival_ptr = channel;
490 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
491 if (ret == -1) {
492 PERROR("timer_create");
493 }
494 channel->live_timer_enabled = 1;
495
496 its.it_value.tv_sec = live_timer_interval / 1000000;
497 its.it_value.tv_nsec = live_timer_interval % 1000000;
498 its.it_interval.tv_sec = its.it_value.tv_sec;
499 its.it_interval.tv_nsec = its.it_value.tv_nsec;
500
501 ret = timer_settime(channel->live_timer, 0, &its, NULL);
502 if (ret == -1) {
503 PERROR("timer_settime");
504 }
505 }
506
507 /*
508 * Stop and delete timer.
509 */
510 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
511 {
512 int ret;
513
514 assert(channel);
515
516 ret = timer_delete(channel->live_timer);
517 if (ret == -1) {
518 PERROR("timer_delete");
519 }
520
521 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
522
523 channel->live_timer = 0;
524 channel->live_timer_enabled = 0;
525 }
526
527 /*
528 * Block the RT signals for the entire process. It must be called from the
529 * consumer main before creating the threads
530 */
531 int consumer_signal_init(void)
532 {
533 int ret;
534 sigset_t mask;
535
536 /* Block signal for entire process, so only our thread processes it. */
537 setmask(&mask);
538 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
539 if (ret) {
540 errno = ret;
541 PERROR("pthread_sigmask");
542 return -1;
543 }
544 return 0;
545 }
546
547 /*
548 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
549 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
550 */
551 void *consumer_timer_thread(void *data)
552 {
553 int signr;
554 sigset_t mask;
555 siginfo_t info;
556 struct lttng_consumer_local_data *ctx = data;
557
558 rcu_register_thread();
559
560 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
561
562 if (testpoint(consumerd_thread_metadata_timer)) {
563 goto error_testpoint;
564 }
565
566 health_code_update();
567
568 /* Only self thread will receive signal mask. */
569 setmask(&mask);
570 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
571
572 while (1) {
573 health_code_update();
574
575 health_poll_entry();
576 signr = sigwaitinfo(&mask, &info);
577 health_poll_exit();
578 if (signr == -1) {
579 if (errno != EINTR) {
580 PERROR("sigwaitinfo");
581 }
582 continue;
583 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
584 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
585 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
586 cmm_smp_mb();
587 CMM_STORE_SHARED(timer_signal.qs_done, 1);
588 cmm_smp_mb();
589 DBG("Signal timer metadata thread teardown");
590 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
591 live_timer(ctx, info.si_signo, &info, NULL);
592 } else {
593 ERR("Unexpected signal %d\n", info.si_signo);
594 }
595 }
596
597 error_testpoint:
598 /* Only reached in testpoint error */
599 health_error();
600 health_unregister(health_consumerd);
601
602 rcu_unregister_thread();
603
604 /* Never return */
605 return NULL;
606 }
This page took 0.041737 seconds and 4 git commands to generate.