Backported to glibc 2.8
[lttng-tools.git] / src / common / consumer-timer.c
index 641478697904e516df1feb7e5c214b814fd38fb0..c659bf63333a0de56fc568f99c167fb55e12e24c 100644 (file)
 #include <inttypes.h>
 #include <signal.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
+#include <common/compat/endian.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/consumer-stream.h>
 
 #include "consumer-timer.h"
+#include "consumer-testpoint.h"
 #include "ust-consumer/ust-consumer.h"
 
-static struct timer_signal_data timer_signal;
+static struct timer_signal_data timer_signal = {
+       .tid = 0,
+       .setup_done = 0,
+       .qs_done = 0,
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
 
 /*
  * Set custom signal mask to current thread.
@@ -41,16 +52,24 @@ static void setmask(sigset_t *mask)
        }
        ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
        if (ret) {
-               PERROR("sigaddset");
+               PERROR("sigaddset switch");
        }
        ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
        if (ret) {
-               PERROR("sigaddset");
+               PERROR("sigaddset teardown");
+       }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
+       if (ret) {
+               PERROR("sigaddset live");
        }
 }
 
 /*
  * Execute action on a timer switch.
+ *
+ * Beware: metadata_switch_timer() should *never* take a mutex also held
+ * while consumer_timer_switch_stop() is called. It would result in
+ * deadlocks.
  */
 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                int sig, siginfo_t *si, void *uc)
@@ -69,7 +88,21 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
        switch (ctx->type) {
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_request_metadata(ctx, channel);
+               /*
+                * Locks taken by lttng_ustconsumer_request_metadata():
+                * - metadata_socket_lock
+                *   - Calling lttng_ustconsumer_recv_metadata():
+                *     - channel->metadata_cache->lock
+                *     - Calling consumer_metadata_cache_flushed():
+                *       - channel->timer_lock
+                *         - channel->metadata_cache->lock
+                *
+                * Ensure that neither consumer_data.lock nor
+                * channel->lock are taken within this function, since
+                * they are held while consumer_timer_switch_stop() is
+                * called.
+                */
+               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
@@ -81,10 +114,223 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
        }
 }
 
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
+{
+       int ret;
+       struct ctf_packet_index index;
+
+       memset(&index, 0, sizeof(index));
+       index.timestamp_end = htobe64(ts);
+       ret = consumer_stream_write_index(stream, &index);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       return ret;
+}
+
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts;
+       int ret;
+
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        */
+       pthread_mutex_lock(&stream->lock);
+       ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
+       if (ret < 0) {
+               ERR("Failed to get the current timestamp");
+               goto error_unlock;
+       }
+       ret = kernctl_buffer_flush(stream->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto error_unlock;
+       }
+       ret = kernctl_snapshot(stream->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN && errno != ENODATA) {
+                       PERROR("live timer kernel snapshot");
+                       ret = -1;
+                       goto error_unlock;
+               }
+               DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+               ret = send_empty_index(stream, ts);
+               if (ret < 0) {
+                       goto error_unlock;
+               }
+       }
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
+}
+
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts;
+       int ret;
+
+       assert(stream);
+       assert(stream->ustream);
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        */
+       pthread_mutex_lock(&stream->lock);
+       ret = cds_lfht_is_node_deleted(&stream->node.node);
+       if (ret) {
+               goto error_unlock;
+       }
+
+       ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
+       if (ret < 0) {
+               ERR("Failed to get the current timestamp");
+               goto error_unlock;
+       }
+       lttng_ustconsumer_flush_buffer(stream, 1);
+       ret = lttng_ustconsumer_take_snapshot(stream);
+       if (ret < 0) {
+               if (ret != -EAGAIN) {
+                       ERR("Taking UST snapshot");
+                       ret = -1;
+                       goto error_unlock;
+               }
+               DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
+               ret = send_empty_index(stream, ts);
+               if (ret < 0) {
+                       goto error_unlock;
+               }
+       }
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
+}
+
+/*
+ * Execute action on a live timer
+ */
+static void live_timer(struct lttng_consumer_local_data *ctx,
+               int sig, siginfo_t *si, void *uc)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       channel = si->si_value.sival_ptr;
+       assert(channel);
+
+       if (channel->switch_timer_error) {
+               goto error;
+       }
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       DBG("Live timer for channel %" PRIu64, channel->key);
+
+       rcu_read_lock();
+       switch (ctx->type) {
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&channel->key, lttng_ht_seed),
+                               ht->match_fct, &channel->key, &iter.iter,
+                               stream, node_channel_id.node) {
+                       ret = check_ust_stream(stream);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER_KERNEL:
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&channel->key, lttng_ht_seed),
+                               ht->match_fct, &channel->key, &iter.iter,
+                               stream, node_channel_id.node) {
+                       ret = check_kernel_stream(stream);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER_UNKNOWN:
+               assert(0);
+               break;
+       }
+
+error_unlock:
+       rcu_read_unlock();
+
+error:
+       return;
+}
+
+static
+void consumer_timer_signal_thread_qs(unsigned int signr)
+{
+       sigset_t pending_set;
+       int ret;
+
+       /*
+        * We need to be the only thread interacting with the thread
+        * that manages signals for teardown synchronization.
+        */
+       pthread_mutex_lock(&timer_signal.lock);
+
+       /* Ensure we don't have any signal queued for this channel. */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+                       break;
+               }
+               caa_cpu_relax();
+       }
+
+       /*
+        * From this point, no new signal handler will be fired that would try to
+        * access "chan". However, we still need to wait for any currently
+        * executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
+        * up.
+        */
+       kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
+               caa_cpu_relax();
+       }
+       cmm_smp_mb();
+
+       pthread_mutex_unlock(&timer_signal.lock);
+}
+
 /*
  * Set the timer for periodical metadata flush.
- * Should be called only from the recv cmd thread (single thread ensures
- * mutual exclusion).
  */
 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
                unsigned int switch_timer_interval)
