#include <linux/module.h>
#include <linux/percpu.h>
-#include "../../wrapper/ringbuffer/config.h"
-#include "../../wrapper/ringbuffer/backend.h"
-#include "../../wrapper/ringbuffer/frontend.h"
-#include "../../wrapper/ringbuffer/iterator.h"
-#include "../../wrapper/ringbuffer/nohz.h"
+#include <wrapper/ringbuffer/config.h>
+#include <wrapper/ringbuffer/backend.h>
+#include <wrapper/ringbuffer/frontend.h>
+#include <wrapper/ringbuffer/iterator.h>
+#include <wrapper/ringbuffer/nohz.h>
+#include <wrapper/atomic.h>
+#include <wrapper/kref.h>
+#include <wrapper/percpu-defs.h>
+#include <wrapper/timer.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode);
/*
* Must be called under cpu hotplug protection.
kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+ GFP_KERNEL | __GFP_NOWARN,
+ cpu_to_node(max(cpu, 0)));
if (!buf->commit_hot) {
ret = -ENOMEM;
goto free_chanbuf;
kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+ GFP_KERNEL | __GFP_NOWARN,
+ cpu_to_node(max(cpu, 0)));
if (!buf->commit_cold) {
ret = -ENOMEM;
goto free_commit;
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->switch_timer,
+ lttng_mod_timer_pinned(&buf->switch_timer,
jiffies + chan->switch_timer_interval);
else
mod_timer(&buf->switch_timer,
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
- init_timer(&buf->switch_timer);
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->switch_timer);
+ else
+ init_timer(&buf->switch_timer);
+
buf->switch_timer.function = switch_buffer_timer;
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
buf->switch_timer.data = (unsigned long)buf;
}
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->read_timer,
+ lttng_mod_timer_pinned(&buf->read_timer,
jiffies + chan->read_timer_interval);
else
mod_timer(&buf->read_timer,
|| buf->read_timer_enabled)
return;
- init_timer(&buf->read_timer);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->read_timer);
+ else
+ init_timer(&buf->read_timer);
+
buf->read_timer.function = read_buffer_timer;
buf->read_timer.expires = jiffies + chan->read_timer_interval;
buf->read_timer.data = (unsigned long)buf;
buf->read_timer_enabled = 0;
}
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+
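+/*
+ * Dynamically allocated CPU hotplug states for the ring buffer frontend.
+ * They are set through the exported setters below, presumably by the code
+ * that registers the multi-instance hotplug callbacks.
+ */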
+enum cpuhp_state lttng_rb_hp_prepare;
+enum cpuhp_state lttng_rb_hp_online;
+
+void lttng_rb_set_hp_prepare(enum cpuhp_state val)
+{
+ lttng_rb_hp_prepare = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_prepare);
+
+void lttng_rb_set_hp_online(enum cpuhp_state val)
+{
+ lttng_rb_hp_online = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_online);
+
+int lttng_cpuhp_rb_frontend_dead(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_prepare);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ /*
+ * Perform a buffer switch on the dead CPU's buffer. This runs
+ * on the CPU responsible for the hotunplug, after the target
+ * CPU has stopped running completely, and ensures that all
+ * data from that remote CPU is flushed.
+ */
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_dead);
+
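+/*
+ * CPU hotplug "online" callback: wake up any waiter on the channel's
+ * hp_wait queue and start the switch and read timers for the per-cpu
+ * buffer of the CPU coming online.
+ */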
+int lttng_cpuhp_rb_frontend_online(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_online);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ wake_up_interruptible(&chan->hp_wait);
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_online);
+
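+/*
+ * CPU hotplug "offline" callback: stop the switch and read timers for the
+ * per-cpu buffer of the CPU going offline. The final buffer flush is done
+ * later by the "dead" callback, once the CPU has stopped running.
+ */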
+int lttng_cpuhp_rb_frontend_offline(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_online);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
+
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#ifdef CONFIG_HOTPLUG_CPU
+
/**
* lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
* @nb: notifier block
return NOTIFY_DONE;
}
}
+
#endif
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/*
* For per-cpu buffers, call the reader wakeups before switching the buffer, so
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
static void channel_unregister_notifiers(struct channel *chan)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- int cpu;
channel_iterator_unregister_notifiers(chan);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
* concurrency.
*/
#endif /* CONFIG_NO_HZ */
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+ {
+ int ret;
+
+ ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
+ &chan->cpuhp_online.node);
+ WARN_ON(ret);
+ ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ WARN_ON(ret);
+ }
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+ {
+ int cpu;
+
#ifdef CONFIG_HOTPLUG_CPU
+ get_online_cpus();
+ chan->cpu_hp_enable = 0;
+ for_each_online_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ }
+ put_online_cpus();
+ unregister_cpu_notifier(&chan->cpu_hp_notifier);
+#else
+ for_each_possible_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ }
+#endif
+ }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ }
+ channel_backend_unregister_notifiers(&chan->backend);
+}
+
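+/*
+ * Mark a buffer quiescent: perform one last remote flush (SWITCH_FLUSH) so
+ * readers can consume the data already written, and skip any further flush
+ * until the quiescent state is cleared again.
+ */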
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+ if (!buf->quiescent) {
+ buf->quiescent = true;
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+ }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+ buf->quiescent = false;
+}
+
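+/*
+ * Set every buffer of the channel quiescent: each per-cpu buffer (under
+ * CPU hotplug protection) or the single global buffer, depending on the
+ * channel allocation mode.
+ */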
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
get_online_cpus();
- chan->cpu_hp_enable = 0;
- for_each_online_cpu(cpu) {
+ for_each_channel_cpu(cpu, chan) {
struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+
+ lib_ring_buffer_set_quiescent(buf);
}
put_online_cpus();
- unregister_cpu_notifier(&chan->cpu_hp_notifier);
-#else
- for_each_possible_cpu(cpu) {
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
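+/*
+ * Clear the quiescent state on every buffer of the channel, so that a
+ * later call to lib_ring_buffer_set_quiescent_channel() flushes them again.
+ */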
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+
+ lib_ring_buffer_clear_quiescent(buf);
}
-#endif
+ put_online_cpus();
} else {
struct lib_ring_buffer *buf = chan->backend.buf;
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+ lib_ring_buffer_clear_quiescent(buf);
}
- channel_backend_unregister_notifiers(&chan->backend);
}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
static void channel_free(struct channel *chan)
{
+ if (chan->backend.release_priv_ops) {
+ chan->backend.release_priv_ops(chan->backend.priv_ops);
+ }
channel_iterator_free(chan);
channel_backend_free(&chan->backend);
kfree(chan);
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
- int ret, cpu;
+ int ret;
struct channel *chan;
if (lib_ring_buffer_check_config(config, switch_timer_interval,
init_waitqueue_head(&chan->hp_wait);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+ chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
+ ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ if (ret)
+ goto cpuhp_prepare_error;
+
+ chan->cpuhp_online.component = LTTNG_RING_BUFFER_FRONTEND;
+ ret = cpuhp_state_add_instance(lttng_rb_hp_online,
+ &chan->cpuhp_online.node);
+ if (ret)
+ goto cpuhp_online_error;
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+ {
+ int cpu;
+ /*
+ * In case of a non-hotplug cpu, if the ring-buffer is
+ * allocated in an early initcall, it will not be notified of
+ * secondary cpus. In that case, we need to allocate for all
+ * possible cpus.
+ */
+#ifdef CONFIG_HOTPLUG_CPU
+ chan->cpu_hp_notifier.notifier_call =
+ lib_ring_buffer_cpu_hp_callback;
+ chan->cpu_hp_notifier.priority = 6;
+ register_cpu_notifier(&chan->cpu_hp_notifier);
+
+ get_online_cpus();
+ for_each_online_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ }
+ chan->cpu_hp_enable = 1;
+ put_online_cpus();
+#else
+ for_each_possible_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ }
+#endif
+ }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/* Only benefit from NO_HZ idle with per-cpu buffers for now. */
chan->tick_nohz_notifier.notifier_call =
&chan->tick_nohz_notifier);
#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
- /*
- * In case of non-hotplug cpu, if the ring-buffer is allocated
- * in early initcall, it will not be notified of secondary cpus.
- * In that off case, we need to allocate for all possible cpus.
- */
-#ifdef CONFIG_HOTPLUG_CPU
- chan->cpu_hp_notifier.notifier_call =
- lib_ring_buffer_cpu_hp_callback;
- chan->cpu_hp_notifier.priority = 6;
- register_cpu_notifier(&chan->cpu_hp_notifier);
-
- get_online_cpus();
- for_each_online_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
- chan->cpu_hp_enable = 1;
- put_online_cpus();
-#else
- for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
-#endif
} else {
struct lib_ring_buffer *buf = chan->backend.buf;
return chan;
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+cpuhp_online_error:
+ ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ WARN_ON(ret);
+cpuhp_prepare_error:
+#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
error_free_backend:
channel_backend_free(&chan->backend);
error:
chan->backend.priv,
cpu);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
return -EBUSY;
- kref_get(&chan->ref);
- smp_mb__after_atomic_inc();
+ if (!lttng_kref_get(&chan->ref)) {
+ atomic_long_dec(&buf->active_readers);
+ return -EOVERFLOW;
+ }
+ lttng_smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- smp_mb__before_atomic_dec();
+ lttng_smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
*/
if (((commit_count - chan->backend.subbuf_size)
& chan->commit_count_mask)
- - (buf_trunc(consumed_cur, chan)
+ - (buf_trunc(consumed, chan)
>> chan->backend.num_subbuf_order)
!= 0)
goto nodata;
* Check that we are not about to read the same subbuffer in
* which the writer head is.
*/
- if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
+ if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
== 0)
goto nodata;
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->old + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- padding_size);
+ offsets->old + padding_size, commit_count);
}
/*
commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx);
+ commit_count, beginidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
- offsets->begin, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->begin + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
u64 *tsc)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned long off;
+ unsigned long off, reserve_commit_diff;
offsets->begin = v_read(config, &buf->offset);
offsets->old = offsets->begin;
* (records and header timestamps) are visible to the reader. This is
* required for quiescence guarantees for the fusion merge.
*/
- if (mode == SWITCH_FLUSH || off > 0) {
- if (unlikely(off == 0)) {
- /*
- * A final flush that encounters an empty
- * sub-buffer cannot switch buffer if a
- * reader is located within this sub-buffer.
- * Anyway, the purpose of final flushing of a
- * sub-buffer at offset 0 is to handle the case
- * of entirely empty stream.
- */
- if (unlikely(subbuf_trunc(offsets->begin, chan)
- - subbuf_trunc((unsigned long)
- atomic_long_read(&buf->consumed), chan)
- >= chan->backend.buf_size))
- return -1;
- /*
- * The client does not save any header information.
- * Don't switch empty subbuffer on finalize, because it
- * is invalid to deliver a completely empty subbuffer.
- */
- if (!config->cb.subbuffer_header_size())
+ if (mode != SWITCH_FLUSH && !off)
+ return -1; /* we do not have to switch: buffer is empty */
+
+ if (unlikely(off == 0)) {
+ unsigned long sb_index, commit_count;
+
+ /*
+ * We are performing a SWITCH_FLUSH. At this stage, there are no
+ * concurrent writes into the buffer.
+ *
+ * The client does not save any header information. Don't
+ * switch an empty subbuffer on finalize, because it is invalid
+ * to deliver a completely empty subbuffer.
+ */
+ if (!config->cb.subbuffer_header_size())
+ return -1;
+
+ /* Test new buffer integrity */
+ sb_index = subbuf_index(offsets->begin, chan);
+ commit_count = v_read(config,
+ &buf->commit_cold[sb_index].cc_sb);
+ reserve_commit_diff =
+ (buf_trunc(offsets->begin, chan)
+ >> chan->backend.num_subbuf_order)
+ - (commit_count & chan->commit_count_mask);
+ if (likely(reserve_commit_diff == 0)) {
+ /* Next subbuffer not being written to. */
+ if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
+ subbuf_trunc(offsets->begin, chan)
+ - subbuf_trunc((unsigned long)
+ atomic_long_read(&buf->consumed), chan)
+ >= chan->backend.buf_size)) {
+ /*
+ * We do not overwrite non-consumed buffers
+ * and we are full: don't switch.
+ */
return -1;
+ } else {
+ /*
+ * Next subbuffer not being written to, and we
+ * are either in overwrite mode or the buffer is
+ * not full. It's safe to write in this new
+ * subbuffer.
+ */
+ }
+ } else {
/*
- * Need to write the subbuffer start header on finalize.
+ * The next subbuffer's reserve offset does not match its
+ * commit offset: don't perform the switch, in either
+ * producer-consumer or overwrite mode. This is caused by
+ * either a writer OOPS or too many nested writes over a
+ * reserve/commit pair.
*/
- offsets->switch_old_start = 1;
+ return -1;
}
- offsets->begin = subbuf_align(offsets->begin, chan);
- } else
- return -1; /* we do not have to switch : buffer is empty */
+
+ /*
+ * Need to write the subbuffer start header on finalize.
+ */
+ offsets->switch_old_start = 1;
+ }
+ offsets->begin = subbuf_align(offsets->begin, chan);
/* Note: old points to the next subbuf at offset 0 */
offsets->end = offsets->begin;
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
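+/*
+ * Arguments passed to remote_switch() through smp_call_function_single(),
+ * so the IPI handler knows both the target buffer and the switch mode.
+ */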
+struct switch_param {
+ struct lib_ring_buffer *buf;
+ enum switch_mode mode;
+};
+
static void remote_switch(void *info)
{
- struct lib_ring_buffer *buf = info;
+ struct switch_param *param = info;
+ struct lib_ring_buffer *buf = param->buf;
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, param->mode);
}
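+/*
+ * Trigger a sub-buffer switch for @buf on its owner CPU. Unless the client
+ * uses global synchronization, the switch is performed through an IPI on
+ * the buffer's CPU, falling back to a local call if that CPU is offline;
+ * with global synchronization it is simply performed locally.
+ */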
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode)
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
int ret;
+ struct switch_param param;
/*
* With global synchronization we don't need to use the IPI scheme.
*/
if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
return;
}
* switch.
*/
get_online_cpus();
+ param.buf = buf;
+ param.mode = mode;
ret = smp_call_function_single(buf->backend.cpu,
- remote_switch, buf, 1);
+ remote_switch, &param, 1);
if (ret) {
/* Remote CPU is offline, do it ourself. */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
}
put_online_cpus();
}
+
+/* Switch sub-buffer if current sub-buffer is non-empty. */
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+/* Switch sub-buffer even if current sub-buffer is empty. */
+void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+
/*
* Returns :
* 0 if ok