X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Flib%2Fringbuffer%2Fring_buffer_frontend.c;h=fbf3a16837c8574ebf6b90a2986d804626ba2d97;hb=18fd64422b4297f9beea82f1c1a97b80791312e4;hp=8d1983573e4d7cf429f37815074a543f9231d0e1;hpb=860c213b645593fa19d7a3abf7ffdd1282f0a1c6;p=lttng-modules.git diff --git a/src/lib/ringbuffer/ring_buffer_frontend.c b/src/lib/ringbuffer/ring_buffer_frontend.c index 8d198357..fbf3a168 100644 --- a/src/lib/ringbuffer/ring_buffer_frontend.c +++ b/src/lib/ringbuffer/ring_buffer_frontend.c @@ -37,9 +37,11 @@ * - put_subbuf */ +#include #include #include #include +#include #include #include @@ -47,9 +49,8 @@ #include #include #include -#include +#include #include -#include #include #include @@ -80,14 +81,14 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); static void lib_ring_buffer_print_errors(struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf, int cpu); + struct lttng_kernel_ring_buffer *buf, int cpu); static -void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, +void _lib_ring_buffer_switch_remote(struct lttng_kernel_ring_buffer *buf, enum switch_mode mode); static -int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +int lib_ring_buffer_poll_deliver(const struct lttng_kernel_ring_buffer_config *config, + struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan) { unsigned long consumed_old, consumed_idx, commit_count, write_offset; @@ -129,7 +130,7 @@ int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, /* * Must be called under cpu hotplug protection. */ -void lib_ring_buffer_free(struct lib_ring_buffer *buf) +void lib_ring_buffer_free(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; @@ -152,10 +153,10 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) * should not be using the iterator concurrently with reset. The previous * current iterator record is reset. */ -void lib_ring_buffer_reset(struct lib_ring_buffer *buf) +void lib_ring_buffer_reset(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned int i; /* @@ -210,7 +211,7 @@ EXPORT_SYMBOL_GPL(channel_reset); static void lib_ring_buffer_pending_wakeup_buf(struct irq_work *entry) { - struct lib_ring_buffer *buf = container_of(entry, struct lib_ring_buffer, + struct lttng_kernel_ring_buffer *buf = container_of(entry, struct lttng_kernel_ring_buffer, wakeup_pending); wake_up_interruptible(&buf->read_wait); } @@ -224,10 +225,10 @@ static void lib_ring_buffer_pending_wakeup_chan(struct irq_work *entry) /* * Must be called under cpu hotplug protection. */ -int lib_ring_buffer_create(struct lib_ring_buffer *buf, +int lib_ring_buffer_create(struct lttng_kernel_ring_buffer *buf, struct channel_backend *chanb, int cpu) { - const struct lib_ring_buffer_config *config = &chanb->config; + const struct lttng_kernel_ring_buffer_config *config = &chanb->config; struct lttng_kernel_ring_buffer_channel *chan = container_of(chanb, struct lttng_kernel_ring_buffer_channel, backend); void *priv = chanb->priv; size_t subbuf_header_size; @@ -333,9 +334,9 @@ free_chanbuf: static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer); + struct lttng_kernel_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer); struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; /* * Only flush buffers periodically if readers are active. @@ -354,10 +355,10 @@ static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) /* * Called with ring_buffer_nohz_lock held for per-cpu buffers. */ -static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_start_switch_timer(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned int flags = 0; if (!chan->switch_timer_interval || buf->switch_timer_enabled) @@ -380,7 +381,7 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) /* * Called with ring_buffer_nohz_lock held for per-cpu buffers. */ -static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_stop_switch_timer(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; @@ -396,9 +397,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) */ static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer); + struct lttng_kernel_ring_buffer *buf = lttng_from_timer(buf, t, read_timer); struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); @@ -419,10 +420,10 @@ static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) /* * Called with ring_buffer_nohz_lock held for per-cpu buffers. */ -static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_start_read_timer(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned int flags = 0; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -447,10 +448,10 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) /* * Called with ring_buffer_nohz_lock held for per-cpu buffers. */ -static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_stop_read_timer(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -491,8 +492,8 @@ int lttng_cpuhp_rb_frontend_dead(unsigned int cpu, { struct lttng_kernel_ring_buffer_channel *chan = container_of(node, struct lttng_kernel_ring_buffer_channel, cpuhp_prepare); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); @@ -512,8 +513,8 @@ int lttng_cpuhp_rb_frontend_online(unsigned int cpu, { struct lttng_kernel_ring_buffer_channel *chan = container_of(node, struct lttng_kernel_ring_buffer_channel, cpuhp_online); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); @@ -529,8 +530,8 @@ int lttng_cpuhp_rb_frontend_offline(unsigned int cpu, { struct lttng_kernel_ring_buffer_channel *chan = container_of(node, struct lttng_kernel_ring_buffer_channel, cpuhp_online); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); @@ -560,8 +561,8 @@ int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, unsigned int cpu = (unsigned long)hcpu; struct lttng_kernel_ring_buffer_channel *chan = container_of(nb, struct lttng_kernel_ring_buffer_channel, cpu_hp_notifier); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (!chan->cpu_hp_enable) return NOTIFY_DONE; @@ -619,8 +620,8 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, { struct lttng_kernel_ring_buffer_channel *chan = container_of(nb, struct lttng_kernel_ring_buffer_channel, tick_nohz_notifier); - const struct lib_ring_buffer_config *config = &chan->backend.config; - struct lib_ring_buffer *buf; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf; int cpu = smp_processor_id(); if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) { @@ -651,16 +652,16 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, raw_spin_unlock(&buf->raw_tick_nohz_spinlock); break; case TICK_NOHZ_STOP: - spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); - spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock)); break; case TICK_NOHZ_RESTART: - spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_start_read_timer(buf); lib_ring_buffer_start_switch_timer(buf); - spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock)); break; } @@ -691,7 +692,7 @@ void notrace lib_ring_buffer_tick_nohz_restart(void) */ static void channel_unregister_notifiers(struct lttng_kernel_ring_buffer_channel *chan) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; channel_iterator_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { @@ -724,19 +725,19 @@ static void channel_unregister_notifiers(struct lttng_kernel_ring_buffer_channel int cpu; #ifdef CONFIG_HOTPLUG_CPU - get_online_cpus(); + lttng_cpus_read_lock(); chan->cpu_hp_enable = 0; for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); } - put_online_cpus(); + lttng_cpus_read_unlock(); unregister_cpu_notifier(&chan->cpu_hp_notifier); #else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); @@ -745,7 +746,7 @@ static void channel_unregister_notifiers(struct lttng_kernel_ring_buffer_channel } #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */ } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lttng_kernel_ring_buffer *buf = chan->backend.buf; lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); @@ -753,7 +754,7 @@ static void channel_unregister_notifiers(struct lttng_kernel_ring_buffer_channel channel_backend_unregister_notifiers(&chan->backend); } -static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) +static void lib_ring_buffer_set_quiescent(struct lttng_kernel_ring_buffer *buf) { if (!buf->quiescent) { buf->quiescent = true; @@ -761,7 +762,7 @@ static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) } } -static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) +static void lib_ring_buffer_clear_quiescent(struct lttng_kernel_ring_buffer *buf) { buf->quiescent = false; } @@ -769,19 +770,19 @@ static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) void lib_ring_buffer_set_quiescent_channel(struct lttng_kernel_ring_buffer_channel *chan) { int cpu; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - get_online_cpus(); + lttng_cpus_read_lock(); for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); lib_ring_buffer_set_quiescent(buf); } - put_online_cpus(); + lttng_cpus_read_unlock(); } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lttng_kernel_ring_buffer *buf = chan->backend.buf; lib_ring_buffer_set_quiescent(buf); } @@ -791,19 +792,19 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel); void lib_ring_buffer_clear_quiescent_channel(struct lttng_kernel_ring_buffer_channel *chan) { int cpu; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - get_online_cpus(); + lttng_cpus_read_lock(); for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); lib_ring_buffer_clear_quiescent(buf); } - put_online_cpus(); + lttng_cpus_read_unlock(); } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lttng_kernel_ring_buffer *buf = chan->backend.buf; lib_ring_buffer_clear_quiescent(buf); } @@ -838,7 +839,7 @@ static void channel_free(struct lttng_kernel_ring_buffer_channel *chan) * Holds cpu hotplug. * Returns NULL on failure. */ -struct lttng_kernel_ring_buffer_channel *channel_create(const struct lib_ring_buffer_config *config, +struct lttng_kernel_ring_buffer_channel *channel_create(const struct lttng_kernel_ring_buffer_config *config, const char *name, void *priv, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, @@ -899,9 +900,9 @@ struct lttng_kernel_ring_buffer_channel *channel_create(const struct lib_ring_bu chan->cpu_hp_notifier.priority = 6; register_cpu_notifier(&chan->cpu_hp_notifier); - get_online_cpus(); + lttng_cpus_read_lock(); for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); lib_ring_buffer_start_switch_timer(buf); @@ -909,10 +910,10 @@ struct lttng_kernel_ring_buffer_channel *channel_create(const struct lib_ring_bu spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); } chan->cpu_hp_enable = 1; - put_online_cpus(); + lttng_cpus_read_unlock(); #else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); lib_ring_buffer_start_switch_timer(buf); @@ -933,7 +934,7 @@ struct lttng_kernel_ring_buffer_channel *channel_create(const struct lib_ring_bu #endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lttng_kernel_ring_buffer *buf = chan->backend.buf; lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); @@ -977,7 +978,7 @@ void channel_release(struct kref *kref) void *channel_destroy(struct lttng_kernel_ring_buffer_channel *chan) { int cpu; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; void *priv; irq_work_sync(&chan->wakeup_pending); @@ -990,7 +991,7 @@ void *channel_destroy(struct lttng_kernel_ring_buffer_channel *chan) * unregistered. */ for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + struct lttng_kernel_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); if (config->cb.buffer_finalize) @@ -1005,7 +1006,7 @@ void *channel_destroy(struct lttng_kernel_ring_buffer_channel *chan) wake_up_interruptible(&buf->read_wait); } } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lttng_kernel_ring_buffer *buf = chan->backend.buf; if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); @@ -1025,8 +1026,8 @@ void *channel_destroy(struct lttng_kernel_ring_buffer_channel *chan) } EXPORT_SYMBOL_GPL(channel_destroy); -struct lib_ring_buffer *channel_get_ring_buffer( - const struct lib_ring_buffer_config *config, +struct lttng_kernel_ring_buffer *channel_get_ring_buffer( + const struct lttng_kernel_ring_buffer_config *config, struct lttng_kernel_ring_buffer_channel *chan, int cpu) { if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) @@ -1036,7 +1037,7 @@ struct lib_ring_buffer *channel_get_ring_buffer( } EXPORT_SYMBOL_GPL(channel_get_ring_buffer); -int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) +int lib_ring_buffer_open_read(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; @@ -1046,17 +1047,17 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) atomic_long_dec(&buf->active_readers); return -EOVERFLOW; } - lttng_smp_mb__after_atomic(); + smp_mb__after_atomic(); return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read); -void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) +void lib_ring_buffer_release_read(struct lttng_kernel_ring_buffer *buf) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); - lttng_smp_mb__before_atomic(); + smp_mb__before_atomic(); atomic_long_dec(&buf->active_readers); kref_put(&chan->ref, channel_release); } @@ -1084,11 +1085,11 @@ static void remote_mb(void *info) * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ -int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, +int lib_ring_buffer_snapshot(struct lttng_kernel_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; @@ -1147,11 +1148,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); * This function is meant to provide information on the exact producer and * consumer positions without regard for the "snapshot" feature. */ -int lib_ring_buffer_snapshot_sample_positions(struct lib_ring_buffer *buf, +int lib_ring_buffer_snapshot_sample_positions(struct lttng_kernel_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; smp_rmb(); *consumed = atomic_long_read(&buf->consumed); @@ -1174,10 +1175,10 @@ int lib_ring_buffer_snapshot_sample_positions(struct lib_ring_buffer *buf, * @buf: ring buffer * @consumed_new: new consumed count value */ -void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, +void lib_ring_buffer_move_consumer(struct lttng_kernel_ring_buffer *buf, unsigned long consumed_new) { - struct lib_ring_buffer_backend *bufb = &buf->backend; + struct lttng_kernel_ring_buffer_backend *bufb = &buf->backend; struct lttng_kernel_ring_buffer_channel *chan = bufb->chan; unsigned long consumed; @@ -1199,16 +1200,26 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); #if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE static void lib_ring_buffer_flush_read_subbuf_dcache( - const struct lib_ring_buffer_config *config, + const struct lttng_kernel_ring_buffer_config *config, struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf) + struct lttng_kernel_ring_buffer *buf) { - struct lib_ring_buffer_backend_pages *pages; + struct lttng_kernel_ring_buffer_backend_pages *pages; unsigned long sb_bindex, id, i, nr_pages; if (config->output != RING_BUFFER_MMAP) return; +#ifdef cpu_dcache_is_aliasing + /* + * Some architectures implement flush_dcache_page() but don't + * actually have aliasing dcache. cpu_dcache_is_aliasing() was + * introduced in kernel v6.9 to query this more precisely. + */ + if (!cpu_dcache_is_aliasing()) + return; +#endif + /* * Architectures with caches aliased on virtual addresses may * use different cache lines for the linear mapping vs @@ -1223,7 +1234,7 @@ static void lib_ring_buffer_flush_read_subbuf_dcache( pages = buf->backend.array[sb_bindex]; nr_pages = buf->backend.num_pages_per_subbuf; for (i = 0; i < nr_pages; i++) { - struct lib_ring_buffer_backend_page *backend_page; + struct lttng_kernel_ring_buffer_backend_page *backend_page; backend_page = &pages->p[i]; flush_dcache_page(pfn_to_page(backend_page->pfn)); @@ -1231,9 +1242,9 @@ static void lib_ring_buffer_flush_read_subbuf_dcache( } #else static void lib_ring_buffer_flush_read_subbuf_dcache( - const struct lib_ring_buffer_config *config, + const struct lttng_kernel_ring_buffer_config *config, struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf) + struct lttng_kernel_ring_buffer *buf) { } #endif @@ -1247,11 +1258,11 @@ static void lib_ring_buffer_flush_read_subbuf_dcache( * data to read at consumed position, or 0 if the get operation succeeds. * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ -int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, +int lib_ring_buffer_get_subbuf(struct lttng_kernel_ring_buffer *buf, unsigned long consumed) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; int finalized; @@ -1402,11 +1413,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf); * lib_ring_buffer_put_subbuf - release exclusive subbuffer access * @buf: ring buffer */ -void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) +void lib_ring_buffer_put_subbuf(struct lttng_kernel_ring_buffer *buf) { - struct lib_ring_buffer_backend *bufb = &buf->backend; + struct lttng_kernel_ring_buffer_backend *bufb = &buf->backend; struct lttng_kernel_ring_buffer_channel *chan = bufb->chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); @@ -1460,12 +1471,12 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf); * position and the writer position. (inclusive) */ static -void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, +void lib_ring_buffer_print_subbuffer_errors(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, unsigned long cons_offset, int cpu) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); @@ -1487,11 +1498,11 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, } static -void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, +void lib_ring_buffer_print_buffer_errors(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, void *priv, int cpu) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; /* @@ -1520,10 +1531,10 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, #ifdef LTTNG_RING_BUFFER_COUNT_EVENTS static void lib_ring_buffer_print_records_count(struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf, + struct lttng_kernel_ring_buffer *buf, int cpu) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (!strcmp(chan->backend.name, "relay-metadata")) { printk(KERN_DEBUG "LTTng: ring buffer %s: %lu records written, " @@ -1542,7 +1553,7 @@ void lib_ring_buffer_print_records_count(struct lttng_kernel_ring_buffer_channel #else static void lib_ring_buffer_print_records_count(struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf, + struct lttng_kernel_ring_buffer *buf, int cpu) { } @@ -1550,9 +1561,9 @@ void lib_ring_buffer_print_records_count(struct lttng_kernel_ring_buffer_channel static void lib_ring_buffer_print_errors(struct lttng_kernel_ring_buffer_channel *chan, - struct lib_ring_buffer *buf, int cpu) + struct lttng_kernel_ring_buffer *buf, int cpu) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; void *priv = chan->backend.priv; lib_ring_buffer_print_records_count(chan, buf, cpu); @@ -1578,17 +1589,17 @@ void lib_ring_buffer_print_errors(struct lttng_kernel_ring_buffer_channel *chan, * Only executed when the buffer is finalized, in SWITCH_FLUSH. */ static -void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; struct commit_counters_hot *cc_hot; - config->cb.buffer_begin(buf, tsc, oldidx); + config->cb.buffer_begin(buf, ctx->priv.tsc, oldidx); /* * Order all writes to buffer before the commit count update that will @@ -1608,7 +1619,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, - commit_count, oldidx, tsc); + commit_count, oldidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->old + config->cb.subbuffer_header_size(), commit_count, cc_hot); @@ -1623,12 +1634,12 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, * subbuffer. */ static -void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; struct commit_counters_hot *cc_hot; @@ -1647,7 +1658,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, * postponed until the commit counter is incremented for the * current space reservation. */ - *ts_end = tsc; + *ts_end = ctx->priv.tsc; /* * Order all writes to buffer and store to ts_end before the commit @@ -1666,7 +1677,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, v_add(config, padding_size, &cc_hot->cc); commit_count = v_read(config, &cc_hot->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, - commit_count, oldidx, tsc); + commit_count, oldidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->old + padding_size, commit_count, cc_hot); @@ -1680,17 +1691,17 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, * that this code is executed before the deliver of this sub-buffer. */ static -void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; struct commit_counters_hot *cc_hot; - config->cb.buffer_begin(buf, tsc, beginidx); + config->cb.buffer_begin(buf, ctx->priv.tsc, beginidx); /* * Order all writes to buffer before the commit count update that will @@ -1710,7 +1721,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, - commit_count, beginidx, tsc); + commit_count, beginidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->begin + config->cb.subbuffer_header_size(), commit_count, cc_hot); @@ -1725,12 +1736,12 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, * we are currently doing the space reservation. */ static -void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long endidx, data_size; u64 *ts_end; @@ -1746,7 +1757,7 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, * postponed until the commit counter is incremented for the * current space reservation. */ - *ts_end = tsc; + *ts_end = ctx->priv.tsc; } /* @@ -1756,12 +1767,12 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, */ static int lib_ring_buffer_try_switch_slow(enum switch_mode mode, - struct lib_ring_buffer *buf, + struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 *tsc) + struct lttng_kernel_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long off, reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); @@ -1769,7 +1780,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, offsets->switch_old_start = 0; off = subbuf_offset(offsets->begin, chan); - *tsc = config->cb.ring_buffer_clock_read(chan); + ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan); /* * Ensure we flush the header of an empty subbuffer when doing the @@ -1851,6 +1862,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, offsets->begin = subbuf_align(offsets->begin, chan); /* Note: old points to the next subbuf at offset 0 */ offsets->end = offsets->begin; + /* + * Populate the records lost counters prior to performing a + * sub-buffer switch. + */ + ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full); + ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap); + ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big); return 0; } @@ -1862,13 +1880,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * operations, this function must be called from the CPU which owns the buffer * for a ACTIVE flush. */ -void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) +void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum switch_mode mode) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer_ctx ctx; struct switch_offsets offsets; unsigned long oldidx; - u64 tsc; offsets.size = 0; @@ -1877,7 +1895,7 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m */ do { if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets, - &tsc)) + &ctx)) return; /* Switch not needed */ } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old); @@ -1888,7 +1906,7 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m * records, never the opposite (missing a full TSC record when it would * be needed). */ - save_last_tsc(config, buf, tsc); + save_last_tsc(config, buf, ctx.priv.tsc); /* * Push the reader if necessary @@ -1902,35 +1920,35 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m * May need to populate header start on SWITCH_FLUSH. */ if (offsets.switch_old_start) { - lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx); offsets.old += config->cb.subbuffer_header_size(); } /* * Switch old subbuffer. */ - lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx); } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); struct switch_param { - struct lib_ring_buffer *buf; + struct lttng_kernel_ring_buffer *buf; enum switch_mode mode; }; static void remote_switch(void *info) { struct switch_param *param = info; - struct lib_ring_buffer *buf = param->buf; + struct lttng_kernel_ring_buffer *buf = param->buf; lib_ring_buffer_switch_slow(buf, param->mode); } -static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, +static void _lib_ring_buffer_switch_remote(struct lttng_kernel_ring_buffer *buf, enum switch_mode mode) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; int ret; struct switch_param param; @@ -1963,22 +1981,22 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, } /* Switch sub-buffer if current sub-buffer is non-empty. */ -void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +void lib_ring_buffer_switch_remote(struct lttng_kernel_ring_buffer *buf) { _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); /* Switch sub-buffer even if current sub-buffer is empty. */ -void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) +void lib_ring_buffer_switch_remote_empty(struct lttng_kernel_ring_buffer *buf) { _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); -void lib_ring_buffer_clear(struct lib_ring_buffer *buf) +void lib_ring_buffer_clear(struct lttng_kernel_ring_buffer *buf) { - struct lib_ring_buffer_backend *bufb = &buf->backend; + struct lttng_kernel_ring_buffer_backend *bufb = &buf->backend; struct lttng_kernel_ring_buffer_channel *chan = bufb->chan; lib_ring_buffer_switch_remote(buf); @@ -1994,13 +2012,13 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_clear); * -EIO if data cannot be written into the buffer for any other reason. */ static -int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, +int lib_ring_buffer_try_reserve_slow(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, struct lttng_kernel_ring_buffer_ctx *ctx, void *client_ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long reserve_commit_diff, offset_cmp; retry: @@ -2142,12 +2160,21 @@ retry: */ offsets->switch_new_end = 1; /* For offsets->begin */ } + /* + * Populate the records lost counters when the space reservation + * may cause a sub-buffer switch. + */ + if (offsets->switch_new_end || offsets->switch_old_end) { + ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full); + ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap); + ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big); + } return 0; } -static struct lib_ring_buffer *get_current_buf(struct lttng_kernel_ring_buffer_channel *chan, int cpu) +static struct lttng_kernel_ring_buffer *get_current_buf(struct lttng_kernel_ring_buffer_channel *chan, int cpu) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) return per_cpu_ptr(chan->backend.buf, cpu); @@ -2157,8 +2184,8 @@ static struct lib_ring_buffer *get_current_buf(struct lttng_kernel_ring_buffer_c void lib_ring_buffer_lost_event_too_big(struct lttng_kernel_ring_buffer_channel *chan) { - const struct lib_ring_buffer_config *config = &chan->backend.config; - struct lib_ring_buffer *buf = get_current_buf(chan, smp_processor_id()); + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf = get_current_buf(chan, smp_processor_id()); v_inc(config, &buf->records_lost_big); } @@ -2176,8 +2203,8 @@ int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx, void *client_ctx) { struct lttng_kernel_ring_buffer_channel *chan = ctx->priv.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; - struct lib_ring_buffer *buf; + const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer *buf; struct switch_offsets offsets; int ret; @@ -2218,17 +2245,17 @@ int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx, if (unlikely(offsets.switch_old_end)) { lib_ring_buffer_clear_noref(config, &buf->backend, subbuf_index(offsets.old - 1, chan)); - lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx); } /* * Populate new subbuffer. */ if (unlikely(offsets.switch_new_start)) - lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx); if (unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx); ctx->priv.slot_size = offsets.size; ctx->priv.pre_offset = offsets.begin; @@ -2238,8 +2265,8 @@ int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx, EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); static -void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void lib_ring_buffer_vmcore_check_deliver(const struct lttng_kernel_ring_buffer_config *config, + struct lttng_kernel_ring_buffer *buf, unsigned long commit_count, unsigned long idx) { @@ -2253,8 +2280,8 @@ void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *c */ #ifdef LTTNG_RING_BUFFER_COUNT_EVENTS static -void deliver_count_events(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void deliver_count_events(const struct lttng_kernel_ring_buffer_config *config, + struct lttng_kernel_ring_buffer *buf, unsigned long idx) { v_add(config, subbuffer_get_records_count(config, @@ -2266,21 +2293,21 @@ void deliver_count_events(const struct lib_ring_buffer_config *config, } #else /* LTTNG_RING_BUFFER_COUNT_EVENTS */ static -void deliver_count_events(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void deliver_count_events(const struct lttng_kernel_ring_buffer_config *config, + struct lttng_kernel_ring_buffer *buf, unsigned long idx) { } #endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */ -void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_config *config, + struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, unsigned long offset, unsigned long commit_count, unsigned long idx, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; @@ -2340,7 +2367,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, - idx)); + idx), ctx); /* * Increment the packet counter while we have exclusive