#include <wrapper/atomic.h>
#include <wrapper/kref.h>
#include <wrapper/percpu-defs.h>
+#include <wrapper/timer.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode);
/*
* Must be called under cpu hotplug protection.
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->switch_timer,
+ lttng_mod_timer_pinned(&buf->switch_timer,
jiffies + chan->switch_timer_interval);
else
mod_timer(&buf->switch_timer,
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
- init_timer(&buf->switch_timer);
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->switch_timer);
+ else
+ init_timer(&buf->switch_timer);
+
buf->switch_timer.function = switch_buffer_timer;
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
buf->switch_timer.data = (unsigned long)buf;
}
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->read_timer,
+ lttng_mod_timer_pinned(&buf->read_timer,
jiffies + chan->read_timer_interval);
else
mod_timer(&buf->read_timer,
|| buf->read_timer_enabled)
return;
- init_timer(&buf->read_timer);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->read_timer);
+ else
+ init_timer(&buf->read_timer);
+
buf->read_timer.function = read_buffer_timer;
buf->read_timer.expires = jiffies + chan->read_timer_interval;
buf->read_timer.data = (unsigned long)buf;
channel_backend_unregister_notifiers(&chan->backend);
}
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+ if (!buf->quiescent) {
+ buf->quiescent = true;
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+ }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+ buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
static void channel_free(struct channel *chan)
{
if (chan->backend.release_priv_ops) {
chan->backend.priv,
cpu);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
/*
* lib_ring_buffer_switch_old_start: Populate old subbuffer header.
*
- * Only executed when the buffer is finalized, in SWITCH_FLUSH.
+ * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
+ * or at buffer finalization (destroy).
*/
static
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
unsigned long sb_index, commit_count;
/*
- * We are performing a SWITCH_FLUSH. At this stage, there are no
- * concurrent writes into the buffer.
+ * We are performing a SWITCH_FLUSH. There may be concurrent
+ * writes into the buffer if e.g. invoked while performing a
+ * snapshot on an active trace.
*
- * The client does not save any header information. Don't
- * switch empty subbuffer on finalize, because it is invalid to
- * deliver a completely empty subbuffer.
+ * If the client does not save any header information (sub-buffer
+ * header size == 0), don't switch empty subbuffer on finalize,
+ * because it is invalid to deliver a completely empty
+ * subbuffer.
*/
if (!config->cb.subbuffer_header_size())
return -1;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
+struct switch_param {
+ struct lib_ring_buffer *buf;
+ enum switch_mode mode;
+};
+
static void remote_switch(void *info)
{
- struct lib_ring_buffer *buf = info;
+ struct switch_param *param = info;
+ struct lib_ring_buffer *buf = param->buf;
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, param->mode);
}
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode)
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
int ret;
+ struct switch_param param;
/*
* With global synchronization we don't need to use the IPI scheme.
*/
if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
return;
}
* switch.
*/
get_online_cpus();
+ param.buf = buf;
+ param.mode = mode;
ret = smp_call_function_single(buf->backend.cpu,
- remote_switch, buf, 1);
+ remote_switch, ¶m, 1);
if (ret) {
/* Remote CPU is offline, do it ourself. */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
}
put_online_cpus();
}
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+/* Switch sub-buffer even if current sub-buffer is empty. */
+void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+
/*
* Returns :
* 0 if ok
return 0;
}
+static struct lib_ring_buffer *get_current_buf(struct channel *chan, int cpu)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ return per_cpu_ptr(chan->backend.buf, cpu);
+ else
+ return chan->backend.buf;
+}
+
+void lib_ring_buffer_lost_event_too_big(struct channel *chan)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ struct lib_ring_buffer *buf = get_current_buf(chan, smp_processor_id());
+
+ v_inc(config, &buf->records_lost_big);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big);
+
/**
* lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
* @ctx: ring buffer context.
struct switch_offsets offsets;
int ret;
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
- else
- buf = chan->backend.buf;
- ctx->buf = buf;
-
+ ctx->buf = buf = get_current_buf(chan, ctx->cpu);
offsets.size = 0;
do {