Fix: sample discarded events count before reserve
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 10 Mar 2022 19:20:47 +0000 (14:20 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 11 Mar 2022 16:41:54 +0000 (11:41 -0500)
Sampling the discarded events count in the buffer_end callback is done
out of order, and may therefore include increments performed by following
events (in following packets) if the thread doing the end-of-packet
event write is interrupted for a long time.

Sampling the event discarded counts before reserving space for the last
event in a packet, and keeping this as part of the private ring buffer
context, should fix this race.

In lttng-modules, this scenario would only happen if an interrupt
handler produces many events, when nested over an event between its
reserve and commit. Note that if lttng-modules supports faultable
tracepoints in the future, this may become more easy to trigger due to
preemption.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I696a206b3926fc1abbee35caa9af65461ff56c68

include/ringbuffer/config.h
include/ringbuffer/frontend.h
include/ringbuffer/frontend_api.h
include/ringbuffer/frontend_internal.h
src/lib/ringbuffer/ring_buffer_frontend.c
src/lttng-ring-buffer-client.h
src/lttng-ring-buffer-event-notifier-client.h
src/lttng-ring-buffer-metadata-client.h

index 34f8a5da22e08935c659c8425d47e35184f1a2ff..43523de3055b55414a969d1ac30bee9610615e2c 100644 (file)
@@ -44,7 +44,8 @@ struct lttng_kernel_ring_buffer_client_cb {
        void (*buffer_begin) (struct lttng_kernel_ring_buffer *buf, u64 tsc,
                              unsigned int subbuf_idx);
        void (*buffer_end) (struct lttng_kernel_ring_buffer *buf, u64 tsc,
-                           unsigned int subbuf_idx, unsigned long data_size);
+                           unsigned int subbuf_idx, unsigned long data_size,
+                           const struct lttng_kernel_ring_buffer_ctx *ctx);
 
        /* Optional callbacks (can be set to NULL) */
 
@@ -189,6 +190,14 @@ struct lttng_kernel_ring_buffer_ctx_private {
                                                 * for this channel
                                                 */
        struct lttng_kernel_ring_buffer_backend_pages *backend_pages;
+
+       /*
+        * Records lost counts are only loaded into these fields before
+        * reserving the last bytes from the ring buffer.
+        */
+       unsigned long records_lost_full;
+       unsigned long records_lost_wrap;
+       unsigned long records_lost_big;
 };
 
 /*
index f5b6fe43311564e8e98f90c2930022c7ac8b97fd..57e09aa886edf2f6688eae67ba74f65b3b51c335 100644 (file)
@@ -201,26 +201,26 @@ unsigned long lib_ring_buffer_get_records_overrun(
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_full(
-                               const struct lttng_kernel_ring_buffer_config *config,
-                               struct lttng_kernel_ring_buffer *buf)
+                               const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_full);
+       return ctx->priv.records_lost_full;
 }
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_wrap(
-                               const struct lttng_kernel_ring_buffer_config *config,
-                               struct lttng_kernel_ring_buffer *buf)
+                               const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_wrap);
+       return ctx->priv.records_lost_wrap;
 }
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_big(
-                               const struct lttng_kernel_ring_buffer_config *config,
-                               struct lttng_kernel_ring_buffer *buf)
+                               const struct lttng_kernel_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_big);
+       return ctx->priv.records_lost_big;
 }
 
 static inline
index 143aa02ba927af37b0941a12625590599ffa63c9..41e3958c140ff39752d66e67f2440f5d59a57c2d 100644 (file)
@@ -280,7 +280,7 @@ void lib_ring_buffer_commit(const struct lttng_kernel_ring_buffer_config *config
        commit_count = v_read(config, &cc_hot->cc);
 
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
-                                     commit_count, endidx, ctx->priv.tsc);
+                                     commit_count, endidx, ctx);
        /*
         * Update used size at each commit. It's needed only for extracting
         * ring_buffer buffers from vmcore, after crash.
index 73fda354adabefdc734663e1270a98ab7f57611b..999a7bb9993273a94c18dd2f7e2f861e04bd72a9 100644 (file)
@@ -148,7 +148,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co
                                   unsigned long offset,
                                   unsigned long commit_count,
                                   unsigned long idx,
-                                  u64 tsc);
+                                  const struct lttng_kernel_ring_buffer_ctx *ctx);
 
 extern
 void lib_ring_buffer_switch_remote(struct lttng_kernel_ring_buffer *buf);
@@ -277,7 +277,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_kernel_ring_buffer_config
                                   unsigned long offset,
                                   unsigned long commit_count,
                                   unsigned long idx,
-                                  u64 tsc)
+                                  const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -286,7 +286,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_kernel_ring_buffer_config
        if (unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
                     - (old_commit_count & chan->commit_count_mask) == 0))
                lib_ring_buffer_check_deliver_slow(config, buf, chan, offset,
-                       commit_count, idx, tsc);
+                       commit_count, idx, ctx);
 }
 
 /*
index 87a575d0bbf384bfcd9c731a55fca837ee91636d..8a69e9ab57eec661de51040b4552daaa56b67dcf 100644 (file)
@@ -1582,14 +1582,14 @@ static
 void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf,
                                      struct lttng_kernel_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     u64 tsc)
+                                     const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old, chan);
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, oldidx);
+       config->cb.buffer_begin(buf, ctx->priv.tsc, oldidx);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1609,7 +1609,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_kernel_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
-                                     commit_count, oldidx, tsc);
+                                     commit_count, oldidx, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + config->cb.subbuffer_header_size(),
                        commit_count, cc_hot);
@@ -1627,7 +1627,7 @@ static
 void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf,
                                    struct lttng_kernel_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   u64 tsc)
+                                   const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
@@ -1648,7 +1648,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv.tsc;
 
        /*
         * Order all writes to buffer and store to ts_end before the commit
@@ -1667,7 +1667,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_kernel_ring_buffer *buf,
        v_add(config, padding_size, &cc_hot->cc);
        commit_count = v_read(config, &cc_hot->cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
-                                     commit_count, oldidx, tsc);
+                                     commit_count, oldidx, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + padding_size, commit_count,
                        cc_hot);
@@ -1684,14 +1684,14 @@ static
 void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf,
                                      struct lttng_kernel_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     u64 tsc)
+                                     const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
        unsigned long beginidx = subbuf_index(offsets->begin, chan);
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, beginidx);
+       config->cb.buffer_begin(buf, ctx->priv.tsc, beginidx);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1711,7 +1711,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_kernel_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
-                                     commit_count, beginidx, tsc);
+                                     commit_count, beginidx, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->begin + config->cb.subbuffer_header_size(),
                        commit_count, cc_hot);
@@ -1729,7 +1729,7 @@ static
 void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf,
                                            struct lttng_kernel_ring_buffer_channel *chan,
                                            struct switch_offsets *offsets,
-                                           u64 tsc)
+                                           const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
        unsigned long endidx, data_size;
@@ -1747,7 +1747,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_kernel_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv.tsc;
 }
 
 /*
@@ -1760,7 +1760,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                                    struct lttng_kernel_ring_buffer *buf,
                                    struct lttng_kernel_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   u64 *tsc)
+                                   struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
        unsigned long off, reserve_commit_diff;
@@ -1770,7 +1770,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->switch_old_start = 0;
        off = subbuf_offset(offsets->begin, chan);
 
-       *tsc = config->cb.ring_buffer_clock_read(chan);
+       ctx->priv.tsc = config->cb.ring_buffer_clock_read(chan);
 
        /*
         * Ensure we flush the header of an empty subbuffer when doing the
@@ -1852,6 +1852,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->begin = subbuf_align(offsets->begin, chan);
        /* Note: old points to the next subbuf at offset 0 */
        offsets->end = offsets->begin;
+       /*
+        * Populate the records lost counters prior to performing a
+        * sub-buffer switch.
+        */
+       ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full);
+       ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+       ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big);
        return 0;
 }
 
