#include <linux/module.h>
#include <linux/percpu.h>
-#include "../../wrapper/ringbuffer/config.h"
-#include "../../wrapper/ringbuffer/backend.h"
-#include "../../wrapper/ringbuffer/frontend.h"
-#include "../../wrapper/ringbuffer/iterator.h"
-#include "../../wrapper/ringbuffer/nohz.h"
+#include <wrapper/ringbuffer/config.h>
+#include <wrapper/ringbuffer/backend.h>
+#include <wrapper/ringbuffer/frontend.h>
+#include <wrapper/ringbuffer/iterator.h>
+#include <wrapper/ringbuffer/nohz.h>
+#include <wrapper/atomic.h>
+#include <wrapper/kref.h>
+#include <wrapper/percpu-defs.h>
+#include <wrapper/timer.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode);
/*
* Must be called under cpu hotplug protection.
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->switch_timer,
+ lttng_mod_timer_pinned(&buf->switch_timer,
jiffies + chan->switch_timer_interval);
else
mod_timer(&buf->switch_timer,
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
- init_timer(&buf->switch_timer);
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->switch_timer);
+ else
+ init_timer(&buf->switch_timer);
+
buf->switch_timer.function = switch_buffer_timer;
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
buf->switch_timer.data = (unsigned long)buf;
}
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- mod_timer_pinned(&buf->read_timer,
+ lttng_mod_timer_pinned(&buf->read_timer,
jiffies + chan->read_timer_interval);
else
mod_timer(&buf->read_timer,
|| buf->read_timer_enabled)
return;
- init_timer(&buf->read_timer);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ lttng_init_timer_pinned(&buf->read_timer);
+ else
+ init_timer(&buf->read_timer);
+
buf->read_timer.function = read_buffer_timer;
buf->read_timer.expires = jiffies + chan->read_timer_interval;
buf->read_timer.data = (unsigned long)buf;
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
channel_backend_unregister_notifiers(&chan->backend);
}
+/*
+ * Mark @buf quiescent and force a SWITCH_FLUSH so pending data becomes
+ * visible to readers. Idempotent: only the first call after the flag is
+ * cleared performs the remote flush.
+ */
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+	if (!buf->quiescent) {
+		buf->quiescent = true;
+		_lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+	}
+}
+
+/* Re-arm @buf after quiescence: the next set_quiescent flushes again. */
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+	buf->quiescent = false;
+}
+
+/*
+ * Flush and mark quiescent every buffer of @chan: each per-cpu buffer
+ * for per-cpu channels (under CPU hotplug protection), or the single
+ * global buffer otherwise.
+ */
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+	int cpu;
+	const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		get_online_cpus();
+		for_each_channel_cpu(cpu, chan) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+					cpu);
+
+			lib_ring_buffer_set_quiescent(buf);
+		}
+		put_online_cpus();
+	} else {
+		struct lib_ring_buffer *buf = chan->backend.buf;
+
+		lib_ring_buffer_set_quiescent(buf);
+	}
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+/*
+ * Clear the quiescent state of every buffer of @chan (per-cpu buffers
+ * under CPU hotplug protection, or the single global buffer), allowing
+ * subsequent quiescence flushes to take effect again.
+ */
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+	int cpu;
+	const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		get_online_cpus();
+		for_each_channel_cpu(cpu, chan) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+					cpu);
+
+			lib_ring_buffer_clear_quiescent(buf);
+		}
+		put_online_cpus();
+	} else {
+		struct lib_ring_buffer *buf = chan->backend.buf;
+
+		lib_ring_buffer_clear_quiescent(buf);
+	}
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
static void channel_free(struct channel *chan)
{
+ if (chan->backend.release_priv_ops) {
+ chan->backend.release_priv_ops(chan->backend.priv_ops);
+ }
channel_iterator_free(chan);
channel_backend_free(&chan->backend);
kfree(chan);
chan->backend.priv,
cpu);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
return -EBUSY;
- kref_get(&chan->ref);
- smp_mb__after_atomic_inc();
+ if (!lttng_kref_get(&chan->ref)) {
+ atomic_long_dec(&buf->active_readers);
+ return -EOVERFLOW;
+ }
+ lttng_smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- smp_mb__before_atomic_dec();
+ lttng_smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
/*
* lib_ring_buffer_switch_old_start: Populate old subbuffer header.
*
- * Only executed when the buffer is finalized, in SWITCH_FLUSH.
+ * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
+ * or at buffer finalization (destroy).
*/
static
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->old + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx);
+ commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- padding_size);
+ offsets->old + padding_size, commit_count);
}
/*
commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx);
+ commit_count, beginidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
- offsets->begin, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->begin + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
unsigned long sb_index, commit_count;
/*
- * We are performing a SWITCH_FLUSH. At this stage, there are no
- * concurrent writes into the buffer.
+ * We are performing a SWITCH_FLUSH. There may be concurrent
+ * writes into the buffer if e.g. invoked while performing a
+ * snapshot on an active trace.
*
- * The client does not save any header information. Don't
- * switch empty subbuffer on finalize, because it is invalid to
- * deliver a completely empty subbuffer.
+ * If the client does not save any header information (sub-buffer
+ * header size == 0), don't switch empty subbuffer on finalize,
+ * because it is invalid to deliver a completely empty
+ * subbuffer.
*/
if (!config->cb.subbuffer_header_size())
return -1;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
+/* Argument bundle handed to the remote_switch() IPI callback. */
+struct switch_param {
+	struct lib_ring_buffer *buf;
+	enum switch_mode mode;
+};
+
+/*
+ * IPI callback: performs the requested sub-buffer switch on the CPU
+ * owning the buffer. @info points to a struct switch_param.
+ */
static void remote_switch(void *info)
{
-	struct lib_ring_buffer *buf = info;
+	struct switch_param *param = info;
+	struct lib_ring_buffer *buf = param->buf;
-	lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+	lib_ring_buffer_switch_slow(buf, param->mode);
}
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+/*
+ * Perform a sub-buffer switch of mode @mode on @buf. For per-cpu
+ * synchronization the switch is executed on the buffer's owning CPU
+ * via an IPI; global synchronization switches directly.
+ */
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+		enum switch_mode mode)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	int ret;
+	struct switch_param param;
	/*
	 * With global synchronization we don't need to use the IPI scheme.
	 */
	if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
