#include <linux/module.h>
#include <linux/percpu.h>
-#include "../../wrapper/ringbuffer/config.h"
-#include "../../wrapper/ringbuffer/backend.h"
-#include "../../wrapper/ringbuffer/frontend.h"
-#include "../../wrapper/ringbuffer/iterator.h"
-#include "../../wrapper/ringbuffer/nohz.h"
+#include <wrapper/ringbuffer/config.h>
+#include <wrapper/ringbuffer/backend.h>
+#include <wrapper/ringbuffer/frontend.h>
+#include <wrapper/ringbuffer/iterator.h>
+#include <wrapper/ringbuffer/nohz.h>
+#include <wrapper/atomic.h>
+#include <wrapper/kref.h>
+#include <wrapper/percpu-defs.h>
+#include <wrapper/timer.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode);
+
+static
+int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
+ struct lib_ring_buffer *buf,
+ struct channel *chan)
+{
+ unsigned long consumed_old, consumed_idx, commit_count, write_offset;
+
+ consumed_old = atomic_long_read(&buf->consumed);
+ consumed_idx = subbuf_index(consumed_old, chan);
+ commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
+ /*
+ * No memory barrier here, since we are only interested
+ * in a statistically correct polling result. The next poll will
+ * get the data is we are racing. The mb() that ensures correct
+ * memory order is in get_subbuf.
+ */
+ write_offset = v_read(config, &buf->offset);
+
+ /*
+ * Check that the subbuffer we are trying to consume has been
+ * already fully committed.
+ */
+
+ if (((commit_count - chan->backend.subbuf_size)
+ & chan->commit_count_mask)
+ - (buf_trunc(consumed_old, chan)
+ >> chan->backend.num_subbuf_order)
+ != 0)
+ return 0;
+
+ /*
+ * Check that we are not about to read the same subbuffer in
+ * which the writer head is.
+ */
+ if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_old, chan)
+ == 0)
+ return 0;
+
+ return 1;
+}
/*
* Must be called under cpu hotplug protection.
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->switch_timer,
+ lttng_mod_timer_pinned(&buf->switch_timer,
jiffies + chan->switch_timer_interval);
else
mod_timer(&buf->switch_timer,
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
- init_timer(&buf->switch_timer);
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->switch_timer);
+ else
+ init_timer(&buf->switch_timer);
+
buf->switch_timer.function = switch_buffer_timer;
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
buf->switch_timer.data = (unsigned long)buf;
}
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->read_timer,
+ lttng_mod_timer_pinned(&buf->read_timer,
jiffies + chan->read_timer_interval);
else
mod_timer(&buf->read_timer,
|| buf->read_timer_enabled)
return;
- init_timer(&buf->read_timer);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->read_timer);
+ else
+ init_timer(&buf->read_timer);
+
buf->read_timer.function = read_buffer_timer;
buf->read_timer.expires = jiffies + chan->read_timer_interval;
buf->read_timer.data = (unsigned long)buf;
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
channel_backend_unregister_notifiers(&chan->backend);
}
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+ if (!buf->quiescent) {
+ buf->quiescent = true;
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+ }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+ buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
static void channel_free(struct channel *chan)
{
+ if (chan->backend.release_priv_ops) {
+ chan->backend.release_priv_ops(chan->backend.priv_ops);
+ }
channel_iterator_free(chan);
channel_backend_free(&chan->backend);
kfree(chan);
chan->backend.priv,
cpu);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
return -EBUSY;
- kref_get(&chan->ref);
- smp_mb__after_atomic_inc();
+ if (!lttng_kref_get(&chan->ref)) {
+ atomic_long_dec(&buf->active_readers);
+ return -EOVERFLOW;
+ }
+ lttng_smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- smp_mb__before_atomic_dec();
+ lttng_smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
*/
if (((commit_count - chan->backend.subbuf_size)
& chan->commit_count_mask)
- - (buf_trunc(consumed_cur, chan)
+ - (buf_trunc(consumed, chan)
>> chan->backend.num_subbuf_order)
!= 0)
goto nodata;
* Check that we are not about to read the same subbuffer in
* which the writer head is.
*/
- if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
+ if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
== 0)
goto nodata;
/*
* lib_ring_buffer_switch_old_start: Populate old subbuffer header.
*
- * Only executed when the buffer is finalized, in SWITCH_FLUSH.
+ * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
+ * or at buffer finalization (destroy).
*/
static
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->old + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- padding_size);
+ offsets->old + padding_size, commit_count);
}
/*
commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx);
+ commit_count, beginidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
- offsets->begin, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->begin + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
u64 *tsc)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned long off;
+ unsigned long off, reserve_commit_diff;
offsets->begin = v_read(config, &buf->offset);
offsets->old = offsets->begin;
* (records and header timestamps) are visible to the reader. This is
* required for quiescence guarantees for the fusion merge.
*/
- if (mode == SWITCH_FLUSH || off > 0) {
- if (unlikely(off == 0)) {
- /*
- * A final flush that encounters an empty
- * sub-buffer cannot switch buffer if a
- * reader is located within this sub-buffer.
- * Anyway, the purpose of final flushing of a
- * sub-buffer at offset 0 is to handle the case
- * of entirely empty stream.
- */
- if (unlikely(subbuf_trunc(offsets->begin, chan)
- - subbuf_trunc((unsigned long)
- atomic_long_read(&buf->consumed), chan)
- >= chan->backend.buf_size))
- return -1;
- /*
- * The client does not save any header information.
- * Don't switch empty subbuffer on finalize, because it
- * is invalid to deliver a completely empty subbuffer.
- */
- if (!config->cb.subbuffer_header_size())
+ if (mode != SWITCH_FLUSH && !off)
+ return -1; /* we do not have to switch : buffer is empty */
+
+ if (unlikely(off == 0)) {
+ unsigned long sb_index, commit_count;
+
+ /*
+ * We are performing a SWITCH_FLUSH. There may be concurrent
+ * writes into the buffer if e.g. invoked while performing a
+ * snapshot on an active trace.
+ *
+ * If the client does not save any header information (sub-buffer
+ * header size == 0), don't switch empty subbuffer on finalize,
+ * because it is invalid to deliver a completely empty
+ * subbuffer.
+ */
+ if (!config->cb.subbuffer_header_size())
+ return -1;
+
+ /* Test new buffer integrity */
+ sb_index = subbuf_index(offsets->begin, chan);
+ commit_count = v_read(config,
+ &buf->commit_cold[sb_index].cc_sb);
+ reserve_commit_diff =
+ (buf_trunc(offsets->begin, chan)
+ >> chan->backend.num_subbuf_order)
+ - (commit_count & chan->commit_count_mask);
+ if (likely(reserve_commit_diff == 0)) {
+ /* Next subbuffer not being written to. */
+ if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
+ subbuf_trunc(offsets->begin, chan)
+ - subbuf_trunc((unsigned long)
+ atomic_long_read(&buf->consumed), chan)
+ >= chan->backend.buf_size)) {
+ /*
+ * We do not overwrite non consumed buffers
+ * and we are full : don't switch.
+ */
return -1;
+ } else {
+ /*
+ * Next subbuffer not being written to, and we
+ * are either in overwrite mode or the buffer is
+ * not full. It's safe to write in this new
+ * subbuffer.
+ */
+ }
+ } else {
/*
- * Need to write the subbuffer start header on finalize.
+ * Next subbuffer reserve offset does not match the
+ * commit offset. Don't perform switch in
+ * producer-consumer and overwrite mode. Caused by
+ * either a writer OOPS or too many nested writes over a
+ * reserve/commit pair.
*/
- offsets->switch_old_start = 1;
+ return -1;
}
- offsets->begin = subbuf_align(offsets->begin, chan);
- } else
- return -1; /* we do not have to switch : buffer is empty */
+
+ /*
+ * Need to write the subbuffer start header on finalize.
+ */
+ offsets->switch_old_start = 1;
+ }
+ offsets->begin = subbuf_align(offsets->begin, chan);
/* Note: old points to the next subbuf at offset 0 */
offsets->end = offsets->begin;
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
+struct switch_param {
+ struct lib_ring_buffer *buf;
+ enum switch_mode mode;
+};
+
static void remote_switch(void *info)
{
- struct lib_ring_buffer *buf = info;
+ struct switch_param *param = info;
+ struct lib_ring_buffer *buf = param->buf;
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, param->mode);
}
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode)
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
int ret;
+ struct switch_param param;
/*
* With global synchronization we don't need to use the IPI scheme.
*/
if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
return;
}
* switch.
*/
get_online_cpus();
+ param.buf = buf;
+ param.mode = mode;
ret = smp_call_function_single(buf->backend.cpu,
- remote_switch, buf, 1);
+ remote_switch, ¶m, 1);
if (ret) {
/* Remote CPU is offline, do it ourself. */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
}
put_online_cpus();
}
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+/* Switch sub-buffer even if current sub-buffer is empty. */
+void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+
/*
* Returns :
* 0 if ok
struct lib_ring_buffer_ctx *ctx)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned long reserve_commit_diff;
+ unsigned long reserve_commit_diff, offset_cmp;
- offsets->begin = v_read(config, &buf->offset);
+retry:
+ offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->old = offsets->begin;
offsets->switch_new_start = 0;
offsets->switch_new_end = 0;
}
}
if (unlikely(offsets->switch_new_start)) {
- unsigned long sb_index;
+ unsigned long sb_index, commit_count;
/*
* We are typically not filling the previous buffer completely.
+ config->cb.subbuffer_header_size();
/* Test new buffer integrity */
sb_index = subbuf_index(offsets->begin, chan);
+ /*
+ * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+ * lib_ring_buffer_check_deliver() has the matching
+ * memory barriers required around commit_cold cc_sb
+ * updates to ensure reserve and commit counter updates
+ * are not seen reordered when updated by another CPU.
+ */
+ smp_rmb();
+ commit_count = v_read(config,
+ &buf->commit_cold[sb_index].cc_sb);
+ /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+ smp_rmb();
+ if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+ /*
+ * The reserve counter have been concurrently updated
+ * while we read the commit counter. This means the
+ * commit counter we read might not match buf->offset
+ * due to concurrent update. We therefore need to retry.
+ */
+ goto retry;
+ }
reserve_commit_diff =
(buf_trunc(offsets->begin, chan)
>> chan->backend.num_subbuf_order)
- - ((unsigned long) v_read(config,
- &buf->commit_cold[sb_index].cc_sb)
- & chan->commit_count_mask);
+ - (commit_count & chan->commit_count_mask);
if (likely(reserve_commit_diff == 0)) {
/* Next subbuffer not being written to. */
if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
} else {
/*
* Next subbuffer reserve offset does not match the
- * commit offset. Drop record in producer-consumer and
- * overwrite mode. Caused by either a writer OOPS or too
- * many nested writes over a reserve/commit pair.
+ * commit offset, and this did not involve update to the
+ * reserve counter. Drop record in producer-consumer and
+ * overwrite mode. Caused by either a writer OOPS or
+ * too many nested writes over a reserve/commit pair.
*/
v_inc(config, &buf->records_lost_wrap);
return -EIO;
return 0;
}
+static struct lib_ring_buffer *get_current_buf(struct channel *chan, int cpu)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ return per_cpu_ptr(chan->backend.buf, cpu);
+ else
+ return chan->backend.buf;
+}
+
+void lib_ring_buffer_lost_event_too_big(struct channel *chan)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ struct lib_ring_buffer *buf = get_current_buf(chan, smp_processor_id());
+
+ v_inc(config, &buf->records_lost_big);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big);
+
/**
* lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
* @ctx: ring buffer context.
struct switch_offsets offsets;
int ret;
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
- else
- buf = chan->backend.buf;
- ctx->buf = buf;
-
+ ctx->buf = buf = get_current_buf(chan, ctx->cpu);
offsets.size = 0;
do {
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
+static
+void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
+ struct lib_ring_buffer *buf,
+ unsigned long commit_count,
+ unsigned long idx)
+{
+ if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
+ v_set(config, &buf->commit_hot[idx].seq, commit_count);
+}
+
+void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config,
+ struct lib_ring_buffer *buf,
+ struct channel *chan,
+ unsigned long offset,
+ unsigned long commit_count,
+ unsigned long idx,
+ u64 tsc)
+{
+ unsigned long old_commit_count = commit_count
+ - chan->backend.subbuf_size;
+
+ /*
+ * If we succeeded at updating cc_sb below, we are the subbuffer
+ * writer delivering the subbuffer. Deals with concurrent
+ * updates of the "cc" value without adding a add_return atomic
+ * operation to the fast path.
+ *
+ * We are doing the delivery in two steps:
+ * - First, we cmpxchg() cc_sb to the new value
+ * old_commit_count + 1. This ensures that we are the only
+ * subbuffer user successfully filling the subbuffer, but we
+ * do _not_ set the cc_sb value to "commit_count" yet.
+ * Therefore, other writers that would wrap around the ring
+ * buffer and try to start writing to our subbuffer would
+ * have to drop records, because it would appear as
+ * non-filled.
+ * We therefore have exclusive access to the subbuffer control
+ * structures. This mutual exclusion with other writers is
+ * crucially important to perform record overruns count in
+ * flight recorder mode locklessly.
+ * - When we are ready to release the subbuffer (either for
+ * reading or for overrun by other writers), we simply set the
+ * cc_sb value to "commit_count" and perform delivery.
+ *
+ * The subbuffer size is least 2 bytes (minimum size: 1 page).
+ * This guarantees that old_commit_count + 1 != commit_count.
+ */
+
+ /*
+ * Order prior updates to reserve count prior to the
+ * commit_cold cc_sb update.
+ */
+ smp_wmb();
+ if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
+ old_commit_count, old_commit_count + 1)
+ == old_commit_count)) {
+ /*
+ * Start of exclusive subbuffer access. We are
+ * guaranteed to be the last writer in this subbuffer
+ * and any other writer trying to access this subbuffer
+ * in this state is required to drop records.
+ */
+ v_add(config,
+ subbuffer_get_records_count(config,
+ &buf->backend, idx),
+ &buf->records_count);
+ v_add(config,
+ subbuffer_count_records_overrun(config,
+ &buf->backend,
+ idx),
+ &buf->records_overrun);
+ config->cb.buffer_end(buf, tsc, idx,
+ lib_ring_buffer_get_data_size(config,
+ buf,
+ idx));
+
+ /*
+ * Increment the packet counter while we have exclusive
+ * access.
+ */
+ subbuffer_inc_packet_count(config, &buf->backend, idx);
+
+ /*
+ * Set noref flag and offset for this subbuffer id.
+ * Contains a memory barrier that ensures counter stores
+ * are ordered before set noref and offset.
+ */
+ lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
+ buf_trunc_val(offset, chan));
+
+ /*
+ * Order set_noref and record counter updates before the
+ * end of subbuffer exclusive access. Orders with
+ * respect to writers coming into the subbuffer after
+ * wrap around, and also order wrt concurrent readers.
+ */
+ smp_mb();
+ /* End of exclusive subbuffer access */
+ v_set(config, &buf->commit_cold[idx].cc_sb,
+ commit_count);
+ /*
+ * Order later updates to reserve count after
+ * the commit_cold cc_sb update.
+ */
+ smp_wmb();
+ lib_ring_buffer_vmcore_check_deliver(config, buf,
+ commit_count, idx);
+
+ /*
+ * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
+ */
+ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
+ && atomic_long_read(&buf->active_readers)
+ && lib_ring_buffer_poll_deliver(config, buf, chan)) {
+ wake_up_interruptible(&buf->read_wait);
+ wake_up_interruptible(&chan->read_wait);
+ }
+
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_check_deliver_slow);
+
int __init init_lib_ring_buffer_frontend(void)
{
int cpu;