@@ -1867,9 +1874,9 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit
 {
        struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
        const struct lttng_kernel_ring_buffer_config *config = &chan->backend.config;
+       struct lttng_kernel_ring_buffer_ctx ctx;
        struct switch_offsets offsets;
        unsigned long oldidx;
-       u64 tsc;
 
        offsets.size = 0;
 
@@ -1878,7 +1885,7 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit
         */
        do {
                if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
-                                                   &tsc))
+                                                   &ctx))
                        return; /* Switch not needed */
        } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
                 != offsets.old);
@@ -1889,7 +1896,7 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit
         * records, never the opposite (missing a full TSC record when it would
         * be needed).
         */
-       save_last_tsc(config, buf, tsc);
+       save_last_tsc(config, buf, ctx.priv.tsc);
 
        /*
         * Push the reader if necessary
@@ -1903,14 +1910,14 @@ void lib_ring_buffer_switch_slow(struct lttng_kernel_ring_buffer *buf, enum swit
         * May need to populate header start on SWITCH_FLUSH.
         */
        if (offsets.switch_old_start) {
-               lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
+               lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx);
                offsets.old += config->cb.subbuffer_header_size();
        }
 
        /*
         * Switch old subbuffer.
         */
-       lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
+       lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx);
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
 
@@ -2143,6 +2150,15 @@ retry:
                 */
                offsets->switch_new_end = 1;    /* For offsets->begin */
        }