-		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		lib_ring_buffer_switch_slow(buf, mode);
		return;
	}
	 * switch.
	 */
	get_online_cpus();
+	/* param lives on the stack: smp_call_function_single() waits (wait=1). */
+	param.buf = buf;
+	param.mode = mode;
	ret = smp_call_function_single(buf->backend.cpu,
-				       remote_switch, buf, 1);
+				       remote_switch, &param, 1);
	if (ret) {
		/* Remote CPU is offline, do it ourself. */
-		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		lib_ring_buffer_switch_slow(buf, mode);
	}
	put_online_cpus();
}
+
+/*
+ * Public wrapper: perform a SWITCH_ACTIVE switch on @buf (delivers the
+ * current sub-buffer only if it contains data).
+ */
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+	_lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+/*
+ * Switch sub-buffer even if the current sub-buffer is empty, by issuing
+ * a SWITCH_FLUSH on the buffer's owning CPU.
+ */
+void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf)
+{
+	_lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+
/*
* Returns :
* 0 if ok
return 0;
}
+/*
+ * Return the buffer backing @cpu for per-cpu channels, or the single
+ * global buffer otherwise.
+ */
+static struct lib_ring_buffer *get_current_buf(struct channel *chan, int cpu)
+{
+	const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		return per_cpu_ptr(chan->backend.buf, cpu);
+	else
+		return chan->backend.buf;
+}
+
+/*
+ * Account an event too large for the buffer: increment the
+ * records_lost_big counter of the current CPU's buffer.
+ * NOTE(review): uses smp_processor_id() — presumably callers run with
+ * preemption disabled; verify at call sites.
+ */
+void lib_ring_buffer_lost_event_too_big(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = &chan->backend.config;
+	struct lib_ring_buffer *buf = get_current_buf(chan, smp_processor_id());
+
+	v_inc(config, &buf->records_lost_big);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big);
+
/**
* lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
* @ctx: ring buffer context.
struct switch_offsets offsets;
int ret;
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
- else
- buf = chan->backend.buf;
- ctx->buf = buf;
-
+ ctx->buf = buf = get_current_buf(chan, ctx->cpu);
offsets.size = 0;
do {