Rename struct lib_ring_buffer_ctx to struct lttng_kernel_ring_buffer_ctx
[lttng-modules.git] / src / lib / ringbuffer / ring_buffer_frontend.c
index fca37fbc3a5b143a08cb55b94f762bf664fb7187..cd8d0aaa0b6fdf1708da881b1ee6c6de36e0c083 100644 (file)
@@ -133,6 +133,8 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf)
 {
        struct channel *chan = buf->backend.chan;
 
+       irq_work_sync(&buf->wakeup_pending);
+
        lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
        lttng_kvfree(buf->commit_hot);
        lttng_kvfree(buf->commit_cold);
@@ -206,6 +208,19 @@ void channel_reset(struct channel *chan)
 }
 EXPORT_SYMBOL_GPL(channel_reset);
 
+static void lib_ring_buffer_pending_wakeup_buf(struct irq_work *entry)
+{
+       struct lib_ring_buffer *buf = container_of(entry, struct lib_ring_buffer,
+                                                  wakeup_pending);
+       wake_up_interruptible(&buf->read_wait);
+}
+
+static void lib_ring_buffer_pending_wakeup_chan(struct irq_work *entry)
+{
+       struct channel *chan = container_of(entry, struct channel, wakeup_pending);
+       wake_up_interruptible(&chan->read_wait);
+}
+
 /*
  * Must be called under cpu hotplug protection.
  */
@@ -268,6 +283,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
 
        init_waitqueue_head(&buf->read_wait);
        init_waitqueue_head(&buf->write_wait);
+       init_irq_work(&buf->wakeup_pending, lib_ring_buffer_pending_wakeup_buf);
        raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
 
        /*
@@ -453,7 +469,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
        buf->read_timer_enabled = 0;
 }
 
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 
 enum cpuhp_state lttng_rb_hp_prepare;
 enum cpuhp_state lttng_rb_hp_online;
@@ -524,7 +540,7 @@ int lttng_cpuhp_rb_frontend_offline(unsigned int cpu,
 }
 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
 
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #ifdef CONFIG_HOTPLUG_CPU
 
@@ -586,7 +602,7 @@ int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
 
 #endif
 
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
 /*
@@ -692,7 +708,7 @@ static void channel_unregister_notifiers(struct channel *chan)
                 * concurrency.
                 */
 #endif /* CONFIG_NO_HZ */
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
                {
                        int ret;
 
@@ -703,7 +719,7 @@ static void channel_unregister_notifiers(struct channel *chan)
                                &chan->cpuhp_prepare.node);
                        WARN_ON(ret);
                }
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
                {
                        int cpu;
 
@@ -727,7 +743,7 @@ static void channel_unregister_notifiers(struct channel *chan)
                        }
 #endif
                }
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
        } else {
                struct lib_ring_buffer *buf = chan->backend.buf;
 
@@ -854,9 +870,10 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
        kref_init(&chan->ref);
        init_waitqueue_head(&chan->read_wait);
        init_waitqueue_head(&chan->hp_wait);
+       init_irq_work(&chan->wakeup_pending, lib_ring_buffer_pending_wakeup_chan);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
                chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
                ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
                        &chan->cpuhp_prepare.node);
