Implement consumer ring buffer position sampling
[lttng-tools.git] / src / common / consumer / consumer-timer.c
index b0a434284a87d888f631b3fc8f0d435fc4c1d888..09cf3629331d344e54c63a6d3b2a467c527cb648 100644 (file)
@@ -21,6 +21,7 @@
 #include <inttypes.h>
 #include <signal.h>
 
+#include <lttng/ust-ctl.h>
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/compat/endian.h>
 #include <common/consumer/consumer-testpoint.h>
 #include <common/ust-consumer/ust-consumer.h>
 
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *produced);
+
 static struct timer_signal_data timer_signal = {
        .tid = 0,
        .setup_done = 0,
@@ -61,8 +68,14 @@ static void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset live");
        }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+       if (ret) {
+               PERROR("sigaddset monitor");
+       }
 }
 
+static int channel_monitor_pipe = -1;
+
 /*
  * Execute action on a timer switch.
  *
@@ -71,7 +84,7 @@ static void setmask(sigset_t *mask)
  * deadlocks.
  */
 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               int sig, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -304,7 +317,7 @@ end:
  * Execute action on a live timer
  */
 static void live_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               int sig, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -411,44 +424,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
 }
 
 /*
- * Set the timer for periodical metadata flush.
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
-               unsigned int switch_timer_interval)
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+               struct lttng_consumer_channel *channel,
+               unsigned int timer_interval_us, int signal)
 {
-       int ret;
+       int ret = 0, delete_ret;
        struct sigevent sev;
        struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (switch_timer_interval == 0) {
-               return;
+       if (timer_interval_us == 0) {
+               /* No creation needed; not an error. */
+               ret = 1;
+               goto end;
        }
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
+       sev.sigev_signo = signal;
        sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
+       ret = timer_create(CLOCKID, &sev, timer_id);
        if (ret == -1) {
                PERROR("timer_create");
+               goto end;
        }
-       channel->switch_timer_enabled = 1;
 
-       its.it_value.tv_sec = switch_timer_interval / 1000000;
-       its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
+       its.it_value.tv_sec = timer_interval_us / 1000000;
+       its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
        its.it_interval.tv_sec = its.it_value.tv_sec;
        its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       ret = timer_settime(channel->switch_timer, 0, &its, NULL);
+       ret = timer_settime(*timer_id, 0, &its, NULL);
        if (ret == -1) {
                PERROR("timer_settime");
+               goto error_destroy_timer;
+       }
+end:
+       return ret;
+error_destroy_timer:
+       delete_ret = timer_delete(*timer_id);
+       if (delete_ret == -1) {
+               PERROR("timer_delete");
+       }
+       goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+       int ret = 0;
+
+       ret = timer_delete(*timer_id);
+       if (ret == -1) {
+               PERROR("timer_delete");
+               goto end;
        }
+
+       consumer_timer_signal_thread_qs(signal);
+       *timer_id = 0;
+end:
+       return ret;
+}
+
+/*
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+               unsigned int switch_timer_interval_us)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->key);
+
+       ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+                       switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+       channel->switch_timer_enabled = !!(ret == 0);
 }
 
 /*
- * Stop and delete timer.
+ * Stop and delete the channel's switch timer.
  */
 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 {
@@ -456,72 +520,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 
        assert(channel);
 
-       ret = timer_delete(channel->switch_timer);
+       ret = consumer_channel_timer_stop(&channel->switch_timer,
+                       LTTNG_CONSUMER_SIG_SWITCH);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop switch timer");
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
-
-       channel->switch_timer = 0;
        channel->switch_timer_enabled = 0;
 }
 
 /*
- * Set the timer for the live mode.
+ * Set the channel's live timer.
  */
 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
-               int live_timer_interval)
+               unsigned int live_timer_interval_us)
 {
        int ret;
-       struct sigevent sev;
-       struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (live_timer_interval <= 0) {
-               return;
-       }
+       ret = consumer_channel_timer_start(&channel->live_timer, channel,
+                       live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
 
-       sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
-       sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->live_timer);
-       if (ret == -1) {
-               PERROR("timer_create");
-       }
-       channel->live_timer_enabled = 1;
+       channel->live_timer_enabled = !!(ret == 0);
+}
 
-       its.it_value.tv_sec = live_timer_interval / 1000000;
-       its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
-       its.it_interval.tv_sec = its.it_value.tv_sec;
-       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       assert(channel);
 
-       ret = timer_settime(channel->live_timer, 0, &its, NULL);
+       ret = consumer_channel_timer_stop(&channel->live_timer,
+                       LTTNG_CONSUMER_SIG_LIVE);
        if (ret == -1) {
-               PERROR("timer_settime");
+               ERR("Failed to stop live timer");
        }
