Calculate context length outside of retry loop
[lttng-modules.git] / lib / ringbuffer / ring_buffer_frontend.c
index c5123a6415e39afc75cc55b8208b9b6924fbf975..310752506550bda15e03d6d57eff5d5f7a43398c 100644 (file)
@@ -247,7 +247,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
                                   * chan->backend.num_subbuf,
                                   1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+                       GFP_KERNEL | __GFP_NOWARN,
+                       cpu_to_node(max(cpu, 0)));
        if (!buf->commit_hot) {
                ret = -ENOMEM;
                goto free_chanbuf;
@@ -257,7 +258,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
                                   * chan->backend.num_subbuf,
                                   1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+                       GFP_KERNEL | __GFP_NOWARN,
+                       cpu_to_node(max(cpu, 0)));
        if (!buf->commit_cold) {
                ret = -ENOMEM;
                goto free_commit;
@@ -449,7 +451,81 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
        buf->read_timer_enabled = 0;
 }
 
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+
+enum cpuhp_state lttng_rb_hp_prepare;
+enum cpuhp_state lttng_rb_hp_online;
+
+void lttng_rb_set_hp_prepare(enum cpuhp_state val)
+{
+       lttng_rb_hp_prepare = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_prepare);
+
+void lttng_rb_set_hp_online(enum cpuhp_state val)
+{
+       lttng_rb_hp_online = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_online);
+
+int lttng_cpuhp_rb_frontend_dead(unsigned int cpu,
+               struct lttng_cpuhp_node *node)
+{
+       struct channel *chan = container_of(node, struct channel,
+                                           cpuhp_prepare);
+       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+       /*
+        * Performing a buffer switch on a remote CPU. Performed by
+        * the CPU responsible for doing the hotunplug after the target
+        * CPU stopped running completely. Ensures that all data
+        * from that remote CPU is flushed.
+        */
+       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+       return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_dead);
+
+int lttng_cpuhp_rb_frontend_online(unsigned int cpu,
+               struct lttng_cpuhp_node *node)
+{
+       struct channel *chan = container_of(node, struct channel,
+                                           cpuhp_online);
+       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+       wake_up_interruptible(&chan->hp_wait);
+       lib_ring_buffer_start_switch_timer(buf);
+       lib_ring_buffer_start_read_timer(buf);
+       return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_online);
+
+int lttng_cpuhp_rb_frontend_offline(unsigned int cpu,
+               struct lttng_cpuhp_node *node)
+{
+       struct channel *chan = container_of(node, struct channel,
+                                           cpuhp_online);
+       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+       lib_ring_buffer_stop_switch_timer(buf);
+       lib_ring_buffer_stop_read_timer(buf);
+       return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
+
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
 #ifdef CONFIG_HOTPLUG_CPU
+
 /**
  *     lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
  *     @nb: notifier block
@@ -505,8 +581,11 @@ int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
                return NOTIFY_DONE;
        }
 }
+
 #endif
 
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
 /*
  * For per-cpu buffers, call the reader wakeups before switching the buffer, so
@@ -595,7 +674,6 @@ void notrace lib_ring_buffer_tick_nohz_restart(void)
 static void channel_unregister_notifiers(struct channel *chan)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       int cpu;
 
        channel_iterator_unregister_notifiers(chan);
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
@@ -612,25 +690,42 @@ static void channel_unregister_notifiers(struct channel *chan)
                 * concurrency.
                 */
 #endif /* CONFIG_NO_HZ */
-#ifdef CONFIG_HOTPLUG_CPU
-               get_online_cpus();
-               chan->cpu_hp_enable = 0;
-               for_each_online_cpu(cpu) {
-                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
-                                                             cpu);
-                       lib_ring_buffer_stop_switch_timer(buf);
-                       lib_ring_buffer_stop_read_timer(buf);
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+               {
+                       int ret;
+
+                       ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
+                               &chan->cpuhp_online.node);
+                       WARN_ON(ret);
+                       ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+                               &chan->cpuhp_prepare.node);
+                       WARN_ON(ret);
                }
