X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=abd975762eed6dbe40d4dbbf21f0de279b2aa051;hb=1fd97f9f4f773e3e9dfc787e9c90b1418fa5a7d4;hp=c5123a6415e39afc75cc55b8208b9b6924fbf975;hpb=aece661f429d4ca54cf1e47bacc556679c28f342;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index c5123a64..abd97576 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -54,6 +54,7 @@ #include #include #include +#include #include #include @@ -64,6 +65,7 @@ #include #include #include +#include /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -146,8 +148,8 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) struct channel *chan = buf->backend.chan; lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); - kfree(buf->commit_hot); - kfree(buf->commit_cold); + lttng_kvfree(buf->commit_hot); + lttng_kvfree(buf->commit_cold); lib_ring_buffer_backend_free(&buf->backend); } @@ -244,20 +246,22 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, return ret; buf->commit_hot = - kzalloc_node(ALIGN(sizeof(*buf->commit_hot) + lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_hot) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); if (!buf->commit_hot) { ret = -ENOMEM; goto free_chanbuf; } buf->commit_cold = - kzalloc_node(ALIGN(sizeof(*buf->commit_cold) + lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_cold) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); if (!buf->commit_cold) { ret = -ENOMEM; goto free_commit; @@ -302,17 +306,17 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, /* Error handling */ free_init: - kfree(buf->commit_cold); + lttng_kvfree(buf->commit_cold); free_commit: - kfree(buf->commit_hot); + lttng_kvfree(buf->commit_hot); free_chanbuf: lib_ring_buffer_backend_free(&buf->backend); return ret; } -static void switch_buffer_timer(unsigned long data) +static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer); struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -337,22 +341,22 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; + unsigned int flags = 0; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_init_timer_pinned(&buf->switch_timer); - else - init_timer(&buf->switch_timer); + flags = LTTNG_TIMER_PINNED; - buf->switch_timer.function = switch_buffer_timer; + lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf); buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - buf->switch_timer.data = (unsigned long)buf; + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) add_timer_on(&buf->switch_timer, buf->backend.cpu); else add_timer(&buf->switch_timer); + buf->switch_timer_enabled = 1; } @@ -373,9 +377,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) /* * Polling timer to check the channels for data. */ -static void read_buffer_timer(unsigned long data) +static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer); struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -402,6 +406,7 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; + unsigned int flags; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -409,18 +414,16 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_init_timer_pinned(&buf->read_timer); - else - init_timer(&buf->read_timer); + flags = LTTNG_TIMER_PINNED; - buf->read_timer.function = read_buffer_timer; + lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf); buf->read_timer.expires = jiffies + chan->read_timer_interval; - buf->read_timer.data = (unsigned long)buf; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) add_timer_on(&buf->read_timer, buf->backend.cpu); else add_timer(&buf->read_timer); + buf->read_timer_enabled = 1; } @@ -449,7 +452,81 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) buf->read_timer_enabled = 0; } +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) + +enum cpuhp_state lttng_rb_hp_prepare; +enum cpuhp_state lttng_rb_hp_online; + +void lttng_rb_set_hp_prepare(enum cpuhp_state val) +{ + lttng_rb_hp_prepare = val; +} +EXPORT_SYMBOL_GPL(lttng_rb_set_hp_prepare); + +void lttng_rb_set_hp_online(enum cpuhp_state val) +{ + lttng_rb_hp_online = val; +} +EXPORT_SYMBOL_GPL(lttng_rb_set_hp_online); + +int lttng_cpuhp_rb_frontend_dead(unsigned int cpu, + struct lttng_cpuhp_node *node) +{ + struct channel *chan = container_of(node, struct channel, + cpuhp_prepare); + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lib_ring_buffer_config *config = &chan->backend.config; + + CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); + + /* + * Performing a buffer switch on a remote CPU. Performed by + * the CPU responsible for doing the hotunplug after the target + * CPU stopped running completely. Ensures that all data + * from that remote CPU is flushed. + */ + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + return 0; +} +EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_dead); + +int lttng_cpuhp_rb_frontend_online(unsigned int cpu, + struct lttng_cpuhp_node *node) +{ + struct channel *chan = container_of(node, struct channel, + cpuhp_online); + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lib_ring_buffer_config *config = &chan->backend.config; + + CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); + + wake_up_interruptible(&chan->hp_wait); + lib_ring_buffer_start_switch_timer(buf); + lib_ring_buffer_start_read_timer(buf); + return 0; +} +EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_online); + +int lttng_cpuhp_rb_frontend_offline(unsigned int cpu, + struct lttng_cpuhp_node *node) +{ + struct channel *chan = container_of(node, struct channel, + cpuhp_online); + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); + const struct lib_ring_buffer_config *config = &chan->backend.config; + + CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); + + lib_ring_buffer_stop_switch_timer(buf); + lib_ring_buffer_stop_read_timer(buf); + return 0; +} +EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline); + +#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ + #ifdef CONFIG_HOTPLUG_CPU + /** * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback * @nb: notifier block @@ -505,8 +582,11 @@ int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, return NOTIFY_DONE; } } + #endif +#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ + #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* * For per-cpu buffers, call the reader wakeups before switching the buffer, so @@ -595,7 +675,6 @@ void notrace lib_ring_buffer_tick_nohz_restart(void) static void channel_unregister_notifiers(struct channel *chan) { const struct lib_ring_buffer_config *config = &chan->backend.config; - int cpu; channel_iterator_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { @@ -612,25 +691,42 @@ static void channel_unregister_notifiers(struct channel *chan) * concurrency. */ #endif /* CONFIG_NO_HZ */ -#ifdef CONFIG_HOTPLUG_CPU - get_online_cpus(); - chan->cpu_hp_enable = 0; - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) + { + int ret; + + ret = cpuhp_state_remove_instance(lttng_rb_hp_online, + &chan->cpuhp_online.node); + WARN_ON(ret); + ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare, + &chan->cpuhp_prepare.node); + WARN_ON(ret); } - put_online_cpus(); - unregister_cpu_notifier(&chan->cpu_hp_notifier); +#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ + { + int cpu; + +#ifdef CONFIG_HOTPLUG_CPU + get_online_cpus(); + chan->cpu_hp_enable = 0; + for_each_online_cpu(cpu) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + lib_ring_buffer_stop_switch_timer(buf); + lib_ring_buffer_stop_read_timer(buf); + } + put_online_cpus(); + unregister_cpu_notifier(&chan->cpu_hp_notifier); #else - for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - } + for_each_possible_cpu(cpu) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + lib_ring_buffer_stop_switch_timer(buf); + lib_ring_buffer_stop_read_timer(buf); + } #endif + } +#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ } else { struct lib_ring_buffer *buf = chan->backend.buf; @@ -731,7 +827,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval) { - int ret, cpu; + int ret; struct channel *chan; if (lib_ring_buffer_check_config(config, switch_timer_interval, @@ -759,6 +855,56 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) + chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND; + ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare, + &chan->cpuhp_prepare.node); + if (ret) + goto cpuhp_prepare_error; + + chan->cpuhp_online.component = LTTNG_RING_BUFFER_FRONTEND; + ret = cpuhp_state_add_instance(lttng_rb_hp_online, + &chan->cpuhp_online.node); + if (ret) + goto cpuhp_online_error; +#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ + { + int cpu; + /* + * In case of non-hotplug cpu, if the ring-buffer is allocated + * in early initcall, it will not be notified of secondary cpus. + * In that off case, we need to allocate for all possible cpus. + */ +#ifdef CONFIG_HOTPLUG_CPU + chan->cpu_hp_notifier.notifier_call = + lib_ring_buffer_cpu_hp_callback; + chan->cpu_hp_notifier.priority = 6; + register_cpu_notifier(&chan->cpu_hp_notifier); + + get_online_cpus(); + for_each_online_cpu(cpu) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); + lib_ring_buffer_start_switch_timer(buf); + lib_ring_buffer_start_read_timer(buf); + spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); + } + chan->cpu_hp_enable = 1; + put_online_cpus(); +#else + for_each_possible_cpu(cpu) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); + lib_ring_buffer_start_switch_timer(buf); + lib_ring_buffer_start_read_timer(buf); + spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); + } +#endif + } +#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ + #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* Only benefit from NO_HZ idle with per-cpu buffers for now. */ chan->tick_nohz_notifier.notifier_call = @@ -768,38 +914,6 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, &chan->tick_nohz_notifier); #endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ - /* - * In case of non-hotplug cpu, if the ring-buffer is allocated - * in early initcall, it will not be notified of secondary cpus. - * In that off case, we need to allocate for all possible cpus. - */ -#ifdef CONFIG_HOTPLUG_CPU - chan->cpu_hp_notifier.notifier_call = - lib_ring_buffer_cpu_hp_callback; - chan->cpu_hp_notifier.priority = 6; - register_cpu_notifier(&chan->cpu_hp_notifier); - - get_online_cpus(); - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } - chan->cpu_hp_enable = 1; - put_online_cpus(); -#else - for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } -#endif } else { struct lib_ring_buffer *buf = chan->backend.buf; @@ -809,6 +923,13 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, return chan; +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) +cpuhp_online_error: + ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare, + &chan->cpuhp_prepare.node); + WARN_ON(ret); +cpuhp_prepare_error: +#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ error_free_backend: channel_backend_free(&chan->backend); error: @@ -999,6 +1120,37 @@ nodata: } EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); +/** + * Performs the same function as lib_ring_buffer_snapshot(), but the positions + * are saved regardless of whether the consumed and produced positions are + * in the same subbuffer. + * @buf: ring buffer + * @consumed: consumed byte count indicating the last position read + * @produced: produced byte count indicating the last position written + * + * This function is meant to provide information on the exact producer and + * consumer positions without regard for the "snapshot" feature. + */ +int lib_ring_buffer_snapshot_sample_positions(struct lib_ring_buffer *buf, + unsigned long *consumed, unsigned long *produced) +{ + struct channel *chan = buf->backend.chan; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + smp_rmb(); + *consumed = atomic_long_read(&buf->consumed); + /* + * No need to issue a memory barrier between consumed count read and + * write offset read, because consumed count can only change + * concurrently in overwrite mode, and we keep a sequence counter + * identifier derived from the write offset to check we are getting + * the same sub-buffer we are expecting (the sub-buffers are atomically + * "tagged" upon writes, tags are checked upon read). + */ + *produced = v_read(config, &buf->offset); + return 0; +} + /** * lib_ring_buffer_put_snapshot - move consumed counter forward * @@ -1029,6 +1181,47 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, } EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); +#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE +static void lib_ring_buffer_flush_read_subbuf_dcache( + const struct lib_ring_buffer_config *config, + struct channel *chan, + struct lib_ring_buffer *buf) +{ + struct lib_ring_buffer_backend_pages *pages; + unsigned long sb_bindex, id, i, nr_pages; + + if (config->output != RING_BUFFER_MMAP) + return; + + /* + * Architectures with caches aliased on virtual addresses may + * use different cache lines for the linear mapping vs + * user-space memory mapping. Given that the ring buffer is + * based on the kernel linear mapping, aligning it with the + * user-space mapping is not straightforward, and would require + * extra TLB entries. Therefore, simply flush the dcache for the + * entire sub-buffer before reading it. + */ + id = buf->backend.buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + pages = buf->backend.array[sb_bindex]; + nr_pages = buf->backend.num_pages_per_subbuf; + for (i = 0; i < nr_pages; i++) { + struct lib_ring_buffer_backend_page *backend_page; + + backend_page = &pages->p[i]; + flush_dcache_page(pfn_to_page(backend_page->pfn)); + } +} +#else +static void lib_ring_buffer_flush_read_subbuf_dcache( + const struct lib_ring_buffer_config *config, + struct channel *chan, + struct lib_ring_buffer *buf) +{ +} +#endif + /** * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading * @buf: ring buffer @@ -1171,6 +1364,8 @@ retry: buf->get_subbuf_consumed = consumed; buf->get_subbuf = 1; + lib_ring_buffer_flush_read_subbuf_dcache(config, chan, buf); + return 0; nodata: @@ -1344,8 +1539,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, /* * lib_ring_buffer_switch_old_start: Populate old subbuffer header. * - * Only executed by SWITCH_FLUSH, which can be issued while tracing is active - * or at buffer finalization (destroy). + * Only executed when the buffer is finalized, in SWITCH_FLUSH. */ static void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, @@ -1356,6 +1550,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; + struct commit_counters_hot *cc_hot; config->cb.buffer_begin(buf, tsc, oldidx); @@ -1372,15 +1567,15 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, barrier(); } else smp_wmb(); - v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + cc_hot = &buf->commit_hot[oldidx]; + v_add(config, config->cb.subbuffer_header_size(), &cc_hot->cc); + commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, commit_count, oldidx, tsc); - lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, + lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->old + config->cb.subbuffer_header_size(), - commit_count); + commit_count, cc_hot); } /* @@ -1400,6 +1595,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; + struct commit_counters_hot *cc_hot; data_size = subbuf_offset(offsets->old - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; @@ -1418,12 +1614,14 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, barrier(); } else smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + cc_hot = &buf->commit_hot[oldidx]; + v_add(config, padding_size, &cc_hot->cc); + commit_count = v_read(config, &cc_hot->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, commit_count, oldidx, tsc); - lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, - offsets->old + padding_size, commit_count); + lib_ring_buffer_write_commit_counter(config, buf, chan, + offsets->old + padding_size, commit_count, + cc_hot); } /* @@ -1442,6 +1640,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; + struct commit_counters_hot *cc_hot; config->cb.buffer_begin(buf, tsc, beginidx); @@ -1458,15 +1657,15 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, barrier(); } else smp_wmb(); - v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[beginidx].cc); - commit_count = v_read(config, &buf->commit_hot[beginidx].cc); + cc_hot = &buf->commit_hot[beginidx]; + v_add(config, config->cb.subbuffer_header_size(), &cc_hot->cc); + commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, commit_count, beginidx, tsc); - lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx, + lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->begin + config->cb.subbuffer_header_size(), - commit_count); + commit_count, cc_hot); } /* @@ -1536,14 +1735,12 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, unsigned long sb_index, commit_count; /* - * We are performing a SWITCH_FLUSH. There may be concurrent - * writes into the buffer if e.g. invoked while performing a - * snapshot on an active trace. + * We are performing a SWITCH_FLUSH. At this stage, there are no + * concurrent writes into the buffer. * - * If the client does not save any header information (sub-buffer - * header size == 0), don't switch empty subbuffer on finalize, - * because it is invalid to deliver a completely empty - * subbuffer. + * The client does not save any header information. Don't + * switch empty subbuffer on finalize, because it is invalid to + * deliver a completely empty subbuffer. */ if (!config->cb.subbuffer_header_size()) return -1; @@ -1687,16 +1884,14 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, } /* - * Taking lock on CPU hotplug to ensure two things: first, that the + * Disabling preemption ensures two things: first, that the * target cpu is not taken concurrently offline while we are within - * smp_call_function_single() (I don't trust that get_cpu() on the - * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be - * confirmed)). Secondly, if it happens that the CPU is not online, our - * own call to lib_ring_buffer_switch_slow() needs to be protected from - * CPU hotplug handlers, which can also perform a remote subbuffer - * switch. + * smp_call_function_single(). Secondly, if it happens that the + * CPU is not online, our own call to lib_ring_buffer_switch_slow() + * needs to be protected from CPU hotplug handlers, which can + * also perform a remote subbuffer switch. */ - get_online_cpus(); + preempt_disable(); param.buf = buf; param.mode = mode; ret = smp_call_function_single(buf->backend.cpu, @@ -1705,9 +1900,10 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, /* Remote CPU is offline, do it ourself. */ lib_ring_buffer_switch_slow(buf, mode); } - put_online_cpus(); + preempt_enable(); } +/* Switch sub-buffer if current sub-buffer is non-empty. */ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) { _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); @@ -1732,7 +1928,8 @@ static int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - struct lib_ring_buffer_ctx *ctx) + struct lib_ring_buffer_ctx *ctx, + void *client_ctx) { const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long reserve_commit_diff, offset_cmp; @@ -1758,7 +1955,7 @@ retry: offsets->size = config->cb.record_header_size(config, chan, offsets->begin, &offsets->pre_header_padding, - ctx); + ctx, client_ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -1842,7 +2039,7 @@ retry: config->cb.record_header_size(config, chan, offsets->begin, &offsets->pre_header_padding, - ctx); + ctx, client_ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -1906,7 +2103,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big); * -EIO for other errors, else returns 0. * It will take care of sub-buffer switching. */ -int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) +int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx, + void *client_ctx) { struct channel *chan = ctx->chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -1919,7 +2117,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) do { ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets, - ctx); + ctx, client_ctx); if (unlikely(ret)) return ret; } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old, @@ -1980,6 +2178,33 @@ void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *c v_set(config, &buf->commit_hot[idx].seq, commit_count); } +/* + * The ring buffer can count events recorded and overwritten per buffer, + * but it is disabled by default due to its performance overhead. + */ +#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS +static +void deliver_count_events(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long idx) +{ + v_add(config, subbuffer_get_records_count(config, + &buf->backend, idx), + &buf->records_count); + v_add(config, subbuffer_count_records_overrun(config, + &buf->backend, idx), + &buf->records_overrun); +} +#else /* LTTNG_RING_BUFFER_COUNT_EVENTS */ +static +void deliver_count_events(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long idx) +{ +} +#endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */ + + void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, struct channel *chan, @@ -2032,15 +2257,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con * and any other writer trying to access this subbuffer * in this state is required to drop records. */ - v_add(config, - subbuffer_get_records_count(config, - &buf->backend, idx), - &buf->records_count); - v_add(config, - subbuffer_count_records_overrun(config, - &buf->backend, - idx), - &buf->records_overrun); + deliver_count_events(config, buf, idx); config->cb.buffer_end(buf, tsc, idx, lib_ring_buffer_get_data_size(config, buf,