* - put_subbuf
*/
+#include <linux/atomic.h>
#include <linux/delay.h>
#include <linux/module.h>
#include <linux/percpu.h>
+#include <linux/percpu-defs.h>
#include <asm/cacheflush.h>
#include <ringbuffer/config.h>
#include <ringbuffer/frontend.h>
#include <ringbuffer/iterator.h>
#include <ringbuffer/nohz.h>
-#include <wrapper/atomic.h>
#include <wrapper/cpu.h>
#include <wrapper/kref.h>
-#include <wrapper/percpu-defs.h>
#include <wrapper/timer.h>
#include <wrapper/vmalloc.h>
}
atomic_long_set(&buf->consumed, 0);
atomic_set(&buf->record_disabled, 0);
- v_set(config, &buf->last_tsc, 0);
+ v_set(config, &buf->last_timestamp, 0);
lib_ring_buffer_backend_reset(&buf->backend);
/* Don't reset number of active readers */
v_set(config, &buf->records_lost_full, 0);
struct lttng_kernel_ring_buffer_channel *chan = container_of(chanb, struct lttng_kernel_ring_buffer_channel, backend);
void *priv = chanb->priv;
size_t subbuf_header_size;
- u64 tsc;
+ u64 timestamp;
int ret;
/* Test for cpu hotplug */
subbuf_header_size = config->cb.subbuffer_header_size();
v_set(config, &buf->offset, subbuf_header_size);
subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
- tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
- config->cb.buffer_begin(buf, tsc, 0);
+ timestamp = config->cb.ring_buffer_clock_read(buf->backend.chan);
+ config->cb.buffer_begin(buf, timestamp, 0);
v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
if (config->cb.buffer_create) {
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
atomic_long_dec(&buf->active_readers);
return -EOVERFLOW;
}
- lttng_smp_mb__after_atomic();
+ smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- lttng_smp_mb__before_atomic();
+ smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
if (config->output != RING_BUFFER_MMAP)
return;
+#ifdef cpu_dcache_is_aliasing
+ /*
+ * Some architectures implement flush_dcache_page() but don't
+	 * actually have an aliasing dcache. cpu_dcache_is_aliasing() was
+ * introduced in kernel v6.9 to query this more precisely.
+ */
+ if (!cpu_dcache_is_aliasing())
+ return;
+#endif
+
/*
* Architectures with caches aliased on virtual addresses may
* use different cache lines for the linear mapping vs
void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf,
struct lttng_kernel_ring_buffer_channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ const struct lttng_kernel_ring_buffer_ctx *ctx)
{
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
unsigned long oldidx = subbuf_index(offsets->old, chan);
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
- config->cb.buffer_begin(buf, tsc, oldidx);
+ config->cb.buffer_begin(buf, ctx->priv.timestamp, oldidx);
/*
* Order all writes to buffer before the commit count update that will
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx, tsc);
+ commit_count, oldidx, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + config->cb.subbuffer_header_size(),
commit_count, cc_hot);
void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf,
struct lttng_kernel_ring_buffer_channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ const struct lttng_kernel_ring_buffer_ctx *ctx)
{
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
* postponed until the commit counter is incremented for the
* current space reservation.
*/
- *ts_end = tsc;
+ *ts_end = ctx->priv.timestamp;
/*
* Order all writes to buffer and store to ts_end before the commit
v_add(config, padding_size, &cc_hot->cc);
commit_count = v_read(config, &cc_hot->cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx, tsc);
+ commit_count, oldidx, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + padding_size, commit_count,
cc_hot);
void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf,
struct lttng_kernel_ring_buffer_channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ const struct lttng_kernel_ring_buffer_ctx *ctx)
{
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
unsigned long beginidx = subbuf_index(offsets->begin, chan);
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
- config->cb.buffer_begin(buf, tsc, beginidx);
+ config->cb.buffer_begin(buf, ctx->priv.timestamp, beginidx);
/*
* Order all writes to buffer before the commit count update that will
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx, tsc);
+ commit_count, beginidx, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->begin + config->cb.subbuffer_header_size(),
commit_count, cc_hot);
void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf,
struct lttng_kernel_ring_buffer_channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ const struct lttng_kernel_ring_buffer_ctx *ctx)
{
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
unsigned long endidx, data_size;
* postponed until the commit counter is incremented for the
* current space reservation.
*/
- *ts_end = tsc;
+ *ts_end = ctx->priv.timestamp;
}
/*
struct lttng_kernel_ring_buffer *buf,
struct lttng_kernel_ring_buffer_channel *chan,
struct switch_offsets *offsets,
- u64 *tsc)
+ struct lttng_kernel_ring_buffer_ctx *ctx)
{
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
unsigned long off, reserve_commit_diff;
offsets->switch_old_start = 0;
off = subbuf_offset(offsets->begin, chan);
- *tsc = config->cb.ring_buffer_clock_read(chan);
+ ctx->priv.timestamp = config->cb.ring_buffer_clock_read(chan);
/*
* Ensure we flush the header of an empty subbuffer when doing the
offsets->begin = subbuf_align(offsets->begin, chan);
/* Note: old points to the next subbuf at offset 0 */
offsets->end = offsets->begin;
+ /*
+ * Populate the records lost counters prior to performing a
+ * sub-buffer switch.
+ */
+ ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full);
+ ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+ ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big);
return 0;
}
{
struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
+ struct lttng_kernel_ring_buffer_ctx ctx;
struct switch_offsets offsets;
unsigned long oldidx;
- u64 tsc;
offsets.size = 0;
*/
do {
if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
- &tsc))
+ &ctx))
return; /* Switch not needed */
} while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
!= offsets.old);
/*
- * Atomically update last_tsc. This update races against concurrent
- * atomic updates, but the race will always cause supplementary full TSC
- * records, never the opposite (missing a full TSC record when it would
- * be needed).
+ * Atomically update last_timestamp. This update races against concurrent
+ * atomic updates, but the race will always cause supplementary
+ * full timestamp records, never the opposite (missing a full
+ * timestamp record when it would be needed).
*/
- save_last_tsc(config, buf, tsc);
+ save_last_timestamp(config, buf, ctx.priv.timestamp);
/*
* Push the reader if necessary
* May need to populate header start on SWITCH_FLUSH.
*/
if (offsets.switch_old_start) {
- lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
+ lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx);
offsets.old += config->cb.subbuffer_header_size();
}
/*
* Switch old subbuffer.
*/
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
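
The save_last_timestamp()/last_timestamp_overflow() pair used above decides when an event must carry a full timestamp: compact event headers only store the low-order timestamp bits, so a full timestamp is forced whenever the high-order bits changed since the last record, and the race described in the comment can only produce extra full timestamps, never a missing one. Below is a minimal standalone sketch of that idea, not part of this patch; the 27-bit width, the global variable and the helper bodies are illustrative assumptions (in the module the saved value lives in buf->last_timestamp and the compressed field width is a client configuration detail).

#include <stdint.h>
#include <stdio.h>

/* Illustrative width of the compressed timestamp field in compact headers. */
#define COMPRESSED_TIMESTAMP_BITS	27

/* High-order bits of the last timestamp written to the buffer (sketch only). */
static uint64_t last_timestamp_msb;

/* Remember the high-order bits of the most recent record's timestamp. */
static void save_last_timestamp(uint64_t timestamp)
{
	last_timestamp_msb = timestamp >> COMPRESSED_TIMESTAMP_BITS;
}

/*
 * Return 1 when the high-order bits changed since the last record, i.e. when
 * a compressed timestamp would be ambiguous and a full timestamp is required.
 */
static int last_timestamp_overflow(uint64_t timestamp)
{
	return (timestamp >> COMPRESSED_TIMESTAMP_BITS) != last_timestamp_msb;
}

int main(void)
{
	uint64_t t0 = 0x0000000007ffffffULL;	/* below the 27-bit boundary */
	uint64_t t1 = 0x0000000008000000ULL;	/* crosses the 27-bit boundary */

	save_last_timestamp(t0);
	printf("full timestamp needed: %d\n", last_timestamp_overflow(t1));	/* 1 */
	save_last_timestamp(t1);
	printf("full timestamp needed: %d\n", last_timestamp_overflow(t1 + 1));	/* 0 */
	return 0;
}
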
offsets->switch_old_end = 0;
offsets->pre_header_padding = 0;
- ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan);
- if ((int64_t) ctx->priv.tsc == -EIO)
+ ctx->priv.timestamp = config->cb.ring_buffer_clock_read(chan);
+ if ((int64_t) ctx->priv.timestamp == -EIO)
return -EIO;
- if (last_tsc_overflow(config, buf, ctx->priv.tsc))
- ctx->priv.rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+ if (last_timestamp_overflow(config, buf, ctx->priv.timestamp))
+ ctx->priv.rflags |= RING_BUFFER_RFLAG_FULL_TIMESTAMP;
if (unlikely(subbuf_offset(offsets->begin, ctx->priv.chan) == 0)) {
offsets->switch_new_start = 1; /* For offsets->begin */
*/
offsets->switch_new_end = 1; /* For offsets->begin */
}
+ /*
+ * Populate the records lost counters when the space reservation
+ * may cause a sub-buffer switch.
+ */
+ if (offsets->switch_new_end || offsets->switch_old_end) {
+ ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full);
+ ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+ ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big);
+ }
return 0;
}
!= offsets.old));
/*
- * Atomically update last_tsc. This update races against concurrent
- * atomic updates, but the race will always cause supplementary full TSC
- * records, never the opposite (missing a full TSC record when it would
- * be needed).
+ * Atomically update last_timestamp. This update races against concurrent
+ * atomic updates, but the race will always cause supplementary
+ * full timestamp records, never the opposite (missing a full
+ * timestamp record when it would be needed).
*/
- save_last_tsc(config, buf, ctx->priv.tsc);
+ save_last_timestamp(config, buf, ctx->priv.timestamp);
/*
* Push the reader if necessary
if (unlikely(offsets.switch_old_end)) {
lib_ring_buffer_clear_noref(config, &buf->backend,
subbuf_index(offsets.old - 1, chan));
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx);
}
/*
* Populate new subbuffer.
*/
if (unlikely(offsets.switch_new_start))
- lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc);
+ lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx);
if (unlikely(offsets.switch_new_end))
- lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc);
+ lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx);
ctx->priv.slot_size = offsets.size;
ctx->priv.pre_offset = offsets.begin;
unsigned long offset,
unsigned long commit_count,
unsigned long idx,
- u64 tsc)
+ const struct lttng_kernel_ring_buffer_ctx *ctx)
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
config->cb.buffer_end(buf, *ts_end, idx,
lib_ring_buffer_get_data_size(config,
buf,
- idx));
+ idx), ctx);
/*
* Increment the packet counter while we have exclusive
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_check_deliver_slow);
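
Threading the full ctx into lib_ring_buffer_check_deliver_slow() and buffer_end() lets a client copy the lost-record counters sampled at reservation/switch time (ctx->priv.records_lost_full/_wrap/_big above) into the packet being closed. The following is a hedged, standalone sketch of that use, not part of this patch; the struct and function names are hypothetical stand-ins, only the ctx->priv field names visible in this patch are mirrored, and the real callback also receives the buffer, sub-buffer index and data size.

#include <stdint.h>
#include <stdio.h>

/*
 * Reduced, hypothetical stand-ins for the ring buffer context types: only the
 * priv fields visible in this patch are mirrored here. The real structures
 * live in the lttng-modules headers and carry many more fields.
 */
struct ringbuf_ctx_private {
	uint64_t timestamp;
	unsigned long records_lost_full;
	unsigned long records_lost_wrap;
	unsigned long records_lost_big;
};

struct ringbuf_ctx {
	struct ringbuf_ctx_private priv;
};

/* Hypothetical packet footer a client could fill when a sub-buffer closes. */
struct client_packet_footer {
	uint64_t timestamp_end;
	unsigned long events_discarded;
};

/*
 * Sketch of what a client buffer_end() callback can do with the ctx: record
 * the end timestamp and fold the three lost-record counters, sampled by the
 * frontend before the switch, into one discarded-events count for the packet.
 */
static void client_buffer_end(struct client_packet_footer *footer,
		uint64_t timestamp_end, const struct ringbuf_ctx *ctx)
{
	footer->timestamp_end = timestamp_end;
	footer->events_discarded = ctx->priv.records_lost_full
			+ ctx->priv.records_lost_wrap
			+ ctx->priv.records_lost_big;
}

int main(void)
{
	struct ringbuf_ctx ctx = { .priv = {
		.timestamp = 1000,
		.records_lost_full = 2,
		.records_lost_wrap = 0,
		.records_lost_big = 1,
	} };
	struct client_packet_footer footer;

	client_buffer_end(&footer, ctx.priv.timestamp, &ctx);
	printf("events discarded: %lu\n", footer.events_discarded);	/* 3 */
	return 0;
}
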
+static
int __init init_lib_ring_buffer_frontend(void)
{
int cpu;
module_init(init_lib_ring_buffer_frontend);
+static
void __exit exit_lib_ring_buffer_frontend(void)
{
}