@@ -122,13 +368,10 @@ void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
 
 /*
  * Stop and delete timer.
- * Should be called only from the recv cmd thread (single thread ensures
- * mutual exclusion).
  */
 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 {
        int ret;
-       sigset_t pending_set;
 
        assert(channel);
 
@@ -137,41 +380,67 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
                PERROR("timer_delete");
        }
 
-       /* Ensure we don't have any signal queued for this channel. */
-       for (;;) {
-               ret = sigemptyset(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigemptyset");
-               }
-               ret = sigpending(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigpending");
-               }
-               if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
-                       break;
-               }
-               caa_cpu_relax();
+       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
+
+       channel->switch_timer = 0;
+       channel->switch_timer_enabled = 0;
+}
+
+/*
+ * Set the timer for the live mode.
+ */
+void consumer_timer_live_start(struct lttng_consumer_channel *channel,
+               int live_timer_interval)
+{
+       int ret;
+       struct sigevent sev;
+       struct itimerspec its;
+
+       assert(channel);
+       assert(channel->key);
+
+       if (live_timer_interval <= 0) {
+               return;
        }
 
-       /*
-        * From this point, no new signal handler will be fired that would try to
-        * access "chan". However, we still need to wait for any currently
-        * executing handler to complete.
-        */
-       cmm_smp_mb();
-       CMM_STORE_SHARED(timer_signal.qs_done, 0);
-       cmm_smp_mb();
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
+       sev.sigev_value.sival_ptr = channel;
+       ret = timer_create(CLOCKID, &sev, &channel->live_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
+       channel->live_timer_enabled = 1;
 
-       /*
-        * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
-        * up.
-        */
-       kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
+       its.it_value.tv_sec = live_timer_interval / 1000000;
+       its.it_value.tv_nsec = live_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
-               caa_cpu_relax();
+       ret = timer_settime(channel->live_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
        }
-       cmm_smp_mb();
+}
+
+/*
+ * Stop and delete timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       assert(channel);
+
+       ret = timer_delete(channel->live_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
+       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
+
+       channel->live_timer = 0;
+       channel->live_timer_enabled = 0;
 }
 
 /*
@@ -193,23 +462,34 @@ void consumer_signal_init(void)
 }
 
 /*
- * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH and
- * LTTNG_CONSUMER_SIG_TEARDOWN that are emitted by the periodic timer to check
- * if new metadata is available.
+ * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
+ * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
  */
-void *consumer_timer_metadata_thread(void *data)
+void *consumer_timer_thread(void *data)
 {
        int signr;
        sigset_t mask;
        siginfo_t info;
        struct lttng_consumer_local_data *ctx = data;
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+
+       if (testpoint(consumerd_thread_metadata_timer)) {
+               goto error_testpoint;
+       }
+
+       health_code_update();
+
        /* Only self thread will receive signal mask. */
        setmask(&mask);
        CMM_STORE_SHARED(timer_signal.tid, pthread_self());
 
        while (1) {
+               health_code_update();
+
+               health_poll_entry();
                signr = sigwaitinfo(&mask, &info);
+               health_poll_exit();
                if (signr == -1) {
                        if (errno != EINTR) {
                                PERROR("sigwaitinfo");
@@ -222,10 +502,18 @@ void *consumer_timer_metadata_thread(void *data)
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
                        cmm_smp_mb();
                        DBG("Signal timer metadata thread teardown");
+               } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
+                       live_timer(ctx, info.si_signo, &info, NULL);
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
        }
 
+error_testpoint:
+       /* Only reached in testpoint error */
+       health_error();
+       health_unregister(health_consumerd);
+
+       /* Never return */
        return NULL;
 }
This page took 0.028597 seconds and 4 git commands to generate.