Implement read timer (for RT)
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index d0e9466810cc0cb45b7ef51e40a67c08e367cf33..6504f0e64cee59a07344703d8c402a96c375f5b1 100644 (file)
@@ -80,8 +80,9 @@
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
 
-#define LTTNG_UST_RB_SIG               SIGRTMIN
-#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_FLUSH         SIGRTMIN
+#define LTTNG_UST_RB_SIG_READ          SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 2
 #define CLOCKID                CLOCK_MONOTONIC
 
 /*
@@ -282,7 +283,7 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
        handle = chan->handle;
        config = &chan->backend.config;
 
-       DBG("Timer for channel %p\n", chan);
+       DBG("Switch timer for channel %p\n", chan);
 
        /*
         * Only flush buffers periodically if readers are active.
@@ -300,14 +301,64 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
                struct lttng_ust_lib_ring_buffer *buf =
                        shmp(handle, chan->backend.buf[0].shmp);
 
-                       if (uatomic_read(&buf->active_readers))
-                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                                       chan->handle);
+               if (uatomic_read(&buf->active_readers))
+                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                               chan->handle);
        }
        pthread_mutex_unlock(&wakeup_fd_mutex);
        return;
 }
 
+static
+void lib_ring_buffer_channel_do_read(struct channel *chan)
+{
+       const struct lttng_ust_lib_ring_buffer_config *config;
+       struct lttng_ust_shm_handle *handle;
+       int cpu;
+
+       handle = chan->handle;
+       config = &chan->backend.config;
+
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               for_each_possible_cpu(cpu) {
+                       struct lttng_ust_lib_ring_buffer *buf =
+                               shmp(handle, chan->backend.buf[cpu].shmp);
+
+                       if (uatomic_read(&buf->active_readers)
+                           && lib_ring_buffer_poll_deliver(config, buf,
+                                       chan, handle)) {
+                               lib_ring_buffer_wakeup(buf, handle);
+                       }
+               }
+       } else {
+               struct lttng_ust_lib_ring_buffer *buf =
+                       shmp(handle, chan->backend.buf[0].shmp);
+
+               if (uatomic_read(&buf->active_readers)
+                   && lib_ring_buffer_poll_deliver(config, buf,
+                               chan, handle)) {
+                       lib_ring_buffer_wakeup(buf, handle);
+               }
+       }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+}
+
+static
+void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+{
+       struct channel *chan;
+
+       assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+       chan = si->si_value.sival_ptr;
+       DBG("Read timer for channel %p\n", chan);
+       lib_ring_buffer_channel_do_read(chan);
+       return;
+}
+
 static
 void rb_setmask(sigset_t *mask)
 {
@@ -317,7 +368,11 @@ void rb_setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigemptyset");
        }
-       ret = sigaddset(mask, LTTNG_UST_RB_SIG);
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
        if (ret) {
                PERROR("sigaddset");
        }
@@ -341,12 +396,16 @@ void *sig_thread(void *arg)
        for (;;) {
                signr = sigwaitinfo(&mask, &info);
                if (signr == -1) {
-                       PERROR("sigwaitinfo");
+                       if (errno != EINTR)
+                               PERROR("sigwaitinfo");
                        continue;
                }
-               if (signr == LTTNG_UST_RB_SIG) {
+               if (signr == LTTNG_UST_RB_SIG_FLUSH) {
                        lib_ring_buffer_channel_switch_timer(info.si_signo,
                                        &info, NULL);
+               } else if (signr == LTTNG_UST_RB_SIG_READ) {
+                       lib_ring_buffer_channel_read_timer(info.si_signo,
+                                       &info, NULL);
                } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
@@ -402,7 +461,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        lib_ring_buffer_setup_timer_thread();
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_UST_RB_SIG;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
        ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
        if (ret == -1) {
@@ -427,7 +486,7 @@ static
 void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
 {
        sigset_t pending_set;
-       int sig_is_pending, ret;
+       int ret;
 
        if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
                return;
@@ -449,8 +508,7 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
                if (ret == -1) {
                        PERROR("sigpending");
                }
-               sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG);
-               if (!sig_is_pending)
+               if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH))
                        break;
                caa_cpu_relax();
        }
@@ -478,82 +536,108 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
        chan->switch_timer_enabled = 0;
 }
 
-#if 0
 /*
- * Polling timer to check the channels for data.
+ * Called with ust_lock() held.
  */