-               put_online_cpus();
-               unregister_cpu_notifier(&chan->cpu_hp_notifier);
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+               {
+                       int cpu;
+
+#ifdef CONFIG_HOTPLUG_CPU
+                       get_online_cpus();
+                       chan->cpu_hp_enable = 0;
+                       for_each_online_cpu(cpu) {
+                               struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                                     cpu);
+                               lib_ring_buffer_stop_switch_timer(buf);
+                               lib_ring_buffer_stop_read_timer(buf);
+                       }
+                       put_online_cpus();
+                       unregister_cpu_notifier(&chan->cpu_hp_notifier);
 #else
-               for_each_possible_cpu(cpu) {
-                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
-                                                             cpu);
-                       lib_ring_buffer_stop_switch_timer(buf);
-                       lib_ring_buffer_stop_read_timer(buf);
-               }
+                       for_each_possible_cpu(cpu) {
+                               struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                                     cpu);
+                               lib_ring_buffer_stop_switch_timer(buf);
+                               lib_ring_buffer_stop_read_timer(buf);
+                       }
 #endif
+               }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
        } else {
                struct lib_ring_buffer *buf = chan->backend.buf;
 
@@ -731,7 +826,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval)
 {
-       int ret, cpu;
+       int ret;
        struct channel *chan;
 
        if (lib_ring_buffer_check_config(config, switch_timer_interval,
@@ -759,6 +854,56 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
        init_waitqueue_head(&chan->hp_wait);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+               chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
+               ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
+                       &chan->cpuhp_prepare.node);
+               if (ret)
+                       goto cpuhp_prepare_error;
+
+               chan->cpuhp_online.component = LTTNG_RING_BUFFER_FRONTEND;
+               ret = cpuhp_state_add_instance(lttng_rb_hp_online,
+                       &chan->cpuhp_online.node);
+               if (ret)
+                       goto cpuhp_online_error;
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+               {
+                       int cpu;
+                       /*
+                        * In case of non-hotplug cpu, if the ring-buffer is allocated
+                        * in early initcall, it will not be notified of secondary cpus.
+                        * In that off case, we need to allocate for all possible cpus.
+                        */
+#ifdef CONFIG_HOTPLUG_CPU
+                       chan->cpu_hp_notifier.notifier_call =
+                                       lib_ring_buffer_cpu_hp_callback;
+                       chan->cpu_hp_notifier.priority = 6;
+                       register_cpu_notifier(&chan->cpu_hp_notifier);
+
+                       get_online_cpus();
+                       for_each_online_cpu(cpu) {
+                               struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                                      cpu);
+                               spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+                               lib_ring_buffer_start_switch_timer(buf);
+                               lib_ring_buffer_start_read_timer(buf);
+                               spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+                       }
+                       chan->cpu_hp_enable = 1;
+                       put_online_cpus();
+#else
+                       for_each_possible_cpu(cpu) {
+                               struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                                     cpu);
+                               spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+                               lib_ring_buffer_start_switch_timer(buf);
+                               lib_ring_buffer_start_read_timer(buf);
+                               spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+                       }
+#endif
+               }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
                /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
                chan->tick_nohz_notifier.notifier_call =
@@ -768,38 +913,6 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
                                       &chan->tick_nohz_notifier);
 #endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
 
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-#ifdef CONFIG_HOTPLUG_CPU
-               chan->cpu_hp_notifier.notifier_call =
-                               lib_ring_buffer_cpu_hp_callback;
-               chan->cpu_hp_notifier.priority = 6;
-               register_cpu_notifier(&chan->cpu_hp_notifier);
-
-               get_online_cpus();
-               for_each_online_cpu(cpu) {
-                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
-                                                              cpu);
-                       spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
-                       lib_ring_buffer_start_switch_timer(buf);
-                       lib_ring_buffer_start_read_timer(buf);
-                       spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
-               }
-               chan->cpu_hp_enable = 1;
-               put_online_cpus();
-#else
-               for_each_possible_cpu(cpu) {
-                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
-                                                             cpu);
-                       spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
-                       lib_ring_buffer_start_switch_timer(buf);
-                       lib_ring_buffer_start_read_timer(buf);
-                       spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
-               }
-#endif
        } else {
                struct lib_ring_buffer *buf = chan->backend.buf;
 
@@ -809,6 +922,13 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
 
        return chan;
 
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+cpuhp_online_error:
+       ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+                       &chan->cpuhp_prepare.node);
+       WARN_ON(ret);
+cpuhp_prepare_error:
+#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
 error_free_backend:
        channel_backend_free(&chan->backend);
 error:
@@ -999,6 +1119,37 @@ nodata:
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
 
+/**
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same subbuffer.
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(struct lib_ring_buffer *buf,
+               unsigned long *consumed, unsigned long *produced)
+{
+       struct channel *chan = buf->backend.chan;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       smp_rmb();
+       *consumed = atomic_long_read(&buf->consumed);
+       /*
+        * No need to issue a memory barrier between consumed count read and
+        * write offset read, because consumed count can only change
+        * concurrently in overwrite mode, and we keep a sequence counter
+        * identifier derived from the write offset to check we are getting
+        * the same sub-buffer we are expecting (the sub-buffers are atomically
+        * "tagged" upon writes, tags are checked upon read).
+        */
+       *produced = v_read(config, &buf->offset);
+       return 0;
+}
+
 /**
  * lib_ring_buffer_put_snapshot - move consumed counter forward
  *
@@ -1344,8 +1495,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
 /*
  * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
  *
- * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
- * or at buffer finalization (destroy).
+ * Only executed when the buffer is finalized, in SWITCH_FLUSH.
  */
 static
 void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
@@ -1356,6 +1506,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old, chan);
        unsigned long commit_count;
+       struct commit_counters_hot *cc_hot;
 
        config->cb.buffer_begin(buf, tsc, oldidx);
 
@@ -1372,15 +1523,15 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
                barrier();
        } else
                smp_wmb();
-       v_add(config, config->cb.subbuffer_header_size(),
-             &buf->commit_hot[oldidx].cc);
-       commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
+       cc_hot = &buf->commit_hot[oldidx];
+       v_add(config, config->cb.subbuffer_header_size(), &cc_hot->cc);
+       commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
                                      commit_count, oldidx, tsc);
-       lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
+       lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + config->cb.subbuffer_header_size(),
-                       commit_count);
+                       commit_count, cc_hot);
 }
 
 /*
@@ -1400,6 +1551,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
        unsigned long commit_count, padding_size, data_size;
+       struct commit_counters_hot *cc_hot;
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
@@ -1418,12 +1570,14 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
                barrier();
        } else
                smp_wmb();
-       v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
-       commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
+       cc_hot = &buf->commit_hot[oldidx];
+       v_add(config, padding_size, &cc_hot->cc);
+       commit_count = v_read(config, &cc_hot->cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
                                      commit_count, oldidx, tsc);
-       lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
-                       offsets->old + padding_size, commit_count);
+       lib_ring_buffer_write_commit_counter(config, buf, chan,
+                       offsets->old + padding_size, commit_count,
+                       cc_hot);
 }
 
 /*
@@ -1442,6 +1596,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long beginidx = subbuf_index(offsets->begin, chan);
        unsigned long commit_count;
+       struct commit_counters_hot *cc_hot;
 
        config->cb.buffer_begin(buf, tsc, beginidx);
 
@@ -1458,15 +1613,15 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
                barrier();
        } else
                smp_wmb();
-       v_add(config, config->cb.subbuffer_header_size(),
-             &buf->commit_hot[beginidx].cc);
-       commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
+       cc_hot = &buf->commit_hot[beginidx];
+       v_add(config, config->cb.subbuffer_header_size(), &cc_hot->cc);
+       commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
                                      commit_count, beginidx, tsc);
-       lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
+       lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->begin + config->cb.subbuffer_header_size(),
-                       commit_count);
+                       commit_count, cc_hot);
 }
 
 /*
@@ -1536,14 +1691,12 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                unsigned long sb_index, commit_count;
 
                /*
-                * We are performing a SWITCH_FLUSH. There may be concurrent
-                * writes into the buffer if e.g. invoked while performing a
-                * snapshot on an active trace.
+                * We are performing a SWITCH_FLUSH. At this stage, there are no
+                * concurrent writes into the buffer.
                 *
-                * If the client does not save any header information (sub-buffer
-                * header size == 0), don't switch empty subbuffer on finalize,
-                * because it is invalid to deliver a completely empty
-                * subbuffer.
+                * The client does not save any header information.  Don't
+                * switch empty subbuffer on finalize, because it is invalid to
+                * deliver a completely empty subbuffer.
                 */
                if (!config->cb.subbuffer_header_size())
                        return -1;
