relayd: add health instrumentation to threads
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
24#include <common/common.h>
d3e2ba59
JD
25#include <common/kernel-ctl/kernel-ctl.h>
26#include <common/kernel-consumer/kernel-consumer.h>
27#include <common/consumer-stream.h>
331744e3
JD
28
29#include "consumer-timer.h"
30#include "ust-consumer/ust-consumer.h"
31
2b8f8754
MD
32static struct timer_signal_data timer_signal = {
33 .tid = 0,
34 .setup_done = 0,
35 .qs_done = 0,
36 .lock = PTHREAD_MUTEX_INITIALIZER,
37};
331744e3
JD
38
39/*
40 * Set custom signal mask to current thread.
41 */
42static void setmask(sigset_t *mask)
43{
44 int ret;
45
46 ret = sigemptyset(mask);
47 if (ret) {
48 PERROR("sigemptyset");
49 }
50 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
51 if (ret) {
d3e2ba59 52 PERROR("sigaddset switch");
331744e3
JD
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
55 if (ret) {
d3e2ba59
JD
56 PERROR("sigaddset teardown");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
59 if (ret) {
60 PERROR("sigaddset live");
331744e3
JD
61 }
62}
63
64/*
65 * Execute action on a timer switch.
d98a47c7
MD
66 *
67 * Beware: metadata_switch_timer() should *never* take a mutex also held
68 * while consumer_timer_switch_stop() is called. It would result in
69 * deadlocks.
331744e3
JD
70 */
71static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
72 int sig, siginfo_t *si, void *uc)
73{
74 int ret;
75 struct lttng_consumer_channel *channel;
76
77 channel = si->si_value.sival_ptr;
78 assert(channel);
79
4419b4fb
MD
80 if (channel->switch_timer_error) {
81 return;
82 }
83
331744e3
JD
84 DBG("Switch timer for channel %" PRIu64, channel->key);
85 switch (ctx->type) {
86 case LTTNG_CONSUMER32_UST:
87 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
88 /*
89 * Locks taken by lttng_ustconsumer_request_metadata():
90 * - metadata_socket_lock
91 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 92 * - channel->metadata_cache->lock
4fa3dc0e 93 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
94 * - channel->timer_lock
95 * - channel->metadata_cache->lock
4fa3dc0e 96 *
5e41ebe1
MD
97 * Ensure that neither consumer_data.lock nor
98 * channel->lock are taken within this function, since
99 * they are held while consumer_timer_switch_stop() is
100 * called.
4fa3dc0e 101 */
94d49140 102 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 103 if (ret < 0) {
4419b4fb 104 channel->switch_timer_error = 1;
331744e3
JD
105 }
106 break;
107 case LTTNG_CONSUMER_KERNEL:
108 case LTTNG_CONSUMER_UNKNOWN:
109 assert(0);
110 break;
111 }
112}
113
d3e2ba59
JD
114static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
115{
116 int ret;
117 struct lttng_packet_index index;
118
119 memset(&index, 0, sizeof(index));
120 index.timestamp_end = htobe64(ts);
121 ret = consumer_stream_write_index(stream, &index);
122 if (ret < 0) {
123 goto error;
124 }
125
126error:
127 return ret;
128}
129
130static int check_kernel_stream(struct lttng_consumer_stream *stream)
131{
132 uint64_t ts;
133 int ret;
134
135 /*
136 * While holding the stream mutex, try to take a snapshot, if it
137 * succeeds, it means that data is ready to be sent, just let the data
138 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
139 * means that there is no data to read after the flush, so we can
140 * safely send the empty index.
141 */
142 pthread_mutex_lock(&stream->lock);
143 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
144 if (ret < 0) {
145 ERR("Failed to get the current timestamp");
146 goto error_unlock;
147 }
148 ret = kernctl_buffer_flush(stream->wait_fd);
149 if (ret < 0) {
150 ERR("Failed to flush kernel stream");
151 goto error_unlock;
152 }
153 ret = kernctl_snapshot(stream->wait_fd);
154 if (ret < 0) {
155 if (errno != EAGAIN) {
156 ERR("Taking kernel snapshot");
157 ret = -1;
158 goto error_unlock;
159 }
160 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
161 ret = send_empty_index(stream, ts);
162 if (ret < 0) {
163 goto error_unlock;
164 }
165 }
166 ret = 0;
167
168error_unlock:
169 pthread_mutex_unlock(&stream->lock);
170 return ret;
171}
172
173static int check_ust_stream(struct lttng_consumer_stream *stream)
174{
175 uint64_t ts;
176 int ret;
177
178 assert(stream);
179 assert(stream->ustream);
180 /*
181 * While holding the stream mutex, try to take a snapshot, if it
182 * succeeds, it means that data is ready to be sent, just let the data
183 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
184 * means that there is no data to read after the flush, so we can
185 * safely send the empty index.
186 */
187 pthread_mutex_lock(&stream->lock);
94d49140
JD
188 ret = cds_lfht_is_node_deleted(&stream->node.node);
189 if (ret) {
190 goto error_unlock;
191 }
192
84a182ce 193 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
194 if (ret < 0) {
195 ERR("Failed to get the current timestamp");
196 goto error_unlock;
197 }
84a182ce
DG
198 lttng_ustconsumer_flush_buffer(stream, 1);
199 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 200 if (ret < 0) {
94d49140 201 if (ret != -EAGAIN) {
d3e2ba59
JD
202 ERR("Taking UST snapshot");
203 ret = -1;
204 goto error_unlock;
205 }
206 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
207 ret = send_empty_index(stream, ts);
208 if (ret < 0) {
209 goto error_unlock;
210 }
211 }
212 ret = 0;
213
214error_unlock:
215 pthread_mutex_unlock(&stream->lock);
216 return ret;
217}
218
219/*
220 * Execute action on a live timer
221 */
222static void live_timer(struct lttng_consumer_local_data *ctx,
223 int sig, siginfo_t *si, void *uc)
224{
225 int ret;
226 struct lttng_consumer_channel *channel;
227 struct lttng_consumer_stream *stream;
228 struct lttng_ht *ht;
229 struct lttng_ht_iter iter;
230
231 channel = si->si_value.sival_ptr;
232 assert(channel);
233
234 if (channel->switch_timer_error) {
235 goto error;
236 }
237 ht = consumer_data.stream_per_chan_id_ht;
238
239 DBG("Live timer for channel %" PRIu64, channel->key);
240
241 rcu_read_lock();
242 switch (ctx->type) {
243 case LTTNG_CONSUMER32_UST:
244 case LTTNG_CONSUMER64_UST:
245 cds_lfht_for_each_entry_duplicate(ht->ht,
246 ht->hash_fct(&channel->key, lttng_ht_seed),
247 ht->match_fct, &channel->key, &iter.iter,
248 stream, node_channel_id.node) {
249 ret = check_ust_stream(stream);
250 if (ret < 0) {
251 goto error_unlock;
252 }
253 }
254 break;
255 case LTTNG_CONSUMER_KERNEL:
256 cds_lfht_for_each_entry_duplicate(ht->ht,
257 ht->hash_fct(&channel->key, lttng_ht_seed),
258 ht->match_fct, &channel->key, &iter.iter,
259 stream, node_channel_id.node) {
260 ret = check_kernel_stream(stream);
261 if (ret < 0) {
262 goto error_unlock;
263 }
264 }
265 break;
266 case LTTNG_CONSUMER_UNKNOWN:
267 assert(0);
268 break;
269 }
270
271error_unlock:
272 rcu_read_unlock();
273
274error:
275 return;
276}
277
2b8f8754
MD
278static
279void consumer_timer_signal_thread_qs(unsigned int signr)
280{
281 sigset_t pending_set;
282 int ret;
283
284 /*
285 * We need to be the only thread interacting with the thread
286 * that manages signals for teardown synchronization.
287 */
288 pthread_mutex_lock(&timer_signal.lock);
289
290 /* Ensure we don't have any signal queued for this channel. */
291 for (;;) {
292 ret = sigemptyset(&pending_set);
293 if (ret == -1) {
294 PERROR("sigemptyset");
295 }
296 ret = sigpending(&pending_set);
297 if (ret == -1) {
298 PERROR("sigpending");
299 }
300 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
301 break;
302 }
303 caa_cpu_relax();
304 }
305
306 /*
307 * From this point, no new signal handler will be fired that would try to
308 * access "chan". However, we still need to wait for any currently
309 * executing handler to complete.
310 */
311 cmm_smp_mb();
312 CMM_STORE_SHARED(timer_signal.qs_done, 0);
313 cmm_smp_mb();
314
315 /*
316 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
317 * up.
318 */
319 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
320
321 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
322 caa_cpu_relax();
323 }
324 cmm_smp_mb();
325
326 pthread_mutex_unlock(&timer_signal.lock);
327}
328
331744e3
JD
329/*
330 * Set the timer for periodical metadata flush.
331 */
332void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
333 unsigned int switch_timer_interval)
334{
335 int ret;
336 struct sigevent sev;
337 struct itimerspec its;
338
339 assert(channel);
340 assert(channel->key);
341
342 if (switch_timer_interval == 0) {
343 return;
344 }
345
346 sev.sigev_notify = SIGEV_SIGNAL;
347 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
348 sev.sigev_value.sival_ptr = channel;
349 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
350 if (ret == -1) {
351 PERROR("timer_create");
352 }
353 channel->switch_timer_enabled = 1;
354
355 its.it_value.tv_sec = switch_timer_interval / 1000000;
356 its.it_value.tv_nsec = switch_timer_interval % 1000000;
357 its.it_interval.tv_sec = its.it_value.tv_sec;
358 its.it_interval.tv_nsec = its.it_value.tv_nsec;
359
360 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
361 if (ret == -1) {
362 PERROR("timer_settime");
363 }
364}
365
366/*
367 * Stop and delete timer.
368 */
369void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
370{
371 int ret;
331744e3
JD
372
373 assert(channel);
374
375 ret = timer_delete(channel->switch_timer);
376 if (ret == -1) {
377 PERROR("timer_delete");
378 }
379
2b8f8754 380 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 381
2b8f8754
MD
382 channel->switch_timer = 0;
383 channel->switch_timer_enabled = 0;
331744e3
JD
384}
385
d3e2ba59
JD
386/*
387 * Set the timer for the live mode.
388 */
389void consumer_timer_live_start(struct lttng_consumer_channel *channel,
390 int live_timer_interval)
391{
392 int ret;
393 struct sigevent sev;
394 struct itimerspec its;
395
396 assert(channel);
397 assert(channel->key);
398
399 if (live_timer_interval == 0) {
400 return;
401 }
402
403 sev.sigev_notify = SIGEV_SIGNAL;
404 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
405 sev.sigev_value.sival_ptr = channel;
406 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
407 if (ret == -1) {
408 PERROR("timer_create");
409 }
410 channel->live_timer_enabled = 1;
411
412 its.it_value.tv_sec = live_timer_interval / 1000000;
413 its.it_value.tv_nsec = live_timer_interval % 1000000;
414 its.it_interval.tv_sec = its.it_value.tv_sec;
415 its.it_interval.tv_nsec = its.it_value.tv_nsec;
416
417 ret = timer_settime(channel->live_timer, 0, &its, NULL);
418 if (ret == -1) {
419 PERROR("timer_settime");
420 }
421}
422
423/*
424 * Stop and delete timer.
425 */
426void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
427{
428 int ret;
429
430 assert(channel);
431
432 ret = timer_delete(channel->live_timer);
433 if (ret == -1) {
434 PERROR("timer_delete");
435 }
436
437 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
438
439 channel->live_timer = 0;
440 channel->live_timer_enabled = 0;
441}
442
331744e3
JD
443/*
444 * Block the RT signals for the entire process. It must be called from the
445 * consumer main before creating the threads
446 */
447void consumer_signal_init(void)
448{
449 int ret;
450 sigset_t mask;
451
452 /* Block signal for entire process, so only our thread processes it. */
453 setmask(&mask);
454 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
455 if (ret) {
456 errno = ret;
457 PERROR("pthread_sigmask");
458 }
459}
460
461/*
d3e2ba59
JD
462 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
463 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 464 */
d3e2ba59 465void *consumer_timer_thread(void *data)
331744e3
JD
466{
467 int signr;
468 sigset_t mask;
469 siginfo_t info;
470 struct lttng_consumer_local_data *ctx = data;
471
472 /* Only self thread will receive signal mask. */
473 setmask(&mask);
474 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
475
476 while (1) {
477 signr = sigwaitinfo(&mask, &info);
478 if (signr == -1) {
479 if (errno != EINTR) {
480 PERROR("sigwaitinfo");
481 }
482 continue;
483 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
484 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
485 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
486 cmm_smp_mb();
487 CMM_STORE_SHARED(timer_signal.qs_done, 1);
488 cmm_smp_mb();
489 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
490 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
491 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
492 } else {
493 ERR("Unexpected signal %d\n", info.si_signo);
494 }
495 }
496
497 return NULL;
498}
This page took 0.044125 seconds and 4 git commands to generate.