-static void read_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
 {
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       struct sigevent sev;
+       struct itimerspec its;
+       int ret;
 
-       CHAN_WARN_ON(chan, !buf->backend.allocated);
+       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
+                       || !chan->read_timer_interval || chan->read_timer_enabled)
+               return;
 
-       if (uatomic_read(&buf->active_readers))
-           && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
+       chan->read_timer_enabled = 1;
 
-       //TODO
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->read_timer,
-       //                       jiffies + chan->read_timer_interval);
-       //else
-       //      mod_timer(&buf->read_timer,
-       //                jiffies + chan->read_timer_interval);
-}
-#endif //0
+       lib_ring_buffer_setup_timer_thread();
 
-static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
-{
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
+       sev.sigev_value.sival_ptr = chan;
+       ret = timer_create(CLOCKID, &sev, &chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
 
-       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || buf->read_timer_enabled)
-               return;
+       its.it_value.tv_sec = chan->read_timer_interval / 1000000;
+       its.it_value.tv_nsec = chan->read_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       //TODO
-       //init_timer(&buf->read_timer);
-       //buf->read_timer.function = read_buffer_timer;
-       //buf->read_timer.expires = jiffies + chan->read_timer_interval;
-       //buf->read_timer.data = (unsigned long)buf;
-
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      add_timer_on(&buf->read_timer, buf->backend.cpu);
-       //else
-       //      add_timer(&buf->read_timer);
-       buf->read_timer_enabled = 1;
+       ret = timer_settime(chan->read_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+       }
 }
 
-static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sigset_t pending_set;
+       int ret;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || !buf->read_timer_enabled)
+                       || !chan->read_timer_interval || !chan->read_timer_enabled)
                return;
 
-       //TODO
-       //del_timer_sync(&buf->read_timer);
+       ret = timer_delete(chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
        /*
         * do one more check to catch data that has been written in the last
         * timer period.
         */
-       if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
+       lib_ring_buffer_channel_do_read(chan);
+
+
+       /*
+        * Ensure we don't have any signal queued for this channel.
+        */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ))
+                       break;
+               caa_cpu_relax();
        }
-       buf->read_timer_enabled = 0;
+
+       /*
+        * From this point, no new signal handler will be fired that
+        * would try to access "chan". However, we still need to wait
+        * for any currently executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+        * thread wakes up.
+        */
+       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+               caa_cpu_relax();
+       cmm_smp_mb();
+
+       chan->read_timer = 0;
+       chan->read_timer_enabled = 0;
 }
 
 static void channel_unregister_notifiers(struct channel *chan,
@@ -563,18 +647,7 @@ static void channel_unregister_notifiers(struct channel *chan,
        int cpu;
 
        lib_ring_buffer_channel_switch_timer_stop(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
-                       lib_ring_buffer_stop_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
-               lib_ring_buffer_stop_read_timer(buf, handle);
-       }
-       //channel_backend_unregister_notifiers(&chan->backend);
+       lib_ring_buffer_channel_read_timer_stop(chan);
 }
 
 static void channel_print_errors(struct channel *chan,
@@ -708,29 +781,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        chan->handle = handle;
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
-       chan->switch_timer_interval = switch_timer_interval;
-
-       //TODO
-       //chan->read_timer_interval = read_timer_interval;
-       //init_waitqueue_head(&chan->read_wait);
-       //init_waitqueue_head(&chan->hp_wait);
 
+       chan->switch_timer_interval = switch_timer_interval;
+       chan->read_timer_interval = read_timer_interval;
        lib_ring_buffer_channel_switch_timer_start(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-                       lib_ring_buffer_start_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
+       lib_ring_buffer_channel_read_timer_start(chan);
 
-               lib_ring_buffer_start_read_timer(buf, handle);
-       }
        return handle;
 
 error_backend_init:
This page took 0.027367 seconds and 4 git commands to generate.