Fix: uninitialized variable in lib_ring_buffer_channel_switch_timer_start
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index d822039330853a7bce60a65f0ab6efb502188dcd..45c0659bfafc8bcdb83077babd00e06068d5bf74 100644 (file)
@@ -51,7 +51,6 @@
  *   - put_subbuf
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -61,6 +60,7 @@
 #include <signal.h>
 #include <time.h>
 #include <stdbool.h>
+#include <stdint.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 #include <urcu/tls-compat.h>
@@ -118,7 +118,7 @@ struct switch_offsets {
                     switch_old_end:1;
 };
 
-DEFINE_URCU_TLS_IE(unsigned int, lib_ring_buffer_nesting);
+DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 
 /*
  * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
@@ -194,6 +194,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
        for (i = 0; i < chan->backend.num_subbuf; i++) {
                struct commit_counters_hot *cc_hot;
                struct commit_counters_cold *cc_cold;
+               uint64_t *ts_end;
 
                cc_hot = shmp_index(handle, buf->commit_hot, i);
                if (!cc_hot)
@@ -201,9 +202,13 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
                cc_cold = shmp_index(handle, buf->commit_cold, i);
                if (!cc_cold)
                        return;
+               ts_end = shmp_index(handle, buf->ts_end, i);
+               if (!ts_end)
+                       return;
                v_set(config, &cc_hot->cc, 0);
                v_set(config, &cc_hot->seq, 0);
                v_set(config, &cc_cold->cc_sb, 0);
+               *ts_end = 0;
        }
        uatomic_set(&buf->consumed, 0);
        uatomic_set(&buf->record_disabled, 0);
@@ -368,6 +373,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
                goto free_commit;
        }
 
+       align_shm(shmobj, __alignof__(uint64_t));
+       set_shmp(buf->ts_end,
+                zalloc_shm(shmobj,
+                       sizeof(uint64_t) * chan->backend.num_subbuf));
+       if (!shmp(handle, buf->ts_end)) {
+               ret = -ENOMEM;
+               goto free_commit_cold;
+       }
+
+
        ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
                        cpu, handle, shmobj);
        if (ret) {
@@ -414,6 +429,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 
        /* Error handling */
 free_init:
+       /* ts_end will be freed by shm teardown */
+free_commit_cold:
        /* commit_cold will be freed by shm teardown */
 free_commit:
        /* commit_hot will be freed by shm teardown */
@@ -787,6 +804,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 
        lib_ring_buffer_setup_timer_thread();
 
+       memset(&sev, 0, sizeof(sev));
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
@@ -1005,11 +1023,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        /* Calculate the shm allocation layout */
        shmsize = sizeof(struct channel);
-       shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
+       shmsize += lttng_ust_offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
        shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
        chansize = shmsize;
        if (priv_data_align)
-               shmsize += offset_align(shmsize, priv_data_align);
+               shmsize += lttng_ust_offset_align(shmsize, priv_data_align);
        shmsize += priv_data_size;
 
        /* Allocate normal memory for channel (not shared) */
@@ -1137,7 +1155,7 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
  * Call "destroy" callback, finalize channels, decrement the channel
  * reference count. Note that when readers have completed data
  * consumption of finalized channels, get_subbuf() will return -ENODATA.
- * They should release their handle at that point. 
+ * They should release their handle at that point.
  */
 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
                int consumer)
@@ -1795,15 +1813,29 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
        unsigned long commit_count, padding_size, data_size;
        struct commit_counters_hot *cc_hot;
+       uint64_t *ts_end;
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
        subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
                                handle);
 
+       ts_end = shmp_index(handle, buf->ts_end, oldidx);
+       if (!ts_end)
+               return;
        /*
-        * Order all writes to buffer before the commit count update that will
-        * determine that the subbuffer is full.
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
+
+       /*
+        * Order all writes to buffer and store to ts_end before the commit
+        * count update that will determine that the subbuffer is full.
         */
        cmm_smp_wmb();
        cc_hot = shmp_index(handle, buf->commit_hot, oldidx);
@@ -1874,11 +1906,24 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long endidx, data_size;
+       uint64_t *ts_end;
 
        endidx = subbuf_index(offsets->end - 1, chan);
        data_size = subbuf_offset(offsets->end - 1, chan) + 1;
        subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
                                handle);
+       ts_end = shmp_index(handle, buf->ts_end, endidx);
+       if (!ts_end)
+               return;
+       /*
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
 }
 
 /*
@@ -1996,9 +2041,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
  * Force a sub-buffer switch. This operation is completely reentrant : can be
  * called while tracing is active with absolutely no lock held.
  *
- * Note, however, that as a v_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
+ * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for
+ * some atomic operations, this function must be called from the CPU
+ * which owns the buffer for a ACTIVE flush. However, for
+ * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called
+ * from any CPU.
  */
 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
                                 struct lttng_ust_shm_handle *handle)
@@ -2443,14 +2490,26 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_c
        if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb,
                                 old_commit_count, old_commit_count + 1)
                   == old_commit_count)) {
+               uint64_t *ts_end;
+
                /*
                 * Start of exclusive subbuffer access. We are
                 * guaranteed to be the last writer in this subbuffer
                 * and any other writer trying to access this subbuffer
                 * in this state is required to drop records.
+                *
+                * We can read the ts_end for the current sub-buffer
+                * which has been saved by the very last space
+                * reservation for the current sub-buffer.
+                *
+                * Order increment of commit counter before reading ts_end.
                 */
+               cmm_smp_mb();
+               ts_end = shmp_index(handle, buf->ts_end, idx);
+               if (!ts_end)
+                       return;
                deliver_count_events(config, buf, idx, handle);
-               config->cb.buffer_end(buf, tsc, idx,
+               config->cb.buffer_end(buf, *ts_end, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
                                                                idx,
This page took 0.026335 seconds and 4 git commands to generate.