X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=c5123a6415e39afc75cc55b8208b9b6924fbf975;hb=aece661f429d4ca54cf1e47bacc556679c28f342;hp=dbe52c157c71be0ec560bbc6b97dc77d1b1e27ea;hpb=9c1f4643eb4a11d451a979d81389f0c2ff666af2;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index dbe52c15..c5123a64 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -55,14 +55,15 @@ #include #include -#include "../../wrapper/ringbuffer/config.h" -#include "../../wrapper/ringbuffer/backend.h" -#include "../../wrapper/ringbuffer/frontend.h" -#include "../../wrapper/ringbuffer/iterator.h" -#include "../../wrapper/ringbuffer/nohz.h" -#include "../../wrapper/atomic.h" -#include "../../wrapper/kref.h" -#include "../../wrapper/percpu-defs.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -92,6 +93,50 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu); +static +void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode); + +static +int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + struct channel *chan) +{ + unsigned long consumed_old, consumed_idx, commit_count, write_offset; + + consumed_old = atomic_long_read(&buf->consumed); + consumed_idx = subbuf_index(consumed_old, chan); + commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); + /* + * No memory barrier here, since we are only interested + * in a statistically correct polling result. The next poll will + * get the data is we are racing. The mb() that ensures correct + * memory order is in get_subbuf. + */ + write_offset = v_read(config, &buf->offset); + + /* + * Check that the subbuffer we are trying to consume has been + * already fully committed. + */ + + if (((commit_count - chan->backend.subbuf_size) + & chan->commit_count_mask) + - (buf_trunc(consumed_old, chan) + >> chan->backend.num_subbuf_order) + != 0) + return 0; + + /* + * Check that we are not about to read the same subbuffer in + * which the writer head is. + */ + if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_old, chan) + == 0) + return 0; + + return 1; +} /* * Must be called under cpu hotplug protection. @@ -278,7 +323,7 @@ static void switch_buffer_timer(unsigned long data) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->switch_timer, + lttng_mod_timer_pinned(&buf->switch_timer, jiffies + chan->switch_timer_interval); else mod_timer(&buf->switch_timer, @@ -295,7 +340,12 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; - init_timer(&buf->switch_timer); + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + lttng_init_timer_pinned(&buf->switch_timer); + else + init_timer(&buf->switch_timer); + buf->switch_timer.function = switch_buffer_timer; buf->switch_timer.expires = jiffies + chan->switch_timer_interval; buf->switch_timer.data = (unsigned long)buf; @@ -338,7 +388,7 @@ static void read_buffer_timer(unsigned long data) } if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->read_timer, + lttng_mod_timer_pinned(&buf->read_timer, jiffies + chan->read_timer_interval); else mod_timer(&buf->read_timer, @@ -358,7 +408,11 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) || buf->read_timer_enabled) return; - init_timer(&buf->read_timer); + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + lttng_init_timer_pinned(&buf->read_timer); + else + init_timer(&buf->read_timer); + buf->read_timer.function = read_buffer_timer; buf->read_timer.expires = jiffies + chan->read_timer_interval; buf->read_timer.data = (unsigned long)buf; @@ -586,6 +640,63 @@ static void channel_unregister_notifiers(struct channel *chan) channel_backend_unregister_notifiers(&chan->backend); } +static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) +{ + if (!buf->quiescent) { + buf->quiescent = true; + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); + } +} + +static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) +{ + buf->quiescent = false; +} + +void lib_ring_buffer_set_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_set_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_set_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel); + +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_clear_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_clear_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel); + static void channel_free(struct channel *chan) { if (chan->backend.release_priv_ops) { @@ -746,7 +857,7 @@ void *channel_destroy(struct channel *chan) chan->backend.priv, cpu); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -760,7 +871,7 @@ void *channel_destroy(struct channel *chan) if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -1233,7 +1344,8 @@ void lib_ring_buffer_print_errors(struct channel *chan, /* * lib_ring_buffer_switch_old_start: Populate old subbuffer header. * - * Only executed when the buffer is finalized, in SWITCH_FLUSH. + * Only executed by SWITCH_FLUSH, which can be issued while tracing is active + * or at buffer finalization (destroy). */ static void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, @@ -1424,12 +1536,14 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, unsigned long sb_index, commit_count; /* - * We are performing a SWITCH_FLUSH. At this stage, there are no - * concurrent writes into the buffer. + * We are performing a SWITCH_FLUSH. There may be concurrent + * writes into the buffer if e.g. invoked while performing a + * snapshot on an active trace. * - * The client does not save any header information. Don't - * switch empty subbuffer on finalize, because it is invalid to - * deliver a completely empty subbuffer. + * If the client does not save any header information (sub-buffer + * header size == 0), don't switch empty subbuffer on finalize, + * because it is invalid to deliver a completely empty + * subbuffer. */ if (!config->cb.subbuffer_header_size()) return -1; @@ -1543,24 +1657,32 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); +struct switch_param { + struct lib_ring_buffer *buf; + enum switch_mode mode; +}; + static void remote_switch(void *info) { - struct lib_ring_buffer *buf = info; + struct switch_param *param = info; + struct lib_ring_buffer *buf = param->buf; - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, param->mode); } -void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; int ret; + struct switch_param param; /* * With global synchronization we don't need to use the IPI scheme. */ if (config->sync == RING_BUFFER_SYNC_GLOBAL) { - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); return; } @@ -1575,16 +1697,30 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) * switch. */ get_online_cpus(); + param.buf = buf; + param.mode = mode; ret = smp_call_function_single(buf->backend.cpu, - remote_switch, buf, 1); + remote_switch, ¶m, 1); if (ret) { /* Remote CPU is offline, do it ourself. */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); } put_online_cpus(); } + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); +} EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); +/* Switch sub-buffer even if current sub-buffer is empty. */ +void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); + /* * Returns : * 0 if ok @@ -1743,6 +1879,25 @@ retry: return 0; } +static struct lib_ring_buffer *get_current_buf(struct channel *chan, int cpu) +{ + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + return per_cpu_ptr(chan->backend.buf, cpu); + else + return chan->backend.buf; +} + +void lib_ring_buffer_lost_event_too_big(struct channel *chan) +{ + const struct lib_ring_buffer_config *config = &chan->backend.config; + struct lib_ring_buffer *buf = get_current_buf(chan, smp_processor_id()); + + v_inc(config, &buf->records_lost_big); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big); + /** * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer. * @ctx: ring buffer context. @@ -1759,12 +1914,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) struct switch_offsets offsets; int ret; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = per_cpu_ptr(chan->backend.buf, ctx->cpu); - else - buf = chan->backend.buf; - ctx->buf = buf; - + ctx->buf = buf = get_current_buf(chan, ctx->cpu); offsets.size = 0; do { @@ -1820,6 +1970,128 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) } EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); +static +void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long commit_count, + unsigned long idx) +{ + if (config->oops == RING_BUFFER_OOPS_CONSISTENCY) + v_set(config, &buf->commit_hot[idx].seq, commit_count); +} + +void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + struct channel *chan, + unsigned long offset, + unsigned long commit_count, + unsigned long idx, + u64 tsc) +{ + unsigned long old_commit_count = commit_count + - chan->backend.subbuf_size; + + /* + * If we succeeded at updating cc_sb below, we are the subbuffer + * writer delivering the subbuffer. Deals with concurrent + * updates of the "cc" value without adding a add_return atomic + * operation to the fast path. + * + * We are doing the delivery in two steps: + * - First, we cmpxchg() cc_sb to the new value + * old_commit_count + 1. This ensures that we are the only + * subbuffer user successfully filling the subbuffer, but we + * do _not_ set the cc_sb value to "commit_count" yet. + * Therefore, other writers that would wrap around the ring + * buffer and try to start writing to our subbuffer would + * have to drop records, because it would appear as + * non-filled. + * We therefore have exclusive access to the subbuffer control + * structures. This mutual exclusion with other writers is + * crucially important to perform record overruns count in + * flight recorder mode locklessly. + * - When we are ready to release the subbuffer (either for + * reading or for overrun by other writers), we simply set the + * cc_sb value to "commit_count" and perform delivery. + * + * The subbuffer size is least 2 bytes (minimum size: 1 page). + * This guarantees that old_commit_count + 1 != commit_count. + */ + + /* + * Order prior updates to reserve count prior to the + * commit_cold cc_sb update. + */ + smp_wmb(); + if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, + old_commit_count, old_commit_count + 1) + == old_commit_count)) { + /* + * Start of exclusive subbuffer access. We are + * guaranteed to be the last writer in this subbuffer + * and any other writer trying to access this subbuffer + * in this state is required to drop records. + */ + v_add(config, + subbuffer_get_records_count(config, + &buf->backend, idx), + &buf->records_count); + v_add(config, + subbuffer_count_records_overrun(config, + &buf->backend, + idx), + &buf->records_overrun); + config->cb.buffer_end(buf, tsc, idx, + lib_ring_buffer_get_data_size(config, + buf, + idx)); + + /* + * Increment the packet counter while we have exclusive + * access. + */ + subbuffer_inc_packet_count(config, &buf->backend, idx); + + /* + * Set noref flag and offset for this subbuffer id. + * Contains a memory barrier that ensures counter stores + * are ordered before set noref and offset. + */ + lib_ring_buffer_set_noref_offset(config, &buf->backend, idx, + buf_trunc_val(offset, chan)); + + /* + * Order set_noref and record counter updates before the + * end of subbuffer exclusive access. Orders with + * respect to writers coming into the subbuffer after + * wrap around, and also order wrt concurrent readers. + */ + smp_mb(); + /* End of exclusive subbuffer access */ + v_set(config, &buf->commit_cold[idx].cc_sb, + commit_count); + /* + * Order later updates to reserve count after + * the commit_cold cc_sb update. + */ + smp_wmb(); + lib_ring_buffer_vmcore_check_deliver(config, buf, + commit_count, idx); + + /* + * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. + */ + if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER + && atomic_long_read(&buf->active_readers) + && lib_ring_buffer_poll_deliver(config, buf, chan)) { + wake_up_interruptible(&buf->read_wait); + wake_up_interruptible(&chan->read_wait); + } + + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_check_deliver_slow); + int __init init_lib_ring_buffer_frontend(void) { int cpu;