Fix: streamline ret/errno of run_as()
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
6c1c0768 20#define _LGPL_SOURCE
331744e3
JD
21#include <assert.h>
22#include <inttypes.h>
23#include <signal.h>
24
51a9e1c7 25#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 26#include <common/common.h>
f263b7fd 27#include <common/compat/endian.h>
d3e2ba59
JD
28#include <common/kernel-ctl/kernel-ctl.h>
29#include <common/kernel-consumer/kernel-consumer.h>
30#include <common/consumer-stream.h>
331744e3
JD
31
32#include "consumer-timer.h"
2d57de81 33#include "consumer-testpoint.h"
331744e3
JD
34#include "ust-consumer/ust-consumer.h"
35
2b8f8754
MD
36static struct timer_signal_data timer_signal = {
37 .tid = 0,
38 .setup_done = 0,
39 .qs_done = 0,
40 .lock = PTHREAD_MUTEX_INITIALIZER,
41};
331744e3
JD
42
43/*
44 * Set custom signal mask to current thread.
45 */
46static void setmask(sigset_t *mask)
47{
48 int ret;
49
50 ret = sigemptyset(mask);
51 if (ret) {
52 PERROR("sigemptyset");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
55 if (ret) {
d3e2ba59 56 PERROR("sigaddset switch");
331744e3
JD
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
59 if (ret) {
d3e2ba59
JD
60 PERROR("sigaddset teardown");
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
63 if (ret) {
64 PERROR("sigaddset live");
331744e3
JD
65 }
66}
67
68/*
69 * Execute action on a timer switch.
d98a47c7
MD
70 *
71 * Beware: metadata_switch_timer() should *never* take a mutex also held
72 * while consumer_timer_switch_stop() is called. It would result in
73 * deadlocks.
331744e3
JD
74 */
75static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
76 int sig, siginfo_t *si, void *uc)
77{
78 int ret;
79 struct lttng_consumer_channel *channel;
80
81 channel = si->si_value.sival_ptr;
82 assert(channel);
83
4419b4fb
MD
84 if (channel->switch_timer_error) {
85 return;
86 }
87
331744e3
JD
88 DBG("Switch timer for channel %" PRIu64, channel->key);
89 switch (ctx->type) {
90 case LTTNG_CONSUMER32_UST:
91 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
92 /*
93 * Locks taken by lttng_ustconsumer_request_metadata():
94 * - metadata_socket_lock
95 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 96 * - channel->metadata_cache->lock
4fa3dc0e 97 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
98 * - channel->timer_lock
99 * - channel->metadata_cache->lock
4fa3dc0e 100 *
5e41ebe1
MD
101 * Ensure that neither consumer_data.lock nor
102 * channel->lock are taken within this function, since
103 * they are held while consumer_timer_switch_stop() is
104 * called.
4fa3dc0e 105 */
94d49140 106 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 107 if (ret < 0) {
4419b4fb 108 channel->switch_timer_error = 1;
331744e3
JD
109 }
110 break;
111 case LTTNG_CONSUMER_KERNEL:
112 case LTTNG_CONSUMER_UNKNOWN:
113 assert(0);
114 break;
115 }
116}
117
528f2ffa
JD
118static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
119 uint64_t stream_id)
d3e2ba59
JD
120{
121 int ret;
50adc264 122 struct ctf_packet_index index;
d3e2ba59
JD
123
124 memset(&index, 0, sizeof(index));
528f2ffa 125 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
126 index.timestamp_end = htobe64(ts);
127 ret = consumer_stream_write_index(stream, &index);
128 if (ret < 0) {
129 goto error;
130 }
131
132error:
133 return ret;
134}
135
136static int check_kernel_stream(struct lttng_consumer_stream *stream)
137{
528f2ffa 138 uint64_t ts, stream_id;
d3e2ba59
JD
139 int ret;
140
141 /*
142 * While holding the stream mutex, try to take a snapshot, if it
143 * succeeds, it means that data is ready to be sent, just let the data
144 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
145 * means that there is no data to read after the flush, so we can
146 * safely send the empty index.
147 */
148 pthread_mutex_lock(&stream->lock);
149 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
150 if (ret < 0) {
151 ERR("Failed to get the current timestamp");
152 goto error_unlock;
153 }
154 ret = kernctl_buffer_flush(stream->wait_fd);
155 if (ret < 0) {
156 ERR("Failed to flush kernel stream");
157 goto error_unlock;
158 }
159 ret = kernctl_snapshot(stream->wait_fd);
160 if (ret < 0) {
08b1dcd3
DG
161 if (errno != EAGAIN && errno != ENODATA) {
162 PERROR("live timer kernel snapshot");
d3e2ba59
JD
163 ret = -1;
164 goto error_unlock;
165 }
528f2ffa
JD
166 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
167 if (ret < 0) {
168 PERROR("kernctl_get_stream_id");
169 goto error_unlock;
170 }
d3e2ba59 171 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 172 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
173 if (ret < 0) {
174 goto error_unlock;
175 }
176 }
177 ret = 0;
178
179error_unlock:
180 pthread_mutex_unlock(&stream->lock);
181 return ret;
182}
183
184static int check_ust_stream(struct lttng_consumer_stream *stream)
185{
528f2ffa 186 uint64_t ts, stream_id;
d3e2ba59
JD
187 int ret;
188
189 assert(stream);
190 assert(stream->ustream);
191 /*
192 * While holding the stream mutex, try to take a snapshot, if it
193 * succeeds, it means that data is ready to be sent, just let the data
194 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
195 * means that there is no data to read after the flush, so we can
196 * safely send the empty index.
197 */
198 pthread_mutex_lock(&stream->lock);
94d49140
JD
199 ret = cds_lfht_is_node_deleted(&stream->node.node);
200 if (ret) {
201 goto error_unlock;
202 }
203
84a182ce 204 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
205 if (ret < 0) {
206 ERR("Failed to get the current timestamp");
207 goto error_unlock;
208 }
84a182ce
DG
209 lttng_ustconsumer_flush_buffer(stream, 1);
210 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 211 if (ret < 0) {
94d49140 212 if (ret != -EAGAIN) {
d3e2ba59
JD
213 ERR("Taking UST snapshot");
214 ret = -1;
215 goto error_unlock;
216 }
70190e1c 217 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
218 if (ret < 0) {
219 PERROR("ustctl_get_stream_id");
220 goto error_unlock;
221 }
d3e2ba59 222 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 223 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
224 if (ret < 0) {
225 goto error_unlock;
226 }
227 }
228 ret = 0;
229
230error_unlock:
231 pthread_mutex_unlock(&stream->lock);
232 return ret;
233}
234
235/*
236 * Execute action on a live timer
237 */
238static void live_timer(struct lttng_consumer_local_data *ctx,
239 int sig, siginfo_t *si, void *uc)
240{
241 int ret;
242 struct lttng_consumer_channel *channel;
243 struct lttng_consumer_stream *stream;
244 struct lttng_ht *ht;
245 struct lttng_ht_iter iter;
246
247 channel = si->si_value.sival_ptr;
248 assert(channel);
249
250 if (channel->switch_timer_error) {
251 goto error;
252 }
253 ht = consumer_data.stream_per_chan_id_ht;
254
255 DBG("Live timer for channel %" PRIu64, channel->key);
256
257 rcu_read_lock();
258 switch (ctx->type) {
259 case LTTNG_CONSUMER32_UST:
260 case LTTNG_CONSUMER64_UST:
261 cds_lfht_for_each_entry_duplicate(ht->ht,
262 ht->hash_fct(&channel->key, lttng_ht_seed),
263 ht->match_fct, &channel->key, &iter.iter,
264 stream, node_channel_id.node) {
265 ret = check_ust_stream(stream);
266 if (ret < 0) {
267 goto error_unlock;
268 }
269 }
270 break;
271 case LTTNG_CONSUMER_KERNEL:
272 cds_lfht_for_each_entry_duplicate(ht->ht,
273 ht->hash_fct(&channel->key, lttng_ht_seed),
274 ht->match_fct, &channel->key, &iter.iter,
275 stream, node_channel_id.node) {
276 ret = check_kernel_stream(stream);
277 if (ret < 0) {
278 goto error_unlock;
279 }
280 }
281 break;
282 case LTTNG_CONSUMER_UNKNOWN:
283 assert(0);
284 break;
285 }
286
287error_unlock:
288 rcu_read_unlock();
289
290error:
291 return;
292}
293
2b8f8754
MD
294static
295void consumer_timer_signal_thread_qs(unsigned int signr)
296{
297 sigset_t pending_set;
298 int ret;
299
300 /*
301 * We need to be the only thread interacting with the thread
302 * that manages signals for teardown synchronization.
303 */
304 pthread_mutex_lock(&timer_signal.lock);
305
306 /* Ensure we don't have any signal queued for this channel. */
307 for (;;) {
308 ret = sigemptyset(&pending_set);
309 if (ret == -1) {
310 PERROR("sigemptyset");
311 }
312 ret = sigpending(&pending_set);
313 if (ret == -1) {
314 PERROR("sigpending");
315 }
316 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
317 break;
318 }
319 caa_cpu_relax();
320 }
321
322 /*
323 * From this point, no new signal handler will be fired that would try to
324 * access "chan". However, we still need to wait for any currently
325 * executing handler to complete.
326 */
327 cmm_smp_mb();
328 CMM_STORE_SHARED(timer_signal.qs_done, 0);
329 cmm_smp_mb();
330
331 /*
332 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
333 * up.
334 */
335 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
336
337 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
338 caa_cpu_relax();
339 }
340 cmm_smp_mb();
341
342 pthread_mutex_unlock(&timer_signal.lock);
343}
344
331744e3
JD
345/*
346 * Set the timer for periodical metadata flush.
347 */
348void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
349 unsigned int switch_timer_interval)
350{
351 int ret;
352 struct sigevent sev;
353 struct itimerspec its;
354
355 assert(channel);
356 assert(channel->key);
357
358 if (switch_timer_interval == 0) {
359 return;
360 }
361
362 sev.sigev_notify = SIGEV_SIGNAL;
363 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
364 sev.sigev_value.sival_ptr = channel;
365 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
366 if (ret == -1) {
367 PERROR("timer_create");
368 }
369 channel->switch_timer_enabled = 1;
370
371 its.it_value.tv_sec = switch_timer_interval / 1000000;
372 its.it_value.tv_nsec = switch_timer_interval % 1000000;
373 its.it_interval.tv_sec = its.it_value.tv_sec;
374 its.it_interval.tv_nsec = its.it_value.tv_nsec;
375
376 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
377 if (ret == -1) {
378 PERROR("timer_settime");
379 }
380}
381
382/*
383 * Stop and delete timer.
384 */
385void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
386{
387 int ret;
331744e3
JD
388
389 assert(channel);
390
391 ret = timer_delete(channel->switch_timer);
392 if (ret == -1) {
393 PERROR("timer_delete");
394 }
395
2b8f8754 396 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 397
2b8f8754
MD
398 channel->switch_timer = 0;
399 channel->switch_timer_enabled = 0;
331744e3
JD
400}
401
d3e2ba59
JD
402/*
403 * Set the timer for the live mode.
404 */
405void consumer_timer_live_start(struct lttng_consumer_channel *channel,
406 int live_timer_interval)
407{
408 int ret;
409 struct sigevent sev;
410 struct itimerspec its;
411
412 assert(channel);
413 assert(channel->key);
414
fac41e72 415 if (live_timer_interval <= 0) {
d3e2ba59
JD
416 return;
417 }
418
419 sev.sigev_notify = SIGEV_SIGNAL;
420 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
421 sev.sigev_value.sival_ptr = channel;
422 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
423 if (ret == -1) {
424 PERROR("timer_create");
425 }
426 channel->live_timer_enabled = 1;
427
428 its.it_value.tv_sec = live_timer_interval / 1000000;
429 its.it_value.tv_nsec = live_timer_interval % 1000000;
430 its.it_interval.tv_sec = its.it_value.tv_sec;
431 its.it_interval.tv_nsec = its.it_value.tv_nsec;
432
433 ret = timer_settime(channel->live_timer, 0, &its, NULL);
434 if (ret == -1) {
435 PERROR("timer_settime");
436 }
437}
438
439/*
440 * Stop and delete timer.
441 */
442void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
443{
444 int ret;
445
446 assert(channel);
447
448 ret = timer_delete(channel->live_timer);
449 if (ret == -1) {
450 PERROR("timer_delete");
451 }
452
453 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
454
455 channel->live_timer = 0;
456 channel->live_timer_enabled = 0;
457}
458
331744e3
JD
459/*
460 * Block the RT signals for the entire process. It must be called from the
461 * consumer main before creating the threads
462 */
73664f81 463int consumer_signal_init(void)
331744e3
JD
464{
465 int ret;
466 sigset_t mask;
467
468 /* Block signal for entire process, so only our thread processes it. */
469 setmask(&mask);
470 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
471 if (ret) {
472 errno = ret;
473 PERROR("pthread_sigmask");
73664f81 474 return -1;
331744e3 475 }
73664f81 476 return 0;
331744e3
JD
477}
478
479/*
d3e2ba59
JD
480 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
481 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 482 */
d3e2ba59 483void *consumer_timer_thread(void *data)
331744e3
JD
484{
485 int signr;
486 sigset_t mask;
487 siginfo_t info;
488 struct lttng_consumer_local_data *ctx = data;
489
1fc79fb4
MD
490 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
491
2d57de81
MD
492 if (testpoint(consumerd_thread_metadata_timer)) {
493 goto error_testpoint;
494 }
495
9ce5646a
MD
496 health_code_update();
497
331744e3
JD
498 /* Only self thread will receive signal mask. */
499 setmask(&mask);
500 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
501
502 while (1) {
9ce5646a
MD
503 health_code_update();
504
505 health_poll_entry();
331744e3 506 signr = sigwaitinfo(&mask, &info);
9ce5646a 507 health_poll_exit();
331744e3
JD
508 if (signr == -1) {
509 if (errno != EINTR) {
510 PERROR("sigwaitinfo");
511 }
512 continue;
513 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
514 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
515 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
516 cmm_smp_mb();
517 CMM_STORE_SHARED(timer_signal.qs_done, 1);
518 cmm_smp_mb();
519 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
520 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
521 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
522 } else {
523 ERR("Unexpected signal %d\n", info.si_signo);
524 }
525 }
526
2d57de81
MD
527error_testpoint:
528 /* Only reached in testpoint error */
529 health_error();
1fc79fb4
MD
530 health_unregister(health_consumerd);
531
532 /* Never return */
331744e3
JD
533 return NULL;
534}
This page took 0.051802 seconds and 4 git commands to generate.