X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=b25ce7df6058d9dea6898062f4e6acbf6601c7b8;hb=9a9c9b6e05e8477c3edcf41a7f2697d1e588ce4d;hp=fc8d54139c446e0906b9b0560af3a2a568fa7992;hpb=dd5a0db3ea07c46bee3c1814ef7326736f38a06e;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index fc8d5413..b25ce7df 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -60,6 +60,8 @@ #include "../../wrapper/ringbuffer/frontend.h" #include "../../wrapper/ringbuffer/iterator.h" #include "../../wrapper/ringbuffer/nohz.h" +#include "../../wrapper/atomic.h" +#include "../../wrapper/percpu-defs.h" /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -89,6 +91,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); static void lib_ring_buffer_print_errors(struct channel *chan, struct lib_ring_buffer *buf, int cpu); +static +void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode); /* * Must be called under cpu hotplug protection. @@ -497,16 +502,16 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, raw_spin_unlock(&buf->raw_tick_nohz_spinlock); break; case TICK_NOHZ_STOP: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); + spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); + spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); break; case TICK_NOHZ_RESTART: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); + spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_start_read_timer(buf); lib_ring_buffer_start_switch_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); + spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); break; } @@ -583,6 +588,63 @@ static void channel_unregister_notifiers(struct channel *chan) channel_backend_unregister_notifiers(&chan->backend); } +static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf) +{ + if (!buf->quiescent) { + buf->quiescent = true; + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); + } +} + +static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf) +{ + buf->quiescent = false; +} + +void lib_ring_buffer_set_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_set_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_set_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel); + +void lib_ring_buffer_clear_quiescent_channel(struct channel *chan) +{ + int cpu; + const struct lib_ring_buffer_config *config = &chan->backend.config; + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + get_online_cpus(); + for_each_channel_cpu(cpu, chan) { + struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, + cpu); + + lib_ring_buffer_clear_quiescent(buf); + } + put_online_cpus(); + } else { + struct lib_ring_buffer *buf = chan->backend.buf; + + lib_ring_buffer_clear_quiescent(buf); + } +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel); + static void channel_free(struct channel *chan) { if (chan->backend.release_priv_ops) { @@ -743,7 +805,7 @@ void *channel_destroy(struct channel *chan) chan->backend.priv, cpu); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -757,7 +819,7 @@ void *channel_destroy(struct channel *chan) if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ @@ -792,7 +854,7 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) return -EBUSY; kref_get(&chan->ref); - smp_mb__after_atomic_inc(); + lttng_smp_mb__after_atomic(); return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read); @@ -802,7 +864,7 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) struct channel *chan = buf->backend.chan; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); - smp_mb__before_atomic_dec(); + lttng_smp_mb__before_atomic(); atomic_long_dec(&buf->active_readers); kref_put(&chan->ref, channel_release); } @@ -1227,7 +1289,8 @@ void lib_ring_buffer_print_errors(struct channel *chan, /* * lib_ring_buffer_switch_old_start: Populate old subbuffer header. * - * Only executed when the buffer is finalized, in SWITCH_FLUSH. + * Only executed by SWITCH_FLUSH, which can be issued while tracing is active + * or at buffer finalization (destroy). */ static void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, @@ -1261,8 +1324,8 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, commit_count, oldidx, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, - offsets->old, commit_count, - config->cb.subbuffer_header_size()); + offsets->old + config->cb.subbuffer_header_size(), + commit_count); } /* @@ -1305,8 +1368,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, commit_count, oldidx, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, - offsets->old, commit_count, - padding_size); + offsets->old + padding_size, commit_count); } /* @@ -1348,8 +1410,8 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, commit_count, beginidx, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx, - offsets->begin, commit_count, - config->cb.subbuffer_header_size()); + offsets->begin + config->cb.subbuffer_header_size(), + commit_count); } /* @@ -1419,12 +1481,14 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, unsigned long sb_index, commit_count; /* - * We are performing a SWITCH_FLUSH. At this stage, there are no - * concurrent writes into the buffer. + * We are performing a SWITCH_FLUSH. There may be concurrent + * writes into the buffer if e.g. invoked while performing a + * snapshot on an active trace. * - * The client does not save any header information. Don't - * switch empty subbuffer on finalize, because it is invalid to - * deliver a completely empty subbuffer. + * If the client does not save any header information (sub-buffer + * header size == 0), don't switch empty subbuffer on finalize, + * because it is invalid to deliver a completely empty + * subbuffer. */ if (!config->cb.subbuffer_header_size()) return -1; @@ -1538,24 +1602,32 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); +struct switch_param { + struct lib_ring_buffer *buf; + enum switch_mode mode; +}; + static void remote_switch(void *info) { - struct lib_ring_buffer *buf = info; + struct switch_param *param = info; + struct lib_ring_buffer *buf = param->buf; - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, param->mode); } -void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf, + enum switch_mode mode) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; int ret; + struct switch_param param; /* * With global synchronization we don't need to use the IPI scheme. */ if (config->sync == RING_BUFFER_SYNC_GLOBAL) { - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); return; } @@ -1570,16 +1642,30 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) * switch. */ get_online_cpus(); + param.buf = buf; + param.mode = mode; ret = smp_call_function_single(buf->backend.cpu, - remote_switch, buf, 1); + remote_switch, ¶m, 1); if (ret) { /* Remote CPU is offline, do it ourself. */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, mode); } put_online_cpus(); } + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE); +} EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); +/* Switch sub-buffer even if current sub-buffer is empty. */ +void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) +{ + _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); + /* * Returns : * 0 if ok