Update version to 0.16
[ust.git] / libust / buffers.h
index a2f17a9f78d01e83f3e1ee2a5d4e7eb40e92f081..850b9355017299ac3bc38c265f2ac86ce68aafe3 100644 (file)
@@ -1,23 +1,37 @@
 /*
  * buffers.h
+ * LTTng userspace tracer buffering system
  *
  * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
  * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
  *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
  */
 
 #ifndef _UST_BUFFERS_H
 #define _UST_BUFFERS_H
 
-#include <kcompat/kref.h>
 #include <assert.h>
+
+#include <ust/core.h>
+#include <ust/clock.h>
+
+#include "usterr_signal_safe.h"
 #include "channels.h"
 #include "tracerconst.h"
 #include "tracercore.h"
-#include "header-inline.h"
-#include <usterr.h>
-
-/***** SHOULD BE REMOVED ***** */
 
 /*
  * BUFFER_TRUNC zeroes the subbuffer offset and the subbuffer number parts of
 
 /**************************************/
 
+/*
+ * TODO: using "long" type for struct ust_buffer (control structure
+ * shared between traced apps and the consumer) is a very bad idea when
+ * we get to systems with mixed 32/64-bit processes.
+ *
+ * But on 64-bit system, we want the full power of 64-bit counters,
+ * which wraps less often. Therefore, it's not as easy as "use 32-bit
+ * types everywhere".
+ *
+ * One way to deal with this is to:
+ * 1) Design 64-bit consumer so it can detect 32-bit and 64-bit apps.
+ * 2) The 32-bit consumer only supports 32-bit apps.
+ */
+
 struct commit_counters {
-       local_t cc;
-       local_t cc_sb;                  /* Incremented _once_ at sb switch */
+       long cc;                        /* ATOMIC */
+       long cc_sb;                     /* ATOMIC - Incremented _once_ at sb switch */
 };
 
 struct ust_buffer {
        /* First 32 bytes cache-hot cacheline */
-       local_t offset;                 /* Current offset in the buffer */
+       long offset;                    /* Current offset in the buffer *atomic* */
        struct commit_counters *commit_count;   /* Commit count per sub-buffer */
-       atomic_long_t consumed;         /*
-                                        * Current offset in the buffer
-                                        * standard atomic access (shared)
-                                        */
+       long consumed;                  /* Current offset in the buffer *atomic* access (shared) */
        unsigned long last_tsc;         /*
                                         * Last timestamp written in the buffer.
                                         */
        /* End of first 32 bytes cacheline */
-       atomic_long_t active_readers;   /*
-                                        * Active readers count
-                                        * standard atomic access (shared)
-                                        */
-       local_t events_lost;
-       local_t corrupted_subbuffers;
+       long active_readers;    /* ATOMIC - Active readers count standard atomic access (shared) */
+       long events_lost;       /* ATOMIC */
+       long corrupted_subbuffers; /* *ATOMIC* */
        /* one byte is written to this pipe when data is available, in order
            to wake the consumer */
        /* portability: Single byte writes must be as quick as possible. The kernel-side
@@ -72,6 +94,11 @@ struct ust_buffer {
        int data_ready_fd_write;
        /* the reading end of the pipe */
        int data_ready_fd_read;
+       /*
+        * List of buffers with an open pipe, used for fork and forced subbuffer
+        * switch.
+        */
+       struct cds_list_head open_buffers_list;
 
        unsigned int finalized;
 //ust//        struct timer_list switch_timer; /* timer for periodical switch */
@@ -79,31 +106,67 @@ struct ust_buffer {
 
        struct ust_channel *chan;
 
-       struct kref kref;
+       struct urcu_ref urcu_ref;
        void *buf_data;
        size_t buf_size;
        int shmid;
        unsigned int cpu;
 
        /* commit count per subbuffer; must be at end of struct */
-       local_t commit_seq[0] ____cacheline_aligned;
+       long commit_seq[0]; /* ATOMIC */
 } ____cacheline_aligned;
 
 /*
- * A switch is done during tracing or as a final flush after tracing (so it
- * won't write in the new sub-buffer).
- * FIXME: make this message clearer
+ * A switch is either done during tracing (FORCE_ACTIVE) or as a final
+ * flush after tracing (with FORCE_FLUSH). FORCE_FLUSH ensures we won't
+ * write in the new sub-buffer).
  */
 enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
 
-extern int ltt_reserve_slot_lockless_slow(struct ust_trace *trace,
-               struct ust_channel *ltt_channel, void **transport_data,
-               size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
-               unsigned int *rflags, int largest_align, int cpu);
+extern int ltt_reserve_slot_lockless_slow(struct ust_channel *chan,
+               struct ust_trace *trace, size_t data_size,
+               int largest_align, int cpu,
+               struct ust_buffer **ret_buf,
+               size_t *slot_size, long *buf_offset,
+               u64 *tsc, unsigned int *rflags);
 
 extern void ltt_force_switch_lockless_slow(struct ust_buffer *buf,
                enum force_switch_mode mode);
 
+#ifndef HAVE_EFFICIENT_UNALIGNED_ACCESS
+
+/*
+ * Calculate the offset needed to align the type.
+ * size_of_type must be non-zero.
+ */
+static inline unsigned int ltt_align(size_t align_drift, size_t size_of_type)
+{
+       size_t alignment = min(sizeof(void *), size_of_type);
+       return (alignment - align_drift) & (alignment - 1);
+}
+/* Default arch alignment */
+#define LTT_ALIGN
+
+static inline int ltt_get_alignment(void)
+{
+       return sizeof(void *);
+}
+
+#else /* HAVE_EFFICIENT_UNALIGNED_ACCESS */
+
+static inline unsigned int ltt_align(size_t align_drift,
+                size_t size_of_type)
+{
+       return 0;
+}
+
+#define LTT_ALIGN __attribute__((packed))
+
+static inline int ltt_get_alignment(void)
+{
+       return 0;
+}
+#endif /* HAVE_EFFICIENT_UNALIGNED_ACCESS */
 
 static __inline__ void ust_buffers_do_copy(void *dest, const void *src, size_t len)
 {
@@ -176,6 +239,63 @@ static __inline__ int last_tsc_overflow(struct ust_buffer *ltt_buf,
 }
 #endif
 
+/*
+ * ust_get_header_size
+ *
+ * Calculate alignment offset to 32-bits. This is the alignment offset of the
+ * event header.
+ *
+ * Important note :
+ * The event header must be 32-bits. The total offset calculated here :
+ *
+ * Alignment of header struct on 32 bits (min arch size, header size)
+ * + sizeof(header struct)  (32-bits)
+ * + (opt) u16 (ext. event id)
+ * + (opt) u16 (event_size) (if event_size == 0xFFFFUL, has ext. event size)
+ * + (opt) u32 (ext. event size)
+ * + (opt) u64 full TSC (aligned on min(64-bits, arch size))
+ *
+ * The payload must itself determine its own alignment from the biggest type it
+ * contains.
+ * */
+static __inline__ unsigned char ust_get_header_size(
+               struct ust_channel *channel,
+               size_t offset,
+               size_t data_size,
+               size_t *before_hdr_pad,
+               unsigned int rflags)
+{
+       size_t orig_offset = offset;
+       size_t padding;
+
+       padding = ltt_align(offset, sizeof(struct ltt_event_header));
+       offset += padding;
+       offset += sizeof(struct ltt_event_header);
+
+       if(unlikely(rflags)) {
+               switch (rflags) {
+               case LTT_RFLAG_ID_SIZE_TSC:
+                       offset += sizeof(u16) + sizeof(u16);
+                       if (data_size >= 0xFFFFU)
+                               offset += sizeof(u32);
+                       offset += ltt_align(offset, sizeof(u64));
+                       offset += sizeof(u64);
+                       break;
+               case LTT_RFLAG_ID_SIZE:
+                       offset += sizeof(u16) + sizeof(u16);
+                       if (data_size >= 0xFFFFU)
+                               offset += sizeof(u32);
+                       break;
+               case LTT_RFLAG_ID:
+                       offset += sizeof(u16);
+                       break;
+               }
+       }
+
+       *before_hdr_pad = padding;
+       return offset - orig_offset;
+}
+
 static __inline__ void ltt_reserve_push_reader(
                struct ust_channel *rchan,
                struct ust_buffer *buf,
@@ -184,7 +304,7 @@ static __inline__ void ltt_reserve_push_reader(
        long consumed_old, consumed_new;
 
        do {
-               consumed_old = atomic_long_read(&buf->consumed);
+               consumed_old = uatomic_read(&buf->consumed);
                /*
                 * If buffer is in overwrite mode, push the reader consumed
                 * count if the write position has reached it and we are not
@@ -202,7 +322,7 @@ static __inline__ void ltt_reserve_push_reader(
                        consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
                else
                        return;
-       } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+       } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
                        consumed_new) != consumed_old));
 }
 
@@ -210,7 +330,7 @@ static __inline__ void ltt_vmcore_check_deliver(
                struct ust_buffer *buf,
                long commit_count, long idx)
 {
-       local_set(&buf->commit_seq[idx], commit_count);
+       uatomic_set(&buf->commit_seq[idx], commit_count);
 }
 
 static __inline__ void ltt_check_deliver(struct ust_channel *chan,
@@ -230,7 +350,7 @@ static __inline__ void ltt_check_deliver(struct ust_channel *chan,
                 * value without adding a add_return atomic operation to the
                 * fast path.
                 */
-               if (likely(local_cmpxchg(&buf->commit_count[idx].cc_sb,
+               if (likely(uatomic_cmpxchg(&buf->commit_count[idx].cc_sb,
                                         old_commit_count, commit_count)
                           == old_commit_count)) {
                        int result;
@@ -255,16 +375,16 @@ static __inline__ int ltt_poll_deliver(struct ust_channel *chan, struct ust_buff
 {
        long consumed_old, consumed_idx, commit_count, write_offset;
 
-       consumed_old = atomic_long_read(&buf->consumed);
+       consumed_old = uatomic_read(&buf->consumed);
        consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
-       commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
+       commit_count = uatomic_read(&buf->commit_count[consumed_idx].cc_sb);
        /*
-        * No memory barrier here, since we are only interested
+        * No memory cmm_barrier here, since we are only interested
         * in a statistically correct polling result. The next poll will
         * get the data is we are racing. The mb() that ensures correct
         * memory order is in get_subbuf.
         */
-       write_offset = local_read(&buf->offset);
+       write_offset = uatomic_read(&buf->offset);
 
        /*
         * Check that the subbuffer we are trying to consume has been
@@ -302,17 +422,11 @@ static __inline__ int ltt_relay_try_reserve(
                long *o_begin, long *o_end, long *o_old,
                size_t *before_hdr_pad, size_t *size)
 {
-       *o_begin = local_read(&buf->offset);
+       *o_begin = uatomic_read(&buf->offset);
        *o_old = *o_begin;
 
        *tsc = trace_clock_read64();
 
-//ust// #ifdef CONFIG_LTT_VMCORE
-//ust//        prefetch(&buf->commit_count[SUBBUF_INDEX(*o_begin, rchan)]);
-//ust//        prefetch(&buf->commit_seq[SUBBUF_INDEX(*o_begin, rchan)]);
-//ust// #else
-//ust//        prefetchw(&buf->commit_count[SUBBUF_INDEX(*o_begin, rchan)]);
-//ust// #endif
        if (last_tsc_overflow(buf, *tsc))
                *rflags = LTT_RFLAG_ID_SIZE_TSC;
 
@@ -343,22 +457,23 @@ static __inline__ int ltt_relay_try_reserve(
        return 0;
 }
 
-static __inline__ int ltt_reserve_slot(struct ust_trace *trace,
-               struct ust_channel *chan, void **transport_data,
-               size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
-               unsigned int *rflags, int largest_align, int cpu)
+static __inline__ int ltt_reserve_slot(struct ust_channel *chan,
+                                      struct ust_trace *trace, size_t data_size,
+                                      int largest_align, int cpu,
+                                      struct ust_buffer **ret_buf,
+                                      size_t *slot_size, long *buf_offset, u64 *tsc,
+                                      unsigned int *rflags)
 {
-       struct ust_buffer *buf = chan->buf[cpu];
+       struct ust_buffer *buf = *ret_buf = chan->buf[cpu];
        long o_begin, o_end, o_old;
        size_t before_hdr_pad;
 
        /*
         * Perform retryable operations.
         */
-       /* FIXME: make this rellay per cpu? */
-       if (unlikely(LOAD_SHARED(ltt_nesting) > 4)) {
+       if (unlikely(CMM_LOAD_SHARED(ltt_nesting) > 4)) {
                DBG("Dropping event because nesting is too deep.");
-               local_inc(&buf->events_lost);
+               uatomic_inc(&buf->events_lost);
                return -EPERM;
        }
 
@@ -368,7 +483,7 @@ static __inline__ int ltt_reserve_slot(struct ust_trace *trace,
                        &before_hdr_pad, slot_size)))
                goto slow_path;
 
-       if (unlikely(local_cmpxchg(&buf->offset, o_old, o_end) != o_old))
+       if (unlikely(uatomic_cmpxchg(&buf->offset, o_old, o_end) != o_old))
                goto slow_path;
 
        /*
@@ -384,27 +499,19 @@ static __inline__ int ltt_reserve_slot(struct ust_trace *trace,
         */
        ltt_reserve_push_reader(chan, buf, o_end - 1);
 
-       /*
-        * Clear noref flag for this subbuffer.
-        */
-//ust//        ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(o_end - 1, chan));
-
        *buf_offset = o_begin + before_hdr_pad;
        return 0;
 slow_path:
-       return ltt_reserve_slot_lockless_slow(trace, chan,
-               transport_data, data_size, slot_size, buf_offset, tsc,
-               rflags, largest_align, cpu);
+       return ltt_reserve_slot_lockless_slow(chan, trace, data_size,
+                                             largest_align, cpu, ret_buf,
+                                             slot_size, buf_offset, tsc,
+                                             rflags);
 }
 
 /*
  * Force a sub-buffer switch for a per-cpu buffer. This operation is
  * completely reentrant : can be called while tracing is active with
  * absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
  */
 static __inline__ void ltt_force_switch(struct ust_buffer *buf,
                enum force_switch_mode mode)
@@ -418,7 +525,6 @@ static __inline__ void ltt_force_switch(struct ust_buffer *buf,
  * commit count reaches back the reserve offset (module subbuffer size). It is
  * useful for crash dump.
  */
-//ust// #ifdef CONFIG_LTT_VMCORE
 static __inline__ void ltt_write_commit_counter(struct ust_channel *chan,
                struct ust_buffer *buf, long idx, long buf_offset,
                long commit_count, size_t data_size)
@@ -437,19 +543,13 @@ static __inline__ void ltt_write_commit_counter(struct ust_channel *chan,
        if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan)))
                return;
 
-       commit_seq_old = local_read(&buf->commit_seq[idx]);
+       commit_seq_old = uatomic_read(&buf->commit_seq[idx]);
        while (commit_seq_old < commit_count)
-               commit_seq_old = local_cmpxchg(&buf->commit_seq[idx],
+               commit_seq_old = uatomic_cmpxchg(&buf->commit_seq[idx],
                                         commit_seq_old, commit_count);
 
        DBG("commit_seq for channel %s_%d, subbuf %ld is now %ld", buf->chan->channel_name, buf->cpu, idx, commit_count);
 }
-//ust// #else
-//ust// static __inline__ void ltt_write_commit_counter(struct ust_buffer *buf,
-//ust//                long idx, long buf_offset, long commit_count, size_t data_size)
-//ust// {
-//ust// }
-//ust// #endif
 
 /*
  * Atomic unordered slot commit. Increments the commit count in the
@@ -472,17 +572,9 @@ static __inline__ void ltt_commit_slot(
        long endidx = SUBBUF_INDEX(offset_end - 1, chan);
        long commit_count;
 
-#ifdef LTT_NO_IPI_BARRIER
-       smp_wmb();
-#else
-       /*
-        * Must write slot data before incrementing commit count.
-        * This compiler barrier is upgraded into a smp_mb() by the IPI
-        * sent by get_subbuf().
-        */
-       barrier();
-#endif
-       local_add(slot_size, &buf->commit_count[endidx].cc);
+       cmm_smp_wmb();
+
+       uatomic_add(&buf->commit_count[endidx].cc, slot_size);
        /*
         * commit count read can race with concurrent OOO commit count updates.
         * This is only needed for ltt_check_deliver (for non-polling delivery
@@ -492,7 +584,7 @@ static __inline__ void ltt_commit_slot(
         * - Multiple delivery for the same sub-buffer (which is handled
         *   gracefully by the reader code) if the value is for a full
         *   sub-buffer. It's important that we can never miss a sub-buffer
-        *   delivery. Re-reading the value after the local_add ensures this.
+        *   delivery. Re-reading the value after the uatomic_add ensures this.
         * - Reading a commit_count with a higher value that what was actually
         *   added to it for the ltt_write_commit_counter call (again caused by
         *   a concurrent committer). It does not matter, because this function
@@ -500,7 +592,7 @@ static __inline__ void ltt_commit_slot(
         *   reserve offset for a specific sub-buffer, which is completely
         *   independent of the order.
         */
-       commit_count = local_read(&buf->commit_count[endidx].cc);
+       commit_count = uatomic_read(&buf->commit_count[endidx].cc);
 
        ltt_check_deliver(chan, buf, offset_end - 1, commit_count, endidx);
        /*
@@ -510,26 +602,98 @@ static __inline__ void ltt_commit_slot(
        ltt_write_commit_counter(chan, buf, endidx, buf_offset, commit_count, data_size);
 }
 
-void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
-        const void *src, size_t len, ssize_t cpy);
+void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset,
+                               size_t len, size_t copied, int terminated);
 
 static __inline__ int ust_buffers_write(struct ust_buffer *buf, size_t offset,
         const void *src, size_t len)
 {
-       size_t cpy;
        size_t buf_offset = BUFFER_OFFSET(offset, buf->chan);
 
        assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+       assert(buf_offset + len
+              <= buf->chan->subbuf_size*buf->chan->subbuf_cnt);
 
-       cpy = min_t(size_t, len, buf->buf_size - buf_offset);
-       ust_buffers_do_copy(buf->buf_data + buf_offset, src, cpy);
+       ust_buffers_do_copy(buf->buf_data + buf_offset, src, len);
 
-       if (unlikely(len != cpy))
-               _ust_buffers_write(buf, buf_offset, src, len, cpy);
        return len;
 }
 
-int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed);
-int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old);
+/*
+ * ust_buffers_do_memset - write character into dest.
+ * @dest: destination
+ * @src: source character
+ * @len: length to write
+ */
+static __inline__
+void ust_buffers_do_memset(void *dest, char src, size_t len)
+{
+       /*
+        * What we really want here is an __inline__ memset, but we
+        * don't have constants, so gcc generally uses a function call.
+        */
+       for (; len > 0; len--)
+               *(u8 *)dest++ = src;
+}
+
+/*
+ * ust_buffers_do_strncpy - copy a string up to a certain number of bytes
+ * @dest: destination
+ * @src: source
+ * @len: max. length to copy
+ * @terminated: output string ends with \0 (output)
+ *
+ * returns the number of bytes copied. Does not finalize with \0 if len is
+ * reached.
+ */
+static __inline__
+size_t ust_buffers_do_strncpy(void *dest, const void *src, size_t len,
+                             int *terminated)
+{
+       size_t orig_len = len;
+
+       *terminated = 0;
+       /*
+        * What we really want here is an __inline__ strncpy, but we
+        * don't have constants, so gcc generally uses a function call.
+        */
+       for (; len > 0; len--) {
+               *(u8 *)dest = CMM_LOAD_SHARED(*(const u8 *)src);
+               /* Check with dest, because src may be modified concurrently */
+               if (*(const u8 *)dest == '\0') {
+                       len--;
+                       *terminated = 1;
+                       break;
+               }
+               dest++;
+               src++;
+       }
+       return orig_len - len;
+}
+
+static __inline__
+int ust_buffers_strncpy(struct ust_buffer *buf, size_t offset, const void *src,
+                       size_t len)
+{
+       size_t buf_offset = BUFFER_OFFSET(offset, buf->chan);
+       ssize_t copied;
+       int terminated;
+
+       assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+       assert(buf_offset + len
+              <= buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+
+       copied = ust_buffers_do_strncpy(buf->buf_data + buf_offset,
+                                       src, len, &terminated);
+       if (unlikely(copied < len || !terminated))
+               _ust_buffers_strncpy_fixup(buf, offset, len, copied,
+                                          terminated);
+       return len;
+}
+
+extern int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed);
+extern int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old);
+
+extern void init_ustrelay_transport(void);
 
 #endif /* _UST_BUFFERS_H */
This page took 0.034772 seconds and 4 git commands to generate.