+/*
+ * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#define _LGPL_SOURCE
+#include <inttypes.h>
+#include <signal.h>
+
+#include <bin/lttng-consumerd/health-consumerd.h>
+#include <common/common.h>
+#include <common/compat/endian.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/consumer/consumer-stream.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-testpoint.h>
+#include <common/ust-consumer/ust-consumer.h>
+
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
+
+static struct timer_signal_data timer_signal = {
+ .tid = 0,
+ .setup_done = 0,
+ .qs_done = 0,
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+
+/*
+ * Set custom signal mask to current thread.
+ */
+static void setmask(sigset_t *mask)
+{
+ int ret;
+
+ ret = sigemptyset(mask);
+ if (ret) {
+ PERROR("sigemptyset");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
+ if (ret) {
+ PERROR("sigaddset switch");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
+ if (ret) {
+ PERROR("sigaddset teardown");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
+ if (ret) {
+ PERROR("sigaddset live");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+ if (ret) {
+ PERROR("sigaddset monitor");
+ }
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+ if (ret) {
+ PERROR("sigaddset exit");
+ }
+}
+
+static int the_channel_monitor_pipe = -1;
+
+/*
+ * Execute action on a timer switch.
+ *
+ * Beware: metadata_switch_timer() should *never* take a mutex also held
+ * while consumer_timer_switch_stop() is called. It would result in
+ * deadlocks.
+ */
+static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
+ siginfo_t *si)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+
+ channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
+ LTTNG_ASSERT(channel);
+
+ if (channel->switch_timer_error) {
+ return;
+ }
+
+ DBG("Switch timer for channel %" PRIu64, channel->key);
+ switch (ctx->type) {
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Locks taken by lttng_ustconsumer_request_metadata():
+ * - metadata_socket_lock
+ * - Calling lttng_ustconsumer_recv_metadata():
+ * - channel->metadata_cache->lock
+ * - Calling consumer_metadata_cache_flushed():
+ * - channel->timer_lock
+ * - channel->metadata_cache->lock
+ *
+ * Ensure that neither consumer_data.lock nor
+ * channel->lock are taken within this function, since
+ * they are held while consumer_timer_switch_stop() is
+ * called.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+ if (ret < 0) {
+ channel->switch_timer_error = 1;
+ }
+ break;
+ case LTTNG_CONSUMER_KERNEL:
+ case LTTNG_CONSUMER_UNKNOWN:
+ abort();
+ break;
+ }
+}
+
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
+ uint64_t stream_id)
+{
+ int ret;
+ struct ctf_packet_index index;
+
+ memset(&index, 0, sizeof(index));
+ index.stream_id = htobe64(stream_id);
+ index.timestamp_end = htobe64(ts);
+ ret = consumer_stream_write_index(stream, &index);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
+ ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
+ if (ret < 0) {
+ ERR("Failed to get the current timestamp");
+ goto end;
+ }
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ ret = kernctl_snapshot(stream->wait_fd);
+ if (ret < 0) {
+ if (ret != -EAGAIN && ret != -ENODATA) {
+ PERROR("live timer kernel snapshot");
+ ret = -1;
+ goto end;
+ }
+ ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
+ if (ret < 0) {
+ PERROR("kernctl_get_stream_id");
+ goto end;
+ }
+ DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+ ret = send_empty_index(stream, ts, stream_id);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
+static int check_stream(struct lttng_consumer_stream *stream,
+ flush_index_cb flush_index)
+{
+ int ret;
+
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ *
+ * Doing a trylock and checking if waiting on metadata if
+ * trylock fails. Bail out of the stream is indeed waiting for
+ * metadata to be pushed. Busy wait on trylock otherwise.
+ */
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = flush_index(stream);
+ pthread_mutex_unlock(&stream->lock);
+end:
+ return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (ret) {
+ goto end;
+ }
+
+ ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
+ if (ret < 0) {
+ ERR("Failed to get the current timestamp");
+ goto end;
+ }
+ ret = lttng_ustconsumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush buffer while flushing index");
+ goto end;
+ }
+ ret = lttng_ustconsumer_take_snapshot(stream);
+ if (ret < 0) {
+ if (ret != -EAGAIN) {
+ ERR("Taking UST snapshot");
+ ret = -1;
+ goto end;
+ }
+ ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
+ if (ret < 0) {
+ PERROR("lttng_ust_ctl_get_stream_id");
+ goto end;
+ }
+ DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+ ret = send_empty_index(stream, ts, stream_id);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
+/*
+ * Execute action on a live timer
+ */
+static void live_timer(struct lttng_consumer_local_data *ctx,
+ siginfo_t *si)
+{
+ int ret;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const flush_index_cb flush_index =
+ ctx->type == LTTNG_CONSUMER_KERNEL ?
+ consumer_flush_kernel_index :
+ consumer_flush_ust_index;
+
+ channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
+ LTTNG_ASSERT(channel);
+
+ if (channel->switch_timer_error) {
+ goto error;
+ }
+
+ DBG("Live timer for channel %" PRIu64, channel->key);
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_stream(stream, flush_index);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
+
+error_unlock:
+ rcu_read_unlock();
+
+error:
+ return;
+}
+
+static
+void consumer_timer_signal_thread_qs(unsigned int signr)
+{
+ sigset_t pending_set;
+ int ret;
+
+ /*
+ * We need to be the only thread interacting with the thread
+ * that manages signals for teardown synchronization.
+ */
+ pthread_mutex_lock(&timer_signal.lock);
+
+ /* Ensure we don't have any signal queued for this channel. */
+ for (;;) {
+ ret = sigemptyset(&pending_set);
+ if (ret == -1) {
+ PERROR("sigemptyset");
+ }
+ ret = sigpending(&pending_set);
+ if (ret == -1) {
+ PERROR("sigpending");
+ }
+ if (!sigismember(&pending_set, signr)) {
+ break;
+ }
+ caa_cpu_relax();
+ }
+
+ /*
+ * From this point, no new signal handler will be fired that would try to
+ * access "chan". However, we still need to wait for any currently
+ * executing handler to complete.
+ */
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 0);
+ cmm_smp_mb();
+
+ /*
+ * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
+ * up.
+ */
+ kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
+
+ while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
+ caa_cpu_relax();
+ }
+ cmm_smp_mb();
+
+ pthread_mutex_unlock(&timer_signal.lock);
+}
+
+/*
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
+ */
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+ struct lttng_consumer_channel *channel,
+ unsigned int timer_interval_us, int signal)
+{
+ int ret = 0, delete_ret;
+ struct sigevent sev = {};
+ struct itimerspec its;
+
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
+
+ if (timer_interval_us == 0) {
+ /* No creation needed; not an error. */
+ ret = 1;
+ goto end;
+ }
+
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = signal;
+ sev.sigev_value.sival_ptr = channel;
+ ret = timer_create(CLOCKID, &sev, timer_id);
+ if (ret == -1) {
+ PERROR("timer_create");
+ goto end;
+ }
+
+ its.it_value.tv_sec = timer_interval_us / 1000000;
+ its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
+ its.it_interval.tv_sec = its.it_value.tv_sec;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ ret = timer_settime(*timer_id, 0, &its, NULL);
+ if (ret == -1) {
+ PERROR("timer_settime");
+ goto error_destroy_timer;
+ }
+end:
+ return ret;
+error_destroy_timer:
+ delete_ret = timer_delete(*timer_id);
+ if (delete_ret == -1) {
+ PERROR("timer_delete");
+ }
+ goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+ int ret = 0;
+
+ ret = timer_delete(*timer_id);
+ if (ret == -1) {
+ PERROR("timer_delete");
+ goto end;
+ }
+
+ consumer_timer_signal_thread_qs(signal);
+ *timer_id = 0;
+end:
+ return ret;
+}
+
+/*
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+ unsigned int switch_timer_interval_us)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
+
+ ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+ switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+ channel->switch_timer_enabled = !!(ret == 0);
+}
+
+/*
+ * Stop and delete the channel's switch timer.
+ */
+void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+
+ ret = consumer_channel_timer_stop(&channel->switch_timer,
+ LTTNG_CONSUMER_SIG_SWITCH);
+ if (ret == -1) {
+ ERR("Failed to stop switch timer");
+ }
+
+ channel->switch_timer_enabled = 0;
+}
+
+/*
+ * Set the channel's live timer.
+ */
+void consumer_timer_live_start(struct lttng_consumer_channel *channel,
+ unsigned int live_timer_interval_us)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
+
+ ret = consumer_channel_timer_start(&channel->live_timer, channel,
+ live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
+
+ channel->live_timer_enabled = !!(ret == 0);
+}
+
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+
+ ret = consumer_channel_timer_stop(&channel->live_timer,
+ LTTNG_CONSUMER_SIG_LIVE);
+ if (ret == -1) {
+ ERR("Failed to stop live timer");
+ }
+
+ channel->live_timer_enabled = 0;
+}
+
+/*
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
+ */
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+ unsigned int monitor_timer_interval_us)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
+ LTTNG_ASSERT(!channel->monitor_timer_enabled);
+
+ ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+ monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+ channel->monitor_timer_enabled = !!(ret == 0);
+ return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->monitor_timer_enabled);
+
+ ret = consumer_channel_timer_stop(&channel->monitor_timer,
+ LTTNG_CONSUMER_SIG_MONITOR);
+ if (ret == -1) {
+ ERR("Failed to stop live timer");
+ goto end;
+ }
+
+ channel->monitor_timer_enabled = 0;
+end:
+ return ret;
+}
+
+/*
+ * Block the RT signals for the entire process. It must be called from the
+ * consumer main before creating the threads
+ */
+int consumer_signal_init(void)
+{
+ int ret;
+ sigset_t mask;
+
+ /* Block signal for entire process, so only our thread processes it. */
+ setmask(&mask);
+ ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_sigmask");
+ return -1;
+ }
+ return 0;
+}
+
+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
+ sample_positions_cb sample, get_consumed_cb get_consumed,
+ get_produced_cb get_produced)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool empty_channel = true;
+ uint64_t high = 0, low = UINT64_MAX;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+
+ *_total_consumed = 0;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key,
+ &iter.iter, stream, node_channel_id.node) {
+ unsigned long produced, consumed, usage;
+
+ empty_channel = false;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ ret = sample(stream);
+ if (ret) {
+ ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_consumed(stream, &consumed);
+ if (ret) {
+ ERR("Failed to get buffer consumed position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_produced(stream, &produced);
+ if (ret) {
+ ERR("Failed to get buffer produced position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+
+ usage = produced - consumed;
+ high = (usage > high) ? usage : high;
+ low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ *_highest_use = high;
+ *_lowest_use = low;
+end:
+ rcu_read_unlock();
+ if (empty_channel) {
+ ret = -1;
+ }
+ return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ int channel_monitor_pipe =
+ consumer_timer_thread_get_channel_monitor_pipe();
+ struct lttcomm_consumer_channel_monitor_msg msg = {
+ .key = channel->key,
+ };
+ sample_positions_cb sample;
+ get_consumed_cb get_consumed;
+ get_produced_cb get_produced;
+ uint64_t lowest = 0, highest = 0, total_consumed = 0;
+
+ LTTNG_ASSERT(channel);
+
+ if (channel_monitor_pipe < 0) {
+ return;
+ }
+
+ switch (the_consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ sample = lttng_kconsumer_sample_snapshot_positions;
+ get_consumed = lttng_kconsumer_get_consumed_snapshot;
+ get_produced = lttng_kconsumer_get_produced_snapshot;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ sample = lttng_ustconsumer_sample_snapshot_positions;
+ get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+ get_produced = lttng_ustconsumer_get_produced_snapshot;
+ break;
+ default:
+ abort();
+ }
+
+ ret = sample_channel_positions(channel, &highest, &lowest,
+ &total_consumed, sample, get_consumed, get_produced);
+ if (ret) {
+ return;
+ }
+ msg.highest = highest;
+ msg.lowest = lowest;
+ msg.total_consumed = total_consumed;
+
+ /*
+ * Writes performed here are assumed to be atomic which is only
+ * guaranteed for sizes < than PIPE_BUF.
+ */
+ LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
+
+ do {
+ ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ if (errno == EAGAIN) {
+ /* Not an error, the sample is merely dropped. */
+ DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
+ channel->key);
+ } else {
+ PERROR("write to the channel monitor pipe");
+ }
+ } else {
+ DBG("Sent channel monitoring sample for channel key %" PRIu64
+ ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
+ channel->key, msg.highest, msg.lowest);
+ }
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+ return uatomic_read(&the_channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+ int ret;
+
+ ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
+ if (ret != -1) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
+/*
+ * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
+ */
+void *consumer_timer_thread(void *data)
+{
+ int signr;
+ sigset_t mask;
+ siginfo_t info;
+ struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
+
+ rcu_register_thread();
+
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+
+ if (testpoint(consumerd_thread_metadata_timer)) {
+ goto error_testpoint;
+ }
+
+ health_code_update();
+
+ /* Only self thread will receive signal mask. */
+ setmask(&mask);
+ CMM_STORE_SHARED(timer_signal.tid, pthread_self());
+
+ while (1) {
+ health_code_update();
+
+ health_poll_entry();
+ signr = sigwaitinfo(&mask, &info);
+ health_poll_exit();
+
+ /*
+ * NOTE: cascading conditions are used instead of a switch case
+ * since the use of SIGRTMIN in the definition of the signals'
+ * values prevents the reduction to an integer constant.
+ */
+ if (signr == -1) {
+ if (errno != EINTR) {
+ PERROR("sigwaitinfo");
+ }
+ continue;
+ } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
+ metadata_switch_timer(ctx, &info);
+ } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 1);
+ cmm_smp_mb();
+ DBG("Signal timer metadata thread teardown");
+ } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
+ live_timer(ctx, &info);
+ } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+ struct lttng_consumer_channel *channel;
+
+ channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
+ monitor_timer(channel);
+ } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+ LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
+ goto end;
+ } else {
+ ERR("Unexpected signal %d\n", info.si_signo);
+ }
+ }
+
+error_testpoint:
+ /* Only reached in testpoint error */
+ health_error();
+end:
+ health_unregister(health_consumerd);
+ rcu_unregister_thread();
+ return NULL;
+}