@@ -868,7 +885,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
                        &chan->cpuhp_online.node);
                if (ret)
                        goto cpuhp_online_error;
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
                {
                        int cpu;
                        /*
@@ -904,7 +921,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
                        }
 #endif
                }
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
                /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
@@ -924,13 +941,13 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 
        return chan;
 
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+#if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
 cpuhp_online_error:
        ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
                        &chan->cpuhp_prepare.node);
        WARN_ON(ret);
 cpuhp_prepare_error:
-#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+#endif /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
 error_free_backend:
        channel_backend_free(&chan->backend);
 error:
@@ -963,6 +980,8 @@ void *channel_destroy(struct channel *chan)
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        void *priv;
 
+       irq_work_sync(&chan->wakeup_pending);
+
        channel_unregister_notifiers(chan);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
@@ -1074,7 +1093,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
        int finalized;
 
 retry:
-       finalized = READ_ONCE(buf->finalized);
+       finalized = LTTNG_READ_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
@@ -1245,7 +1264,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
                return -EBUSY;
        }
 retry:
-       finalized = READ_ONCE(buf->finalized);
+       finalized = LTTNG_READ_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
@@ -1455,7 +1474,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
 
        if (subbuf_offset(commit_count, chan) != 0)
                printk(KERN_WARNING
-                      "ring buffer %s, cpu %d: "
+                      "LTTng: ring buffer %s, cpu %d: "
                       "commit count in subbuffer %lu,\n"
                       "expecting multiples of %lu bytes\n"
                       "  [ %lu bytes committed, %lu bytes reader-visible ]\n",
@@ -1463,7 +1482,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
                       chan->backend.subbuf_size,
                       commit_count, commit_count_sb);
 
-       printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
+       printk(KERN_DEBUG "LTTng: ring buffer: %s, cpu %d: %lu bytes committed\n",
               chan->backend.name, cpu, commit_count);
 }
 
@@ -1484,7 +1503,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
        cons_offset = atomic_long_read(&buf->consumed);
        if (write_offset != cons_offset)
                printk(KERN_DEBUG
-                      "ring buffer %s, cpu %d: "
+                      "LTTng: ring buffer %s, cpu %d: "
                       "non-consumed data\n"
                       "  [ %lu bytes written, %lu bytes read ]\n",
                       chan->backend.name, cpu, write_offset, cons_offset);
@@ -1507,13 +1526,13 @@ void lib_ring_buffer_print_records_count(struct channel *chan,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
 
        if (!strcmp(chan->backend.name, "relay-metadata")) {
-               printk(KERN_DEBUG "ring buffer %s: %lu records written, "
+               printk(KERN_DEBUG "LTTng: ring buffer %s: %lu records written, "
                        "%lu records overrun\n",
                        chan->backend.name,
                        v_read(config, &buf->records_count),
                        v_read(config, &buf->records_overrun));
        } else {
-               printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
+               printk(KERN_DEBUG "LTTng: ring buffer %s, cpu %d: %lu records written, "
                        "%lu records overrun\n",
                        chan->backend.name, cpu,
                        v_read(config, &buf->records_count),
@@ -1542,7 +1561,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
                    || v_read(config, &buf->records_lost_wrap)
                    || v_read(config, &buf->records_lost_big))
                        printk(KERN_WARNING
-                               "ring buffer %s, cpu %d: records were lost. Caused by:\n"
+                               "LTTng: ring buffer %s, cpu %d: records were lost. Caused by:\n"
                                "  [ %lu buffer full, %lu nest buffer wrap-around, "
                                "%lu event too big ]\n",
                                chan->backend.name, cpu,
@@ -1978,7 +1997,7 @@ static
 int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct channel *chan,
                                     struct switch_offsets *offsets,
-                                    struct lib_ring_buffer_ctx *ctx,
+                                    struct lttng_kernel_ring_buffer_ctx *ctx,
                                     void *client_ctx)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
@@ -1992,14 +2011,14 @@ retry:
        offsets->switch_old_end = 0;
        offsets->pre_header_padding = 0;
 
-       ctx->tsc = config->cb.ring_buffer_clock_read(chan);
-       if ((int64_t) ctx->tsc == -EIO)
+       ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan);
+       if ((int64_t) ctx->priv.tsc == -EIO)
                return -EIO;
 
-       if (last_tsc_overflow(config, buf, ctx->tsc))
-               ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+       if (last_tsc_overflow(config, buf, ctx->priv.tsc))
+               ctx->priv.rflags |= RING_BUFFER_RFLAG_FULL_TSC;
 
-       if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
+       if (unlikely(subbuf_offset(offsets->begin, ctx->priv.chan) == 0)) {
                offsets->switch_new_start = 1;          /* For offsets->begin */
        } else {
                offsets->size = config->cb.record_header_size(config, chan,
@@ -2153,16 +2172,16 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big);
  * -EIO for other errors, else returns 0.
  * It will take care of sub-buffer switching.
  */
-int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx,
+int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx,
                void *client_ctx)
 {
-       struct channel *chan = ctx->chan;
+       struct channel *chan = ctx->priv.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;
        struct switch_offsets offsets;
        int ret;
 
-       ctx->buf = buf = get_current_buf(chan, ctx->cpu);
+       ctx->priv.buf = buf = get_current_buf(chan, ctx->priv.reserve_cpu);
        offsets.size = 0;
 
        do {
@@ -2180,7 +2199,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx,
         * records, never the opposite (missing a full TSC record when it would
         * be needed).
         */
-       save_last_tsc(config, buf, ctx->tsc);
+       save_last_tsc(config, buf, ctx->priv.tsc);
 
        /*
         * Push the reader if necessary
@@ -2199,21 +2218,21 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx,
        if (unlikely(offsets.switch_old_end)) {
                lib_ring_buffer_clear_noref(config, &buf->backend,
                                            subbuf_index(offsets.old - 1, chan));
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc);
 
        if (unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc);
 
-       ctx->slot_size = offsets.size;
-       ctx->pre_offset = offsets.begin;
-       ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
+       ctx->priv.slot_size = offsets.size;
+       ctx->priv.pre_offset = offsets.begin;
+       ctx->priv.buf_offset = offsets.begin + offsets.pre_header_padding;
        return 0;
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
@@ -2356,13 +2375,14 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con
                                                 commit_count, idx);
 
                /*
-                * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
+                * RING_BUFFER_WAKEUP_BY_WRITER uses an irq_work to issue
+                * the wakeups.
                 */
                if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
                    && atomic_long_read(&buf->active_readers)
                    && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-                       wake_up_interruptible(&buf->read_wait);
-                       wake_up_interruptible(&chan->read_wait);
+                       irq_work_queue(&buf->wakeup_pending);
+                       irq_work_queue(&chan->wakeup_pending);
                }
 
        }
This page took 0.02978 seconds and 4 git commands to generate.