Cleanup: apply `include-what-you-use` guideline for `uint*_t`
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index d822039330853a7bce60a65f0ab6efb502188dcd..7f73d03c0e94712d2e5ce2b5245fb15bb254ecef 100644 (file)
@@ -61,6 +61,7 @@
 #include <signal.h>
 #include <time.h>
 #include <stdbool.h>
+#include <stdint.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 #include <urcu/tls-compat.h>
@@ -118,7 +119,7 @@ struct switch_offsets {
                     switch_old_end:1;
 };
 
-DEFINE_URCU_TLS_IE(unsigned int, lib_ring_buffer_nesting);
+DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 
 /*
  * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
@@ -194,6 +195,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
        for (i = 0; i < chan->backend.num_subbuf; i++) {
                struct commit_counters_hot *cc_hot;
                struct commit_counters_cold *cc_cold;
+               uint64_t *ts_end;
 
                cc_hot = shmp_index(handle, buf->commit_hot, i);
                if (!cc_hot)
@@ -201,9 +203,13 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
                cc_cold = shmp_index(handle, buf->commit_cold, i);
                if (!cc_cold)
                        return;
+               ts_end = shmp_index(handle, buf->ts_end, i);
+               if (!ts_end)
+                       return;
                v_set(config, &cc_hot->cc, 0);
                v_set(config, &cc_hot->seq, 0);
                v_set(config, &cc_cold->cc_sb, 0);
+               *ts_end = 0;
        }
        uatomic_set(&buf->consumed, 0);
        uatomic_set(&buf->record_disabled, 0);
@@ -368,6 +374,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
                goto free_commit;
        }
 
+       align_shm(shmobj, __alignof__(uint64_t));
+       set_shmp(buf->ts_end,
+                zalloc_shm(shmobj,
+                       sizeof(uint64_t) * chan->backend.num_subbuf));
+       if (!shmp(handle, buf->ts_end)) {
+               ret = -ENOMEM;
+               goto free_commit_cold;
+       }
+
+
        ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
                        cpu, handle, shmobj);
        if (ret) {
@@ -414,6 +430,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 
        /* Error handling */
 free_init:
+       /* ts_end will be freed by shm teardown */
+free_commit_cold:
        /* commit_cold will be freed by shm teardown */
 free_commit:
        /* commit_hot will be freed by shm teardown */
@@ -1795,15 +1813,29 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
        unsigned long commit_count, padding_size, data_size;
        struct commit_counters_hot *cc_hot;
+       uint64_t *ts_end;
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
        subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
                                handle);
 
+       ts_end = shmp_index(handle, buf->ts_end, oldidx);
+       if (!ts_end)
+               return;
        /*
-        * Order all writes to buffer before the commit count update that will
-        * determine that the subbuffer is full.
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
+
+       /*
+        * Order all writes to buffer and store to ts_end before the commit
+        * count update that will determine that the subbuffer is full.
         */
        cmm_smp_wmb();
        cc_hot = shmp_index(handle, buf->commit_hot, oldidx);
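
The comment block above describes the writer-side half of the new ts_end handshake: the last space reservation owns the sub-buffer's ts_end slot, stores the timestamp, and the cmm_smp_wmb() orders that store before the commit-count update that marks the sub-buffer full. A minimal stand-alone sketch of this publish pattern, using C11 atomics in place of the liburcu and ring-buffer primitives (the struct and function names below are illustrative, not lttng-ust API):

    /* Hypothetical, simplified model of the writer side; not the real types. */
    #include <stdatomic.h>
    #include <stdint.h>

    struct subbuf_writer_state {
            uint64_t ts_end;                /* plain store: the last reserver owns it */
            atomic_ulong commit_count;      /* stands in for commit_hot->cc */
    };

    /* Called only by the last space reservation in the sub-buffer. */
    static void publish_end_timestamp(struct subbuf_writer_state *sb,
                                      uint64_t tsc, unsigned long padding_size)
    {
            sb->ts_end = tsc;
            /*
             * Release fence plays the role of cmm_smp_wmb(): the ts_end store
             * (and all payload writes) are ordered before the commit-count
             * update that lets the reader deliver the sub-buffer.
             */
            atomic_thread_fence(memory_order_release);
            atomic_fetch_add_explicit(&sb->commit_count, padding_size,
                                      memory_order_relaxed);
    }

The matching reader-side ordering is taken care of in lib_ring_buffer_check_deliver_slow(), in the last hunk below.
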
@@ -1874,11 +1906,24 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long endidx, data_size;
+       uint64_t *ts_end;
 
        endidx = subbuf_index(offsets->end - 1, chan);
        data_size = subbuf_offset(offsets->end - 1, chan) + 1;
        subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
                                handle);
+       ts_end = shmp_index(handle, buf->ts_end, endidx);
+       if (!ts_end)
+               return;
+       /*
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
 }
 
 /*
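
Both switch paths locate the closing sub-buffer from the reserved offset via subbuf_index() and subbuf_offset(). As a rough sketch of what such helpers compute when buffer and sub-buffer sizes are powers of two (an assumption made here for illustration; the real helpers live in the ring buffer backend headers):

    #include <stddef.h>

    /* Illustrative only: power-of-two sub-buffer geometry. */
    struct chan_geometry {
            size_t subbuf_size;     /* power of two */
            size_t num_subbuf;      /* power of two */
    };

    /* Which sub-buffer slot does a write offset land in? */
    static size_t sketch_subbuf_index(size_t offset, const struct chan_geometry *g)
    {
            return (offset / g->subbuf_size) & (g->num_subbuf - 1);
    }

    /* Byte offset within that sub-buffer. */
    static size_t sketch_subbuf_offset(size_t offset, const struct chan_geometry *g)
    {
            return offset & (g->subbuf_size - 1);
    }

Read this way, subbuf_offset(offsets->end - 1, chan) + 1 is the number of bytes actually used in the closing sub-buffer, which is what both switch paths record as data_size.
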
@@ -1996,9 +2041,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
  * Force a sub-buffer switch. This operation is completely reentrant : can be
  * called while tracing is active with absolutely no lock held.
  *
- * Note, however, that as a v_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
+ * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for
+ * some atomic operations, this function must be called from the CPU
+ * which owns the buffer for an ACTIVE flush. However, for
+ * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called
+ * from any CPU.
  */
 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
                                 struct lttng_ust_shm_handle *handle)
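
The reworked comment spells out when an ACTIVE flush may be issued remotely. A hedged caller-side sketch of the per-CPU constraint, assuming the frontend declarations already included by this file, <pthread.h> and <sched.h> with _GNU_SOURCE for the affinity call, and that pinning the calling thread is an acceptable way to satisfy the rule (the wrapper below is illustrative, not lttng-ust API):

    /*
     * Illustrative only: with RING_BUFFER_SYNC_PER_CPU, an ACTIVE flush must
     * run on the CPU that owns the buffer, e.g. by pinning the calling thread
     * first.  With RING_BUFFER_SYNC_GLOBAL, any CPU may issue the flush.
     * SWITCH_ACTIVE is assumed to be the switch_mode value for an active flush.
     */
    static int flush_per_cpu_buffer(struct lttng_ust_lib_ring_buffer *buf,
                                    struct lttng_ust_shm_handle *handle,
                                    int owner_cpu)
    {
            cpu_set_t cpus;

            CPU_ZERO(&cpus);
            CPU_SET(owner_cpu, &cpus);
            if (pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus))
                    return -1;      /* cannot honor the per-CPU constraint */
            lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
            return 0;
    }
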
@@ -2443,14 +2490,26 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_c
        if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb,
                                 old_commit_count, old_commit_count + 1)
                   == old_commit_count)) {
+               uint64_t *ts_end;
+
                /*
                 * Start of exclusive subbuffer access. We are
                 * guaranteed to be the last writer in this subbuffer
                 * and any other writer trying to access this subbuffer
                 * in this state is required to drop records.
+                *
+                * We can read the ts_end for the current sub-buffer
+                * which has been saved by the very last space
+                * reservation for the current sub-buffer.
+                *
+                * Order increment of commit counter before reading ts_end.
                 */
+               cmm_smp_mb();
+               ts_end = shmp_index(handle, buf->ts_end, idx);
+               if (!ts_end)
+                       return;
                deliver_count_events(config, buf, idx, handle);
-               config->cb.buffer_end(buf, tsc, idx,
+               config->cb.buffer_end(buf, *ts_end, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
                                                                idx,
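
This is the reader-side half of the handshake sketched after the switch_old_end hunk: winning the v_cmpxchg() on cc_sb grants exclusive access to the sub-buffer, and the cmm_smp_mb() orders that win before the ts_end load handed to the buffer_end callback. A simplified C11 model of the claim-then-read step (again with illustrative names rather than the real shm-backed structures):

    /* Hypothetical, simplified model of the delivery side; not the real types. */
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    struct subbuf_delivery_state {
            uint64_t ts_end;
            atomic_ulong cc_sb;     /* stands in for commit_cold->cc_sb */
    };

    /*
     * Try to claim delivery of the sub-buffer.  On success, the acquire fence
     * (modelling cmm_smp_mb()) orders the claim before the ts_end read,
     * pairing with the writer's release fence.
     */
    static bool try_deliver(struct subbuf_delivery_state *sb,
                            unsigned long old_commit_count, uint64_t *end_tsc)
    {
            unsigned long expected = old_commit_count;

            if (!atomic_compare_exchange_strong(&sb->cc_sb, &expected,
                                                old_commit_count + 1))
                    return false;   /* another path delivers (or already did) */
            atomic_thread_fence(memory_order_acquire);
            *end_tsc = sb->ts_end;  /* timestamp of the last reservation */
            return true;
    }
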