+       /*
+        * Populate the records lost counters when the space reservation
+        * may cause a sub-buffer switch.
+        */
+       if (offsets->switch_new_end || offsets->switch_old_end) {
+               ctx->priv.records_lost_full = v_read(config, &buf->records_lost_full);
+               ctx->priv.records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+               ctx->priv.records_lost_big = v_read(config, &buf->records_lost_big);
+       }
        return 0;
 }
 
@@ -2219,17 +2235,17 @@ int lib_ring_buffer_reserve_slow(struct lttng_kernel_ring_buffer_ctx *ctx,
        if (unlikely(offsets.switch_old_end)) {
                lib_ring_buffer_clear_noref(config, &buf->backend,
                                            subbuf_index(offsets.old - 1, chan));
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->priv.tsc);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->priv.tsc);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx);
 
        if (unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->priv.tsc);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx);
 
        ctx->priv.slot_size = offsets.size;
        ctx->priv.pre_offset = offsets.begin;
@@ -2281,7 +2297,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co
                                   unsigned long offset,
                                   unsigned long commit_count,
                                   unsigned long idx,
-                                  u64 tsc)
+                                  const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -2341,7 +2357,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_kernel_ring_buffer_co
                config->cb.buffer_end(buf, *ts_end, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
-                                                               idx));
+                                                               idx), ctx);
 
                /*
                 * Increment the packet counter while we have exclusive
index e2ef7d7664cd6e577448e79a491834c2146ddebf..0dc8a8d9f5f73393a04afa39efe51db09e226e1d 100644 (file)
@@ -377,7 +377,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc,
  * subbuffer. data_size is between 1 and subbuf_size.
  */
 static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx, unsigned long data_size)
+                             unsigned int subbuf_idx, unsigned long data_size,
+                             const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
        struct packet_header *header =
@@ -391,9 +392,9 @@ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc,
                (uint64_t) data_size * CHAR_BIT;                /* in bits */
        header->ctx.packet_size =
                (uint64_t) PAGE_ALIGN(data_size) * CHAR_BIT;    /* in bits */
-       records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+       records_lost += lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
        header->ctx.events_discarded = records_lost;
 }
 
index ccd3a0206f12d1a6c56437302b21de17f350bc2b..8526e05e413fcb1e2b58a75682013ff242b8f1cb 100644 (file)
@@ -96,7 +96,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc,
  * subbuffer. data_size is between 1 and subbuf_size.
  */
 static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx, unsigned long data_size)
+                             unsigned int subbuf_idx, unsigned long data_size,
+                             const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
 }
 
index 763ebaf1274592a3dd5c665491ad81f59fb31c75..86ec78f16abf7af75bc0ed9a3fae5ce66f64766b 100644 (file)
@@ -110,7 +110,8 @@ static void client_buffer_begin(struct lttng_kernel_ring_buffer *buf, u64 tsc,
  * subbuffer. data_size is between 1 and subbuf_size.
  */
 static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx, unsigned long data_size)
+                             unsigned int subbuf_idx, unsigned long data_size,
+                             const struct lttng_kernel_ring_buffer_ctx *ctx)
 {
        struct lttng_kernel_ring_buffer_channel *chan = buf->backend.chan;
        struct metadata_packet_header *header =
@@ -125,9 +126,9 @@ static void client_buffer_end(struct lttng_kernel_ring_buffer *buf, u64 tsc,
         * We do not care about the records lost count, because the metadata
         * channel waits and retry.
         */
-       (void) lib_ring_buffer_get_records_lost_full(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+       (void) lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
        WARN_ON_ONCE(records_lost != 0);
 }
 
This page took 0.03427 seconds and 4 git commands to generate.