X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=9d13d2920420ee6c525d1e22ac0558dc030b7b1f;hb=25337cb556cedb95ff65c1e91aab0dea6a3e77b4;hp=017d30f5ddf673c36f2932d2fe83581a35bea50c;hpb=dc5cd5702b74d72f0db0141c6d888a1d820aed9c;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 017d30f5..9d13d292 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -63,6 +63,7 @@ #include #include #include +#include /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -96,6 +97,47 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, enum switch_mode mode); +static +int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + struct channel *chan) +{ + unsigned long consumed_old, consumed_idx, commit_count, write_offset; + + consumed_old = atomic_long_read(&buf->consumed); + consumed_idx = subbuf_index(consumed_old, chan); + commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); + /* + * No memory barrier here, since we are only interested + * in a statistically correct polling result. The next poll will + * get the data is we are racing. The mb() that ensures correct + * memory order is in get_subbuf. + */ + write_offset = v_read(config, &buf->offset); + + /* + * Check that the subbuffer we are trying to consume has been + * already fully committed. + */ + + if (((commit_count - chan->backend.subbuf_size) + & chan->commit_count_mask) + - (buf_trunc(consumed_old, chan) + >> chan->backend.num_subbuf_order) + != 0) + return 0; + + /* + * Check that we are not about to read the same subbuffer in + * which the writer head is. + */ + if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_old, chan) + == 0) + return 0; + + return 1; +} + /* * Must be called under cpu hotplug protection. */ @@ -205,7 +247,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, kzalloc_node(ALIGN(sizeof(*buf->commit_hot) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); if (!buf->commit_hot) { ret = -ENOMEM; goto free_chanbuf; @@ -215,7 +258,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, kzalloc_node(ALIGN(sizeof(*buf->commit_cold) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); if (!buf->commit_cold) { ret = -ENOMEM; goto free_commit; @@ -281,7 +325,7 @@ static void switch_buffer_timer(unsigned long data) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->switch_timer, + lttng_mod_timer_pinned(&buf->switch_timer, jiffies + chan->switch_timer_interval); else mod_timer(&buf->switch_timer, @@ -298,7 +342,12 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; - init_timer(&buf->switch_timer); + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + lttng_init_timer_pinned(&buf->switch_timer); + else + init_timer(&buf->switch_timer); + buf->switch_timer.function = switch_buffer_timer; buf->switch_timer.expires = jiffies + chan->switch_timer_interval; buf->switch_timer.data = (unsigned long)buf; @@ -341,7 +390,7 @@ static void read_buffer_timer(unsigned long data) } if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->read_timer, + lttng_mod_timer_pinned(&buf->read_timer, jiffies + chan->read_timer_interval); else mod_timer(&buf->read_timer, @@ -361,7 +410,11 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) || buf->read_timer_enabled) return; - init_timer(&buf->read_timer); + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + lttng_init_timer_pinned(&buf->read_timer); + else + init_timer(&buf->read_timer); + buf->read_timer.function = read_buffer_timer; buf->read_timer.expires = jiffies + chan->read_timer_interval; buf->read_timer.data = (unsigned long)buf; @@ -904,15 +957,6 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long consumed_cur, write_offset; int finalized; - /* - * First, ensure we perform a "final" flush onto the stream. This will - * ensure we create a packet of padding if we encounter an empty - * packet. This ensures the time-stamps right before the snapshot is - * used as end of packet timestamp. - */ - if (!buf->quiescent) - _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); - retry: finalized = ACCESS_ONCE(buf->finalized); /* @@ -1615,11 +1659,17 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); +struct switch_param { + struct lib_ring_buffer *buf; + enum switch_mode mode; +}; + static void remote_switch(void *info) { - struct lib_ring_buffer *buf = info; + struct switch_param *param = info; + struct lib_ring_buffer *buf = param->buf; - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, param->mode); } static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, @@ -1628,6 +1678,7 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; int ret; + struct switch_param param; /* * With global synchronization we don't need to use the IPI scheme. @@ -1648,8 +1699,10 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, * switch. */ get_online_cpus(); + param.buf = buf; + param.mode = mode; ret = smp_call_function_single(buf->backend.cpu, - remote_switch, buf, 1); + remote_switch, ¶m, 1); if (ret) { /* Remote CPU is offline, do it ourself. */ lib_ring_buffer_switch_slow(buf, mode); @@ -1663,6 +1716,13 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); +/* Switch sub-buffer even if current sub-buffer is empty. */ +void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); + /* * Returns : * 0 if ok @@ -1912,6 +1972,147 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) } EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); +static +void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long commit_count, + unsigned long idx) +{ + if (config->oops == RING_BUFFER_OOPS_CONSISTENCY) + v_set(config, &buf->commit_hot[idx].seq, commit_count); +} + +/* + * The ring buffer can count events recorded and overwritten per buffer, + * but it is disabled by default due to its performance overhead. + */ +#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS +static +void deliver_count_events(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long idx) +{ + v_add(config, subbuffer_get_records_count(config, + &buf->backend, idx), + &buf->records_count); + v_add(config, subbuffer_count_records_overrun(config, + &buf->backend, idx), + &buf->records_overrun); +} +#else /* LTTNG_RING_BUFFER_COUNT_EVENTS */ +static +void deliver_count_events(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + unsigned long idx) +{ +} +#endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */ + + +void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config, + struct lib_ring_buffer *buf, + struct channel *chan, + unsigned long offset, + unsigned long commit_count, + unsigned long idx, + u64 tsc) +{ + unsigned long old_commit_count = commit_count + - chan->backend.subbuf_size; + + /* + * If we succeeded at updating cc_sb below, we are the subbuffer + * writer delivering the subbuffer. Deals with concurrent + * updates of the "cc" value without adding a add_return atomic + * operation to the fast path. + * + * We are doing the delivery in two steps: + * - First, we cmpxchg() cc_sb to the new value + * old_commit_count + 1. This ensures that we are the only + * subbuffer user successfully filling the subbuffer, but we + * do _not_ set the cc_sb value to "commit_count" yet. + * Therefore, other writers that would wrap around the ring + * buffer and try to start writing to our subbuffer would + * have to drop records, because it would appear as + * non-filled. + * We therefore have exclusive access to the subbuffer control + * structures. This mutual exclusion with other writers is + * crucially important to perform record overruns count in + * flight recorder mode locklessly. + * - When we are ready to release the subbuffer (either for + * reading or for overrun by other writers), we simply set the + * cc_sb value to "commit_count" and perform delivery. + * + * The subbuffer size is least 2 bytes (minimum size: 1 page). + * This guarantees that old_commit_count + 1 != commit_count. + */ + + /* + * Order prior updates to reserve count prior to the + * commit_cold cc_sb update. + */ + smp_wmb(); + if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, + old_commit_count, old_commit_count + 1) + == old_commit_count)) { + /* + * Start of exclusive subbuffer access. We are + * guaranteed to be the last writer in this subbuffer + * and any other writer trying to access this subbuffer + * in this state is required to drop records. + */ + deliver_count_events(config, buf, idx); + config->cb.buffer_end(buf, tsc, idx, + lib_ring_buffer_get_data_size(config, + buf, + idx)); + + /* + * Increment the packet counter while we have exclusive + * access. + */ + subbuffer_inc_packet_count(config, &buf->backend, idx); + + /* + * Set noref flag and offset for this subbuffer id. + * Contains a memory barrier that ensures counter stores + * are ordered before set noref and offset. + */ + lib_ring_buffer_set_noref_offset(config, &buf->backend, idx, + buf_trunc_val(offset, chan)); + + /* + * Order set_noref and record counter updates before the + * end of subbuffer exclusive access. Orders with + * respect to writers coming into the subbuffer after + * wrap around, and also order wrt concurrent readers. + */ + smp_mb(); + /* End of exclusive subbuffer access */ + v_set(config, &buf->commit_cold[idx].cc_sb, + commit_count); + /* + * Order later updates to reserve count after + * the commit_cold cc_sb update. + */ + smp_wmb(); + lib_ring_buffer_vmcore_check_deliver(config, buf, + commit_count, idx); + + /* + * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. + */ + if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER + && atomic_long_read(&buf->active_readers) + && lib_ring_buffer_poll_deliver(config, buf, chan)) { + wake_up_interruptible(&buf->read_wait); + wake_up_interruptible(&chan->read_wait); + } + + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_check_deliver_slow); + int __init init_lib_ring_buffer_frontend(void) { int cpu;