Fix: leak of UST app hash tables
[lttng-tools.git] / src / common / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #define _LGPL_SOURCE
21 #include <assert.h>
22 #include <inttypes.h>
23 #include <signal.h>
24
25 #include <bin/lttng-consumerd/health-consumerd.h>
26 #include <common/common.h>
27 #include <common/compat/endian.h>
28 #include <common/kernel-ctl/kernel-ctl.h>
29 #include <common/kernel-consumer/kernel-consumer.h>
30 #include <common/consumer-stream.h>
31
32 #include "consumer-timer.h"
33 #include "consumer-testpoint.h"
34 #include "ust-consumer/ust-consumer.h"
35
36 static struct timer_signal_data timer_signal = {
37 .tid = 0,
38 .setup_done = 0,
39 .qs_done = 0,
40 .lock = PTHREAD_MUTEX_INITIALIZER,
41 };
42
43 /*
44 * Set custom signal mask to current thread.
45 */
46 static void setmask(sigset_t *mask)
47 {
48 int ret;
49
50 ret = sigemptyset(mask);
51 if (ret) {
52 PERROR("sigemptyset");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
55 if (ret) {
56 PERROR("sigaddset switch");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
59 if (ret) {
60 PERROR("sigaddset teardown");
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
63 if (ret) {
64 PERROR("sigaddset live");
65 }
66 }
67
68 /*
69 * Execute action on a timer switch.
70 *
71 * Beware: metadata_switch_timer() should *never* take a mutex also held
72 * while consumer_timer_switch_stop() is called. It would result in
73 * deadlocks.
74 */
75 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
76 int sig, siginfo_t *si, void *uc)
77 {
78 int ret;
79 struct lttng_consumer_channel *channel;
80
81 channel = si->si_value.sival_ptr;
82 assert(channel);
83
84 if (channel->switch_timer_error) {
85 return;
86 }
87
88 DBG("Switch timer for channel %" PRIu64, channel->key);
89 switch (ctx->type) {
90 case LTTNG_CONSUMER32_UST:
91 case LTTNG_CONSUMER64_UST:
92 /*
93 * Locks taken by lttng_ustconsumer_request_metadata():
94 * - metadata_socket_lock
95 * - Calling lttng_ustconsumer_recv_metadata():
96 * - channel->metadata_cache->lock
97 * - Calling consumer_metadata_cache_flushed():
98 * - channel->timer_lock
99 * - channel->metadata_cache->lock
100 *
101 * Ensure that neither consumer_data.lock nor
102 * channel->lock are taken within this function, since
103 * they are held while consumer_timer_switch_stop() is
104 * called.
105 */
106 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
107 if (ret < 0) {
108 channel->switch_timer_error = 1;
109 }
110 break;
111 case LTTNG_CONSUMER_KERNEL:
112 case LTTNG_CONSUMER_UNKNOWN:
113 assert(0);
114 break;
115 }
116 }
117
118 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
119 uint64_t stream_id)
120 {
121 int ret;
122 struct ctf_packet_index index;
123
124 memset(&index, 0, sizeof(index));
125 index.stream_id = htobe64(stream_id);
126 index.timestamp_end = htobe64(ts);
127 ret = consumer_stream_write_index(stream, &index);
128 if (ret < 0) {
129 goto error;
130 }
131
132 error:
133 return ret;
134 }
135
136 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
137 {
138 uint64_t ts, stream_id;
139 int ret;
140
141 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
142 if (ret < 0) {
143 ERR("Failed to get the current timestamp");
144 goto end;
145 }
146 ret = kernctl_buffer_flush(stream->wait_fd);
147 if (ret < 0) {
148 ERR("Failed to flush kernel stream");
149 goto end;
150 }
151 ret = kernctl_snapshot(stream->wait_fd);
152 if (ret < 0) {
153 if (errno != EAGAIN && errno != ENODATA) {
154 PERROR("live timer kernel snapshot");
155 ret = -1;
156 goto end;
157 }
158 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
159 if (ret < 0) {
160 PERROR("kernctl_get_stream_id");
161 goto end;
162 }
163 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
164 ret = send_empty_index(stream, ts, stream_id);
165 if (ret < 0) {
166 goto end;
167 }
168 }
169 ret = 0;
170 end:
171 return ret;
172 }
173
174 static int check_kernel_stream(struct lttng_consumer_stream *stream)
175 {
176 int ret;
177
178 /*
179 * While holding the stream mutex, try to take a snapshot, if it
180 * succeeds, it means that data is ready to be sent, just let the data
181 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
182 * means that there is no data to read after the flush, so we can
183 * safely send the empty index.
184 *
185 * Doing a trylock and checking if waiting on metadata if
186 * trylock fails. Bail out of the stream is indeed waiting for
187 * metadata to be pushed. Busy wait on trylock otherwise.
188 */
189 for (;;) {
190 ret = pthread_mutex_trylock(&stream->lock);
191 switch (ret) {
192 case 0:
193 break; /* We have the lock. */
194 case EBUSY:
195 pthread_mutex_lock(&stream->metadata_timer_lock);
196 if (stream->waiting_on_metadata) {
197 ret = 0;
198 stream->missed_metadata_flush = true;
199 pthread_mutex_unlock(&stream->metadata_timer_lock);
200 goto end; /* Bail out. */
201 }
202 pthread_mutex_unlock(&stream->metadata_timer_lock);
203 /* Try again. */
204 caa_cpu_relax();
205 continue;
206 default:
207 ERR("Unexpected pthread_mutex_trylock error %d", ret);
208 ret = -1;
209 goto end;
210 }
211 break;
212 }
213 ret = consumer_flush_kernel_index(stream);
214 pthread_mutex_unlock(&stream->lock);
215 end:
216 return ret;
217 }
218
219 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
220 {
221 uint64_t ts, stream_id;
222 int ret;
223
224 ret = cds_lfht_is_node_deleted(&stream->node.node);
225 if (ret) {
226 goto end;
227 }
228
229 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
230 if (ret < 0) {
231 ERR("Failed to get the current timestamp");
232 goto end;
233 }
234 lttng_ustconsumer_flush_buffer(stream, 1);
235 ret = lttng_ustconsumer_take_snapshot(stream);
236 if (ret < 0) {
237 if (ret != -EAGAIN) {
238 ERR("Taking UST snapshot");
239 ret = -1;
240 goto end;
241 }
242 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
243 if (ret < 0) {
244 PERROR("ustctl_get_stream_id");
245 goto end;
246 }
247 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
248 ret = send_empty_index(stream, ts, stream_id);
249 if (ret < 0) {
250 goto end;
251 }
252 }
253 ret = 0;
254 end:
255 return ret;
256 }
257
258 static int check_ust_stream(struct lttng_consumer_stream *stream)
259 {
260 int ret;
261
262 assert(stream);
263 assert(stream->ustream);
264 /*
265 * While holding the stream mutex, try to take a snapshot, if it
266 * succeeds, it means that data is ready to be sent, just let the data
267 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
268 * means that there is no data to read after the flush, so we can
269 * safely send the empty index.
270 *
271 * Doing a trylock and checking if waiting on metadata if
272 * trylock fails. Bail out of the stream is indeed waiting for
273 * metadata to be pushed. Busy wait on trylock otherwise.
274 */
275 for (;;) {
276 ret = pthread_mutex_trylock(&stream->lock);
277 switch (ret) {
278 case 0:
279 break; /* We have the lock. */
280 case EBUSY:
281 pthread_mutex_lock(&stream->metadata_timer_lock);
282 if (stream->waiting_on_metadata) {
283 ret = 0;
284 stream->missed_metadata_flush = true;
285 pthread_mutex_unlock(&stream->metadata_timer_lock);
286 goto end; /* Bail out. */
287 }
288 pthread_mutex_unlock(&stream->metadata_timer_lock);
289 /* Try again. */
290 caa_cpu_relax();
291 continue;
292 default:
293 ERR("Unexpected pthread_mutex_trylock error %d", ret);
294 ret = -1;
295 goto end;
296 }
297 break;
298 }
299 ret = consumer_flush_ust_index(stream);
300 pthread_mutex_unlock(&stream->lock);
301 end:
302 return ret;
303 }
304
305 /*
306 * Execute action on a live timer
307 */
308 static void live_timer(struct lttng_consumer_local_data *ctx,
309 int sig, siginfo_t *si, void *uc)
310 {
311 int ret;
312 struct lttng_consumer_channel *channel;
313 struct lttng_consumer_stream *stream;
314 struct lttng_ht *ht;
315 struct lttng_ht_iter iter;
316
317 channel = si->si_value.sival_ptr;
318 assert(channel);
319
320 if (channel->switch_timer_error) {
321 goto error;
322 }
323 ht = consumer_data.stream_per_chan_id_ht;
324
325 DBG("Live timer for channel %" PRIu64, channel->key);
326
327 rcu_read_lock();
328 switch (ctx->type) {
329 case LTTNG_CONSUMER32_UST:
330 case LTTNG_CONSUMER64_UST:
331 cds_lfht_for_each_entry_duplicate(ht->ht,
332 ht->hash_fct(&channel->key, lttng_ht_seed),
333 ht->match_fct, &channel->key, &iter.iter,
334 stream, node_channel_id.node) {
335 ret = check_ust_stream(stream);
336 if (ret < 0) {
337 goto error_unlock;
338 }
339 }
340 break;
341 case LTTNG_CONSUMER_KERNEL:
342 cds_lfht_for_each_entry_duplicate(ht->ht,
343 ht->hash_fct(&channel->key, lttng_ht_seed),
344 ht->match_fct, &channel->key, &iter.iter,
345 stream, node_channel_id.node) {
346 ret = check_kernel_stream(stream);
347 if (ret < 0) {
348 goto error_unlock;
349 }
350 }
351 break;
352 case LTTNG_CONSUMER_UNKNOWN:
353 assert(0);
354 break;
355 }
356
357 error_unlock:
358 rcu_read_unlock();
359
360 error:
361 return;
362 }
363
364 static
365 void consumer_timer_signal_thread_qs(unsigned int signr)
366 {
367 sigset_t pending_set;
368 int ret;
369
370 /*
371 * We need to be the only thread interacting with the thread
372 * that manages signals for teardown synchronization.
373 */
374 pthread_mutex_lock(&timer_signal.lock);
375
376 /* Ensure we don't have any signal queued for this channel. */
377 for (;;) {
378 ret = sigemptyset(&pending_set);
379 if (ret == -1) {
380 PERROR("sigemptyset");
381 }
382 ret = sigpending(&pending_set);
383 if (ret == -1) {
384 PERROR("sigpending");
385 }
386 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
387 break;
388 }
389 caa_cpu_relax();
390 }
391
392 /*
393 * From this point, no new signal handler will be fired that would try to
394 * access "chan". However, we still need to wait for any currently
395 * executing handler to complete.
396 */
397 cmm_smp_mb();
398 CMM_STORE_SHARED(timer_signal.qs_done, 0);
399 cmm_smp_mb();
400
401 /*
402 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
403 * up.
404 */
405 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
406
407 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
408 caa_cpu_relax();
409 }
410 cmm_smp_mb();
411
412 pthread_mutex_unlock(&timer_signal.lock);
413 }
414
415 /*
416 * Set the timer for periodical metadata flush.
417 */
418 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
419 unsigned int switch_timer_interval)
420 {
421 int ret;
422 struct sigevent sev;
423 struct itimerspec its;
424
425 assert(channel);
426 assert(channel->key);
427
428 if (switch_timer_interval == 0) {
429 return;
430 }
431
432 sev.sigev_notify = SIGEV_SIGNAL;
433 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
434 sev.sigev_value.sival_ptr = channel;
435 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
436 if (ret == -1) {
437 PERROR("timer_create");
438 }
439 channel->switch_timer_enabled = 1;
440
441 its.it_value.tv_sec = switch_timer_interval / 1000000;
442 its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
443 its.it_interval.tv_sec = its.it_value.tv_sec;
444 its.it_interval.tv_nsec = its.it_value.tv_nsec;
445
446 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
447 if (ret == -1) {
448 PERROR("timer_settime");
449 }
450 }
451
452 /*
453 * Stop and delete timer.
454 */
455 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
456 {
457 int ret;
458
459 assert(channel);
460
461 ret = timer_delete(channel->switch_timer);
462 if (ret == -1) {
463 PERROR("timer_delete");
464 }
465
466 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
467
468 channel->switch_timer = 0;
469 channel->switch_timer_enabled = 0;
470 }
471
472 /*
473 * Set the timer for the live mode.
474 */
475 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
476 int live_timer_interval)
477 {
478 int ret;
479 struct sigevent sev;
480 struct itimerspec its;
481
482 assert(channel);
483 assert(channel->key);
484
485 if (live_timer_interval <= 0) {
486 return;
487 }
488
489 sev.sigev_notify = SIGEV_SIGNAL;
490 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
491 sev.sigev_value.sival_ptr = channel;
492 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
493 if (ret == -1) {
494 PERROR("timer_create");
495 }
496 channel->live_timer_enabled = 1;
497
498 its.it_value.tv_sec = live_timer_interval / 1000000;
499 its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
500 its.it_interval.tv_sec = its.it_value.tv_sec;
501 its.it_interval.tv_nsec = its.it_value.tv_nsec;
502
503 ret = timer_settime(channel->live_timer, 0, &its, NULL);
504 if (ret == -1) {
505 PERROR("timer_settime");
506 }
507 }
508
509 /*
510 * Stop and delete timer.
511 */
512 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
513 {
514 int ret;
515
516 assert(channel);
517
518 ret = timer_delete(channel->live_timer);
519 if (ret == -1) {
520 PERROR("timer_delete");
521 }
522
523 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
524
525 channel->live_timer = 0;
526 channel->live_timer_enabled = 0;
527 }
528
529 /*
530 * Block the RT signals for the entire process. It must be called from the
531 * consumer main before creating the threads
532 */
533 int consumer_signal_init(void)
534 {
535 int ret;
536 sigset_t mask;
537
538 /* Block signal for entire process, so only our thread processes it. */
539 setmask(&mask);
540 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
541 if (ret) {
542 errno = ret;
543 PERROR("pthread_sigmask");
544 return -1;
545 }
546 return 0;
547 }
548
549 /*
550 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
551 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
552 */
553 void *consumer_timer_thread(void *data)
554 {
555 int signr;
556 sigset_t mask;
557 siginfo_t info;
558 struct lttng_consumer_local_data *ctx = data;
559
560 rcu_register_thread();
561
562 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
563
564 if (testpoint(consumerd_thread_metadata_timer)) {
565 goto error_testpoint;
566 }
567
568 health_code_update();
569
570 /* Only self thread will receive signal mask. */
571 setmask(&mask);
572 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
573
574 while (1) {
575 health_code_update();
576
577 health_poll_entry();
578 signr = sigwaitinfo(&mask, &info);
579 health_poll_exit();
580 if (signr == -1) {
581 if (errno != EINTR) {
582 PERROR("sigwaitinfo");
583 }
584 continue;
585 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
586 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
587 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
588 cmm_smp_mb();
589 CMM_STORE_SHARED(timer_signal.qs_done, 1);
590 cmm_smp_mb();
591 DBG("Signal timer metadata thread teardown");
592 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
593 live_timer(ctx, info.si_signo, &info, NULL);
594 } else {
595 ERR("Unexpected signal %d\n", info.si_signo);
596 }
597 }
598
599 error_testpoint:
600 /* Only reached in testpoint error */
601 health_error();
602 health_unregister(health_consumerd);
603
604 rcu_unregister_thread();
605
606 /* Never return */
607 return NULL;
608 }
This page took 0.066631 seconds and 4 git commands to generate.