@@ -1708,6 +1861,7 @@ static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
        put_online_cpus();
 }
 
+/* Switch sub-buffer if current sub-buffer is non-empty. */
 void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
 {
        _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
@@ -1732,7 +1886,8 @@ static
 int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct channel *chan,
                                     struct switch_offsets *offsets,
-                                    struct lib_ring_buffer_ctx *ctx)
+                                    struct lib_ring_buffer_ctx *ctx,
+                                    void *client_ctx)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long reserve_commit_diff, offset_cmp;
@@ -1758,7 +1913,7 @@ retry:
                offsets->size = config->cb.record_header_size(config, chan,
                                                offsets->begin,
                                                &offsets->pre_header_padding,
-                                               ctx);
+                                               ctx, client_ctx);
                offsets->size +=
                        lib_ring_buffer_align(offsets->begin + offsets->size,
                                              ctx->largest_align)
@@ -1842,7 +1997,7 @@ retry:
                        config->cb.record_header_size(config, chan,
                                                offsets->begin,
                                                &offsets->pre_header_padding,
-                                               ctx);
+                                               ctx, client_ctx);
                offsets->size +=
                        lib_ring_buffer_align(offsets->begin + offsets->size,
                                              ctx->largest_align)
@@ -1906,7 +2061,8 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_lost_event_too_big);
  * -EIO for other errors, else returns 0.
  * It will take care of sub-buffer switching.
  */
-int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx,
+               void *client_ctx)
 {
        struct channel *chan = ctx->chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
@@ -1919,7 +2075,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
 
        do {
                ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
-                                                      ctx);
+                                                      ctx, client_ctx);
                if (unlikely(ret))
                        return ret;
        } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
@@ -1980,6 +2136,33 @@ void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *c
                v_set(config, &buf->commit_hot[idx].seq, commit_count);
 }
 
+/*
+ * The ring buffer can count events recorded and overwritten per buffer,
+ * but it is disabled by default due to its performance overhead.
+ */
+#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS
+static
+void deliver_count_events(const struct lib_ring_buffer_config *config,
+               struct lib_ring_buffer *buf,
+               unsigned long idx)
+{
+       v_add(config, subbuffer_get_records_count(config,
+                       &buf->backend, idx),
+               &buf->records_count);
+       v_add(config, subbuffer_count_records_overrun(config,
+                       &buf->backend, idx),
+               &buf->records_overrun);
+}
+#else /* LTTNG_RING_BUFFER_COUNT_EVENTS */
+static
+void deliver_count_events(const struct lib_ring_buffer_config *config,
+               struct lib_ring_buffer *buf,
+               unsigned long idx)
+{
+}
+#endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */
+
+
 void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *config,
                                   struct lib_ring_buffer *buf,
                                   struct channel *chan,
@@ -2032,15 +2215,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con
                 * and any other writer trying to access this subbuffer
                 * in this state is required to drop records.
                 */
-               v_add(config,
-                     subbuffer_get_records_count(config,
-                                                 &buf->backend, idx),
-                     &buf->records_count);
-               v_add(config,
-                     subbuffer_count_records_overrun(config,
-                                                     &buf->backend,
-                                                     idx),
-                     &buf->records_overrun);
+               deliver_count_events(config, buf, idx);
                config->cb.buffer_end(buf, tsc, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
This page took 0.031297 seconds and 4 git commands to generate.