+
+       channel->live_timer_enabled = 0;
 }
 
 /*
- * Stop and delete timer.
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+               unsigned int monitor_timer_interval_us)
 {
        int ret;
 
        assert(channel);
+       assert(channel->key);
+       assert(!channel->monitor_timer_enabled);
 
-       ret = timer_delete(channel->live_timer);
+       ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+                       monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+       channel->monitor_timer_enabled = !!(ret == 0);
+       return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->monitor_timer_enabled);
+
+       ret = consumer_channel_timer_stop(&channel->monitor_timer,
+                       LTTNG_CONSUMER_SIG_MONITOR);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop live timer");
+               goto end;
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
-       channel->live_timer = 0;
-       channel->live_timer_enabled = 0;
+       channel->monitor_timer_enabled = 0;
+end:
+       return ret;
 }
 
 /*
@@ -544,9 +627,165 @@ int consumer_signal_init(void)
        return 0;
 }
 
+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+               uint64_t *_highest_use, uint64_t *_lowest_use,
+               sample_positions_cb sample, get_consumed_cb get_consumed,
+               get_produced_cb get_produced)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool empty_channel = true;
+       uint64_t high = 0, low = UINT64_MAX;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               unsigned long produced, consumed, usage;
+
+               empty_channel = false;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               ret = sample(stream);
+               if (ret) {
+                       ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = get_consumed(stream, &consumed);
+               if (ret) {
+                       ERR("Failed to get buffer consumed position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = get_produced(stream, &produced);
+               if (ret) {
+                       ERR("Failed to get buffer produced position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+
+               usage = produced - consumed;
+               high = (usage > high) ? usage : high;
+               low = (usage < low) ? usage : low;
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       *_highest_use = high;
+       *_lowest_use = low;
+end:
+       rcu_read_unlock();
+       if (empty_channel) {
+               ret = -1;
+       }
+       return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *channel)
+{
+       int ret;
+       int channel_monitor_pipe =
+                       consumer_timer_thread_get_channel_monitor_pipe();
+       struct lttcomm_consumer_channel_monitor_msg msg = {
+               .key = channel->key,
+       };
+       sample_positions_cb sample;
+       get_consumed_cb get_consumed;
+       get_produced_cb get_produced;
+
+       assert(channel);
+       pthread_mutex_lock(&consumer_data.lock);
+
+       if (channel_monitor_pipe < 0) {
+               goto end;
+       }
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               sample = lttng_kconsumer_sample_snapshot_positions;
+               get_consumed = lttng_kconsumer_get_consumed_snapshot;
+               get_produced = lttng_kconsumer_get_produced_snapshot;
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               sample = lttng_ustconsumer_sample_snapshot_positions;
+               get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+               get_produced = lttng_ustconsumer_get_produced_snapshot;
+               break;
+       default:
+               abort();
+       }
+
+       ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
+                       sample, get_consumed, get_produced);
+       if (ret) {
+               goto end;
+       }
+
+       /*
+        * Writes performed here are assumed to be atomic which is only
+        * guaranteed for sizes < than PIPE_BUF.
+        */
+       assert(sizeof(msg) <= PIPE_BUF);
+
+       do {
+               ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               if (errno == EAGAIN) {
+                       /* Not an error, the sample is merely dropped. */
+                       DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+                                       channel->key);
+               } else {
+                       PERROR("write to the channel monitor pipe");
+               }
+       } else {
+               DBG("Sent channel monitoring sample for channel key %" PRIu64
+                               ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+                               channel->key, msg.highest, msg.lowest);
+       }
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+       return uatomic_read(&channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+       int ret;
+
+       ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+       if (ret != -1) {
+               ret = -1;
+               goto end;
+       }
+       ret = 0;
+end:
+       return ret;
+}
+
 /*
  * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR.
  */
 void *consumer_timer_thread(void *data)
 {
@@ -575,20 +814,31 @@ void *consumer_timer_thread(void *data)
                health_poll_entry();
                signr = sigwaitinfo(&mask, &info);
                health_poll_exit();
+
+               /*
+                * NOTE: cascading conditions are used instead of a switch case
+                * since the use of SIGRTMIN in the definition of the signals'
+                * values prevents the reduction to an integer constant.
+                */
                if (signr == -1) {
                        if (errno != EINTR) {
                                PERROR("sigwaitinfo");
                        }
                        continue;
                } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
-                       metadata_switch_timer(ctx, info.si_signo, &info, NULL);
+                       metadata_switch_timer(ctx, info.si_signo, &info);
                } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
                        cmm_smp_mb();
                        DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
-                       live_timer(ctx, info.si_signo, &info, NULL);
+                       live_timer(ctx, info.si_signo, &info);
+               } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+                       struct lttng_consumer_channel *channel;
+
+                       channel = info.si_value.sival_ptr;
+                       monitor_timer(ctx, channel);
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
This page took 0.040742 seconds and 4 git commands to generate.