From: Mathieu Desnoyers Date: Thu, 10 Mar 2022 19:20:47 +0000 (-0500) Subject: Fix: sample discarded events count before reserve X-Git-Url: https://git.lttng.org/?p=lttng-modules.git;a=commitdiff_plain;h=b2cf5e0b0edcfe64d158119097fa49321cf9ba97 Fix: sample discarded events count before reserve Sampling the discarded events count in the buffer_end callback is done out of order, and may therefore include increments performed by following events (in following packets) if the thread doing the end-of-packet event write is interrupted for a long time. Sampling the event discarded counts before reserving space for the last event in a packet, and keeping this as part of the private ring buffer context, should fix this race. In lttng-modules, this scenario would only happen if an interrupt handler produces many events, when nested over an event between its reserve and commit. Note that if lttng-modules supports faultable tracepoints in the future, this may become more easy to trigger due to preemption. Signed-off-by: Mathieu Desnoyers Change-Id: I696a206b3926fc1abbee35caa9af65461ff56c68 --- diff --git a/include/ringbuffer/config.h b/include/ringbuffer/config.h index 34f8a5da..43523de3 100644 --- a/include/ringbuffer/config.h +++ b/include/ringbuffer/config.h @@ -44,7 +44,8 @@ struct lttng_kernel_ring_buffer_client_cb { void (*buffer_begin) (struct lttng_kernel_ring_buffer *buf, u64 tsc, unsigned int subbuf_idx); void (*buffer_end) (struct lttng_kernel_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size); + unsigned int subbuf_idx, unsigned long data_size, + const struct lttng_kernel_ring_buffer_ctx *ctx); /* Optional callbacks (can be set to NULL) */ @@ -189,6 +190,14 @@ struct lttng_kernel_ring_buffer_ctx_private { * for this channel */ struct lttng_kernel_ring_buffer_backend_pages *backend_pages; + + /* + * Records lost counts are only loaded into these fields before + * reserving the last bytes from the ring buffer. + */ + unsigned long records_lost_full; + unsigned long records_lost_wrap; + unsigned long records_lost_big; }; /* diff --git a/include/ringbuffer/frontend.h b/include/ringbuffer/frontend.h index f5b6fe43..57e09aa8 100644 --- a/include/ringbuffer/frontend.h +++ b/include/ringbuffer/frontend.h @@ -201,26 +201,26 @@ unsigned long lib_ring_buffer_get_records_overrun( static inline unsigned long lib_ring_buffer_get_records_lost_full( - const struct lttng_kernel_ring_buffer_config *config, - struct lttng_kernel_ring_buffer *buf) + const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)), + const struct lttng_kernel_ring_buffer_ctx *ctx) { - return v_read(config, &buf->records_lost_full); + return ctx->priv.records_lost_full; } static inline unsigned long lib_ring_buffer_get_records_lost_wrap( - const struct lttng_kernel_ring_buffer_config *config, - struct lttng_kernel_ring_buffer *buf) + const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)), + const struct lttng_kernel_ring_buffer_ctx *ctx) { - return v_read(config, &buf->records_lost_wrap); + return ctx->priv.records_lost_wrap; } static inline unsigned long lib_ring_buffer_get_records_lost_big( - const struct lttng_kernel_ring_buffer_config *config, - struct lttng_kernel_ring_buffer *buf) + const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)), + const struct lttng_kernel_ring_buffer_ctx *ctx) { - return v_read(config, &buf->records_lost_big); + return ctx->priv.records_lost_big; } static inline diff --git a/include/ringbuffer/frontend_api.h b/include/ringbuffer/frontend_api.h index 143aa02b..41e3958c 100644 --- a/include/ringbuffer/frontend_api.h +++ b/include/ringbuffer/frontend_api.h @@ -280,7 +280,7 @@ void lib_ring_buffer_commit(const struct lttng_kernel_ring_buffer_config *config commit_count = v_read(config, &cc_hot->cc); lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1, - commit_count, endidx, ctx->priv.tsc); + commit_count, endidx, ctx); /* * Update used size at each commit. It's needed only for extracting * ring_buffer buffers from vmcore, after crash. diff --git a/include/ringbuffer/frontend_internal.h b/include/ringbuffer/frontend_internal.h index 73fda354..999a7bb9 100644 --- a/include/ringbuffer/frontend_internal.h +++ b/include/ringbuffer/frontend_internal.h @@ -148,7 +148,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co unsigned long offset, unsigned long commit_count, unsigned long idx, - u64 tsc); + const struct lttng_kernel_ring_buffer_ctx *ctx); extern void lib_ring_buffer_switch_remote(struct lttng_kernel_ring_buffer *buf); @@ -277,7 +277,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_kernel_ring_buffer_config unsigned long offset, unsigned long commit_count, unsigned long idx, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; @@ -286,7 +286,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_kernel_ring_buffer_config if (unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) - (old_commit_count & chan->commit_count_mask) == 0)) lib_ring_buffer_check_deliver_slow(config, buf, chan, offset, - commit_count, idx, tsc); + commit_count, idx, ctx); } /* diff --git a/src/lib/ringbuffer/ring_buffer_frontend.c b/src/lib/ringbuffer/ring_buffer_frontend.c index 87a575d0..8a69e9ab 100644 --- a/src/lib/ringbuffer/ring_buffer_frontend.c +++ b/src/lib/ringbuffer/ring_buffer_frontend.c @@ -1582,14 +1582,14 @@ static void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; struct commit_counters_hot *cc_hot; - config->cb.buffer_begin(buf, tsc, oldidx); + config->cb.buffer_begin(buf, ctx->priv.tsc, oldidx); /* * Order all writes to buffer before the commit count update that will @@ -1609,7 +1609,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf, commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, - commit_count, oldidx, tsc); + commit_count, oldidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->old + config->cb.subbuffer_header_size(), commit_count, cc_hot); @@ -1627,7 +1627,7 @@ static void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); @@ -1648,7 +1648,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf, * postponed until the commit counter is incremented for the * current space reservation. */ - *ts_end = tsc; + *ts_end = ctx->priv.tsc; /* * Order all writes to buffer and store to ts_end before the commit @@ -1667,7 +1667,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf, v_add(config, padding_size, &cc_hot->cc); commit_count = v_read(config, &cc_hot->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, - commit_count, oldidx, tsc); + commit_count, oldidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->old + padding_size, commit_count, cc_hot); @@ -1684,14 +1684,14 @@ static void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; struct commit_counters_hot *cc_hot; - config->cb.buffer_begin(buf, tsc, beginidx); + config->cb.buffer_begin(buf, ctx->priv.tsc, beginidx); /* * Order all writes to buffer before the commit count update that will @@ -1711,7 +1711,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf, commit_count = v_read(config, &cc_hot->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, - commit_count, beginidx, tsc); + commit_count, beginidx, ctx); lib_ring_buffer_write_commit_counter(config, buf, chan, offsets->begin + config->cb.subbuffer_header_size(), commit_count, cc_hot); @@ -1729,7 +1729,7 @@ static void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long endidx, data_size; @@ -1747,7 +1747,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf, * postponed until the commit counter is incremented for the * current space reservation. */ - *ts_end = tsc; + *ts_end = ctx->priv.tsc; } /* @@ -1760,7 +1760,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_kernel_ring_buffer *buf, struct lttng_kernel_ring_buffer_channel *chan, struct switch_offsets *offsets, - u64 *tsc) + struct lttng_kernel_ring_buffer_ctx *ctx) { const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; unsigned long off, reserve_commit_diff; @@ -1770,7 +1770,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, offsets->switch_old_start = 0; off = subbuf_offset(offsets->begin, chan); - *tsc = config->cb.ring_buffer_clock_read(chan); + ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan); /* * Ensure we flush the header of an empty subbuffer when doing the @@ -1852,6 +1852,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, offsets->begin = subbuf_align(offsets->begin, chan); /* Note: old points to the next subbuf at offset 0 */ offsets->end = offsets->begin; + /* + * Populate the records lost counters prior to performing a + * sub-buffer switch. + */ + ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full); + ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap); + ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big); return 0; } @@ -1867,9 +1874,9 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config; + struct lttng_kernel_ring_buffer_ctx ctx; struct switch_offsets offsets; unsigned long oldidx; - u64 tsc; offsets.size = 0; @@ -1878,7 +1885,7 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit */ do { if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets, - &tsc)) + &ctx)) return; /* Switch not needed */ } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old); @@ -1889,7 +1896,7 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit * records, never the opposite (missing a full TSC record when it would * be needed). */ - save_last_tsc(config, buf, tsc); + save_last_tsc(config, buf, ctx.priv.tsc); /* * Push the reader if necessary @@ -1903,14 +1910,14 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit * May need to populate header start on SWITCH_FLUSH. */ if (offsets.switch_old_start) { - lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx); offsets.old += config->cb.subbuffer_header_size(); } /* * Switch old subbuffer. */ - lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx); } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); @@ -2143,6 +2150,15 @@ retry: */ offsets->switch_new_end = 1; /* For offsets->begin */ } + /* + * Populate the records lost counters when the space reservation + * may cause a sub-buffer switch. + */ + if (offsets->switch_new_end || offsets->switch_old_end) { + ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full); + ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap); + ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big); + } return 0; } @@ -2219,17 +2235,17 @@ int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx, if (unlikely(offsets.switch_old_end)) { lib_ring_buffer_clear_noref(config, &buf->backend, subbuf_index(offsets.old - 1, chan)); - lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx); } /* * Populate new subbuffer. */ if (unlikely(offsets.switch_new_start)) - lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx); if (unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc); + lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx); ctx->priv.slot_size = offsets.size; ctx->priv.pre_offset = offsets.begin; @@ -2281,7 +2297,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co unsigned long offset, unsigned long commit_count, unsigned long idx, - u64 tsc) + const struct lttng_kernel_ring_buffer_ctx *ctx) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; @@ -2341,7 +2357,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, - idx)); + idx), ctx); /* * Increment the packet counter while we have exclusive diff --git a/src/lttng-ring-buffer-client.h b/src/lttng-ring-buffer-client.h index e2ef7d76..0dc8a8d9 100644 --- a/src/lttng-ring-buffer-client.h +++ b/src/lttng-ring-buffer-client.h @@ -377,7 +377,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc, * subbuffer. data_size is between 1 and subbuf_size. */ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size) + unsigned int subbuf_idx, unsigned long data_size, + const struct lttng_kernel_ring_buffer_ctx *ctx) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; struct packet_header *header = @@ -391,9 +392,9 @@ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc, (uint64_t) data_size * CHAR_BIT; /* in bits */ header->ctx.packet_size = (uint64_t) PAGE_ALIGN(data_size) * CHAR_BIT; /* in bits */ - records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf); - records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf); - records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf); + records_lost += lib_ring_buffer_get_records_lost_full(&client_config, ctx); + records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx); + records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx); header->ctx.events_discarded = records_lost; } diff --git a/src/lttng-ring-buffer-event-notifier-client.h b/src/lttng-ring-buffer-event-notifier-client.h index ccd3a020..8526e05e 100644 --- a/src/lttng-ring-buffer-event-notifier-client.h +++ b/src/lttng-ring-buffer-event-notifier-client.h @@ -96,7 +96,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc, * subbuffer. data_size is between 1 and subbuf_size. */ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size) + unsigned int subbuf_idx, unsigned long data_size, + const struct lttng_kernel_ring_buffer_ctx *ctx) { } diff --git a/src/lttng-ring-buffer-metadata-client.h b/src/lttng-ring-buffer-metadata-client.h index 763ebaf1..86ec78f1 100644 --- a/src/lttng-ring-buffer-metadata-client.h +++ b/src/lttng-ring-buffer-metadata-client.h @@ -110,7 +110,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc, * subbuffer. data_size is between 1 and subbuf_size. */ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size) + unsigned int subbuf_idx, unsigned long data_size, + const struct lttng_kernel_ring_buffer_ctx *ctx) { struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan; struct metadata_packet_header *header = @@ -125,9 +126,9 @@ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc, * We do not care about the records lost count, because the metadata * channel waits and retry. */ - (void) lib_ring_buffer_get_records_lost_full(&client_config, buf); - records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf); - records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf); + (void) lib_ring_buffer_get_records_lost_full(&client_config, ctx); + records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx); + records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx); WARN_ON_ONCE(records_lost != 0); }