reader starts to work
author	compudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
	Wed, 8 Mar 2006 05:34:11 +0000 (05:34 +0000)
committer	compudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
	Wed, 8 Mar 2006 05:34:11 +0000 (05:34 +0000)
git-svn-id: http://ltt.polymtl.ca/svn@1614 04897980-b3bd-0310-b5e0-8ef037075253

usertrace-fast/ltt-usertrace-fast.c
usertrace-fast/ltt-usertrace-fast.h

index 5f8c5e08657a2b9e747e7b00af436939e36a8855..990b0211bbf0b5025c3d0ccad5c8446fe39988f2 100644
@@ -41,6 +41,7 @@
  *
  */
 
+#define _GNU_SOURCE
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <stdlib.h>
 #include <sys/param.h>
-#include <linux/futex.h>
 #include <sys/time.h>
 
+#include <asm/atomic.h>
 #include <asm/timex.h> //for get_cycles()
 
 #include "ltt-usertrace-fast.h"
 
+enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
 
 /* Writer (the traced application) */
 
@@ -119,9 +121,152 @@ static void handler_sigalarm(int signo)
 }
 
 /* Do a buffer switch. Don't switch if buffer is completely empty */
-static void flush_buffer(struct ltt_buf *ltt_buf)
+static void flush_buffer(struct ltt_buf *ltt_buf, enum force_switch_mode mode)
 {
+       uint64_t tsc;
+       int offset_begin, offset_end, offset_old;
+       int reserve_commit_diff;
+       int consumed_old, consumed_new;
+       int commit_count, reserve_count;
+       int end_switch_old;
 
+       do {
+               offset_old = atomic_read(&ltt_buf->offset);
+               offset_begin = offset_old;
+               end_switch_old = 0;
+               tsc = ltt_get_timestamp();
+               if(tsc == 0) {
+                       /* Error getting the timestamp: should not happen, as it would
+                        * mean we are called from an NMI during a write seqlock on xtime. */
+                       return;
+               }
+
+               if(SUBBUF_OFFSET(offset_begin, ltt_buf) != 0) {
+                       offset_begin = SUBBUF_ALIGN(offset_begin, ltt_buf);
+                       end_switch_old = 1;
+               } else {
+                       /* We do not have to switch: buffer is empty. */
+                       return;
+               }
+               if(mode == FORCE_ACTIVE)
+                       offset_begin += ltt_subbuf_header_len(ltt_buf);
+               /* Always begin_switch in FORCE_ACTIVE mode */
+
+               /* Test new buffer integrity */
+               reserve_commit_diff = 
+                       atomic_read(
+                               &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])
+                       - atomic_read(
+                                       &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+               if(reserve_commit_diff == 0) {
+                       /* Next buffer not corrupted. */
+                       if(mode == FORCE_ACTIVE
+                                       && (offset_begin - atomic_read(&ltt_buf->consumed))
+                                                       >= ltt_buf->alloc_size) {
+                               /* We do not overwrite non-consumed buffers and we are
+                                * full: ignore the switch while tracing is active. */
+                               return;
+                       }
+               } else {
+                       /* Next sub-buffer corrupted: force pushing the reader even in normal mode. */
+               }
+                       
+               offset_end = offset_begin;
+       } while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
+                                                       != offset_old);
+
+
+       if(mode == FORCE_ACTIVE) {
+               /* Push the reader if necessary */
+               do {
+                       consumed_old = atomic_read(&ltt_buf->consumed);
+                       /* If the buffer is in overwrite mode, push the reader's consumed
+                        * count if the write position has reached it and we are not at
+                        * the first iteration (don't push the reader farther than the
+                        * writer). This operation can be done concurrently by many
+                        * writers in the same buffer; the writer at the farthest write
+                        * position sub-buffer index in the buffer wins this loop. */
+                       /* If the buffer is not in overwrite mode, pushing the reader
+                        * only happens if a sub-buffer is corrupted. */
+                       if((SUBBUF_TRUNC(offset_end, ltt_buf) 
+                                       - SUBBUF_TRUNC(consumed_old, ltt_buf)) 
+                                                       >= ltt_buf->alloc_size)
+                               consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
+                       else {
+                               consumed_new = consumed_old;
+                               break;
+                       }
+               } while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
+                               != consumed_old);
+
+               if(consumed_old != consumed_new) {
+                       /* Reader pushed: we are the winner of the push, so we can
+                        * re-equilibrate reserve and commit. The atomic increment of
+                        * the commit count permits other writers to play around with
+                        * this variable before us. We keep track of
+                        * corrupted_subbuffers even in overwrite mode: we never want
+                        * to write over a sub-buffer that is not completely committed.
+                        * Possible causes: the buffer size is too low compared to the
+                        * unordered data input, or a writer died between the reserve
+                        * and the commit. */
+                       if(reserve_commit_diff) {
+                               /* We have to alter the sub-buffer commit count: a
+                                * sub-buffer is corrupted. */
+                               atomic_add(reserve_commit_diff,
+                                                               &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+                               atomic_inc(&ltt_buf->corrupted_subbuffers);
+                       }
+               }
+       }
+
+       /* Always switch */
+
+       if(end_switch_old) {
+               /* old subbuffer */
+               /* Concurrency-safe because we are the last and only thread to
+                * alter this sub-buffer. As long as it is not delivered and read,
+                * no other thread can alter the offset, alter the reserve_count
+                * or call the client_buffer_end_callback on this sub-buffer.
+                * The only remaining threads could be the ones with pending
+                * commits; they will have to do the delivery themselves.
+                * Not concurrency-safe in overwrite mode: we detect corrupted
+                * sub-buffers with the commit and reserve counts. We keep a
+                * corrupted sub-buffer count and push the readers across these
+                * sub-buffers.
+                * Not concurrency-safe if a writer is stalled in a sub-buffer and
+                * another writer switches in, finding out it is corrupted: the
+                * old (uncommitted) sub-buffer will be declared corrupted, and the
+                * new one will be declared corrupted too because of the commit
+                * count adjustment. offset_old should never be 0. */
+               ltt_buffer_end_callback(ltt_buf, tsc, offset_old,
+                               SUBBUF_INDEX((offset_old), ltt_buf));
+               /* Setting this reserve_count will allow the sub-buffer to be delivered by
+                        the last committer. */
+               reserve_count = atomic_add_return(
+                               SUBBUF_OFFSET(offset_old - 1, ltt_buf) + 1,
+                               &ltt_buf->reserve_count[SUBBUF_INDEX(offset_old,
+                                               ltt_buf)]);
+               if(reserve_count == atomic_read(
+                               &ltt_buf->commit_count[SUBBUF_INDEX((offset_old), ltt_buf)])) {
+                       ltt_deliver_callback(ltt_buf, SUBBUF_INDEX((offset_old), ltt_buf), NULL);
+               }
+       }
+       
+       if(mode == FORCE_ACTIVE) {
+               /* New sub-buffer */
+               /* This code can be executed unordered: writers may already have
+                * written to the sub-buffer before this code gets executed; caution. */
+               /* The commit makes sure that this code is executed before the
+                * delivery of this sub-buffer. */
+               ltt_buffer_begin_callback(ltt_buf, tsc, SUBBUF_INDEX(offset_begin, ltt_buf));
+               commit_count = atomic_add_return(ltt_subbuf_header_len(ltt_buf),
+                                                                &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+               /* Check if the written buffer has to be delivered */
+               if(commit_count == atomic_read(
+                                       &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])) {
+                       ltt_deliver_callback(ltt_buf, SUBBUF_INDEX(offset_begin, ltt_buf), NULL);
+               }
+       }
 
 }
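
The switch above is published with a lock-free retry loop: read the current
offset, compute the aligned end, and commit it with atomic_cmpxchg(),
restarting whenever another writer raced in first. Below is a minimal
standalone sketch of that pattern, assuming C11 atomics rather than the
kernel-style atomic_t helpers used by the patch (reserve_switch is an
illustrative name, not part of the patch):

#include <stdatomic.h>

/* Hypothetical distillation of the do { ... } while(atomic_cmpxchg())
 * loop in flush_buffer(): reserve up to the next sub-buffer boundary.
 * subbuf_size must be a power of two, as SUBBUF_ALIGN() assumes. */
static int reserve_switch(atomic_int *offset, int subbuf_size)
{
	int offset_old, offset_end;

	do {
		offset_old = atomic_load(offset);
		/* Round up to the next sub-buffer boundary. */
		offset_end = (offset_old + subbuf_size) & ~(subbuf_size - 1);
	} while (!atomic_compare_exchange_weak(offset, &offset_old,
			offset_end));

	/* [offset_old, offset_end) now belongs to this writer alone;
	 * losers of the race simply retried with the updated offset. */
	return offset_old;
}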
 
@@ -163,7 +308,8 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
        } else {
                if(atomic_read(&ltt_buf->full) == 1) {
                        /* tell the client that buffer is now unfull */
-                       ret = futex(&ltt_buf->full, FUTEX_WAKE, 1, NULL, NULL, 0);
+                       ret = futex((unsigned long)&ltt_buf->full,
+                                       FUTEX_WAKE, 1, 0, 0, 0);
                        if(ret != 1) {
                                printf("LTT warning : race condition : writer not waiting or too many writers\n");
                        }
@@ -182,7 +328,7 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
  *{
        if(buffer_is_full) {
                atomic_set(&ltt_buf->full, 1);
-               ret = futex(&ltt_buf->full, 1, NULL, NULL, 0);
+               ret = do_futex((unsigned long)&ltt_buf->full, 1, 0, 0, 0);
        }
 }
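
The commented-out sketch above shows the writer side of the full-buffer
handshake: the writer marks the buffer full and sleeps on the futex word,
and the reader in ltt_buffer_put() wakes it once a sub-buffer has been
consumed. A hedged sketch of that pairing through the raw futex(2) system
call via syscall(2) follows (function names are illustrative; the patch
itself goes through the _syscall6 wrapper declared in ltt-usertrace-fast.h):

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Writer: sleep while the buffer is still marked full. A FUTEX_WAKE
 * racing between the load and the wait is harmless, since FUTEX_WAIT
 * returns immediately when *full no longer equals 1. */
static void writer_wait_unfull(int *full)
{
	while (__atomic_load_n(full, __ATOMIC_ACQUIRE) == 1)
		syscall(SYS_futex, full, FUTEX_WAIT, 1, NULL, NULL, 0);
}

/* Reader: clear the flag, then wake at most one waiting writer. */
static void reader_mark_unfull(int *full)
{
	__atomic_store_n(full, 0, __ATOMIC_RELEASE);
	syscall(SYS_futex, full, FUTEX_WAKE, 1, NULL, NULL, 0);
}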
 
@@ -190,11 +336,12 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
 
 static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
 {
+       unsigned int consumed_old;
        int err;
        printf("LTT read buffer\n");
 
 
-       err = ltt_buffer_get(&shared_trace_info->channel.cpu, &consumed_old);
+       err = ltt_buffer_get(ltt_buf, &consumed_old);
        if(err != -EAGAIN && err != 0) {
                printf("LTT Reserving sub buffer failed\n");
                goto get_error;
@@ -218,11 +365,11 @@ static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
        }
 #endif //0
 write_error:
-       err = ltt_buffer_put(&shared_trace_info->channel.cpu, consumed_old);
+       err = ltt_buffer_put(ltt_buf, consumed_old);
 
        if(err != 0) {
                if(err == -EIO) {
-                       perror("Reader has been pushed by the writer, last subbuffer corrupted.");
+                       printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
                        /* FIXME : we may delete the last written buffer if we wish. */
                }
                goto get_error;
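
read_subbuffer() follows a strict get/write/put contract: acquire the oldest
consumed sub-buffer, copy it out, then release it so the writer may reuse
the space. A hedged sketch of one consumer iteration under that contract
(consume_one is an illustrative name; ltt_buffer_get() and ltt_buffer_put()
are the functions from the patch, and -EIO from the put means the writer
overran the reader):

/* One reader iteration, mirroring read_subbuffer() above. */
static int consume_one(struct ltt_buf *ltt_buf, int fd)
{
	unsigned int consumed_old;
	int err;

	err = ltt_buffer_get(ltt_buf, &consumed_old);
	if (err != 0)
		return err;	/* -EAGAIN: nothing to consume yet */

	/* write() the sub-buffer starting at consumed_old to fd here */

	err = ltt_buffer_put(ltt_buf, consumed_old);
	if (err == -EIO) {
		/* Writer pushed the reader: the sub-buffer just written
		 * out may be corrupted and could be discarded. */
	}
	return err;
}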
@@ -304,30 +451,32 @@ static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
                printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
        
                do {
-                       ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+                       ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
                } while(ret == 0);
 
                do {
-                       ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+                       ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
                } while(ret == 0);
        }
 
-       /* Buffer force switch (flush) */
-       flush_buffer(&shared_trace_info->channel.cpu);
+       /* The parent thread is dead and we have finished with the buffer */
+
+       /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
+        * there is no writer. */
+       flush_buffer(&shared_trace_info->channel.cpu, FORCE_FLUSH);
        do {
-               ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+               ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
        } while(ret == 0);
 
 
-       flush_buffer(&shared_trace_info->channel.facilities);
+       flush_buffer(&shared_trace_info->channel.facilities, FORCE_FLUSH);
        do {
-               ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+               ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
        } while(ret == 0);
 
        close(fd_fac);
        close(fd_cpu);
        
-       /* The parent thread is dead and we have finished with the buffer */
        munmap(shared_trace_info, sizeof(*shared_trace_info));
        
        exit(0);
@@ -358,9 +507,13 @@ void ltt_rw_init(void)
        atomic_set(&shared_trace_info->channel.facilities.full, 0);
        shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
        shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
+       shared_trace_info->channel.facilities.start =
+               shared_trace_info->channel.facilities_buf;
+
        atomic_set(&shared_trace_info->channel.cpu.full, 0);
        shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
        shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
+       shared_trace_info->channel.cpu.start = shared_trace_info->channel.cpu_buf;
        shared_trace_info->init = 1;
 
        /* Disable signals */
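
With the start pointers initialized above, every sub-buffer of a channel is
addressable by index, which is what the begin/end callbacks in
ltt-usertrace-fast.h rely on. A small illustrative helper (subbuf_addr is
not part of the patch, which computes this inline):

/* Sub-buffer n of a channel starts n * subbuf_size bytes into the
 * contiguous buffer; subbuf_size is assumed to divide alloc_size. */
static inline void *subbuf_addr(struct ltt_buf *buf, unsigned int n)
{
	return (char *)buf->start + n * buf->subbuf_size;
}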
index 811a2537199a9ef8ad3ecae9e9959bfc6469e29a..9953c15a62cb0eb7579ffce11d1cb3e4b2a3058a 100644
 #include <errno.h>
 #include <asm/atomic.h>
 #include <pthread.h>
+#include <stdint.h>
+#include <syscall.h>
+#include <linux/futex.h>
+
+#ifndef futex
+static inline _syscall6(long, futex, unsigned long, uaddr, int, op, int, val,
+               unsigned long, timeout, unsigned long, uaddr2, int, val2)
+#endif //futex
 
 
 #ifndef        LTT_N_SUBBUFS
   (BUFFER_OFFSET(offset,buf)/buf->subbuf_size)
 
 
+#define LTT_TRACER_MAGIC_NUMBER                 0x00D6B7ED
+#define LTT_TRACER_VERSION_MAJOR               0
+#define LTT_TRACER_VERSION_MINOR               7
+
+#ifndef atomic_cmpxchg
+#define atomic_cmpxchg(v, old, new) ((int)cmpxchg(&((v)->counter), old, new))
+#endif //atomic_cmpxchg
+struct ltt_trace_header {
+       uint32_t                                magic_number;
+       uint32_t                                arch_type;
+       uint32_t                                arch_variant;
+       uint32_t                                float_word_order;        /* Only useful for user space traces */
+       uint8_t                                 arch_size;
+       //uint32_t                              system_type;
+       uint8_t                                 major_version;
+       uint8_t                                 minor_version;
+       uint8_t                                 flight_recorder;
+       uint8_t                                 has_heartbeat;
+       uint8_t                                 has_alignment;  /* Event header alignment */
+       uint32_t                                freq_scale;
+       uint64_t                                start_freq;
+       uint64_t                                start_tsc;
+       uint64_t                                start_monotonic;
+       uint64_t                                start_time_sec;
+       uint64_t                                start_time_usec;
+} __attribute__((packed));
+
+
+struct ltt_block_start_header {
+       struct {
+               uint64_t                        cycle_count;
+               uint64_t                        freq;   /* kHz */
+       } begin;
+       struct {
+               uint64_t                        cycle_count;
+               uint64_t                        freq;   /* kHz */
+       } end;
+       uint32_t                                lost_size;      /* Size unused at the end of the buffer */
+       uint32_t                                buf_size;       /* The size of this sub-buffer */
+       struct ltt_trace_header trace;
+} __attribute__((packed));
+
+
+
 struct ltt_buf {
+       void                    *start;
        atomic_t        offset;
        atomic_t        consumed;
        atomic_t        reserve_count[LTT_N_SUBBUFS];
        atomic_t        commit_count[LTT_N_SUBBUFS];
 
        atomic_t        events_lost;
+       atomic_t        corrupted_subbuffers;
        atomic_t        full;   /* futex on which the writer waits : 1 : full */
        unsigned int    alloc_size;
        unsigned int    subbuf_size;
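
Each sub-buffer now begins with a struct ltt_block_start_header whose
embedded trace header identifies the stream. A reader can sanity-check a
mapped sub-buffer before parsing it; a hedged sketch (check_trace_header is
an illustrative name, not part of the patch):

/* Validate the magic number and version of a mapped sub-buffer. */
static int check_trace_header(const struct ltt_block_start_header *bsh)
{
	if (bsh->trace.magic_number != LTT_TRACER_MAGIC_NUMBER)
		return -1;	/* not an LTT buffer, or byte-swapped */
	if (bsh->trace.major_version != LTT_TRACER_VERSION_MAJOR)
		return -1;	/* incompatible trace format */
	return 0;
}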
@@ -71,10 +125,96 @@ struct ltt_trace_info {
        } channel;
 };
 
+
+
 extern __thread struct ltt_trace_info *thread_trace_info;
 
 void ltt_thread_init(void);
 
 void ltt_usertrace_fast_buffer_switch(void);
 
+
+
+static inline uint64_t ltt_get_timestamp(void)
+{
+       return get_cycles();
+}
+
+static inline unsigned int ltt_subbuf_header_len(struct ltt_buf *buf)
+{
+       return sizeof(struct ltt_block_start_header);
+}
+
+
+
+static inline void ltt_write_trace_header(struct ltt_trace_header *header)
+{
+       header->magic_number = LTT_TRACER_MAGIC_NUMBER;
+       header->major_version = LTT_TRACER_VERSION_MAJOR;
+       header->minor_version = LTT_TRACER_VERSION_MINOR;
+       header->float_word_order = 0;   //FIXME
+       header->arch_type = 0; //FIXME LTT_ARCH_TYPE;
+       header->arch_size = sizeof(void*);
+       header->arch_variant = 0; //FIXME LTT_ARCH_VARIANT;
+       header->flight_recorder = 0;
+       header->has_heartbeat = 0;
+
+#ifdef CONFIG_LTT_ALIGNMENT
+       header->has_alignment = sizeof(void*);
+#else
+       header->has_alignment = 0;
+#endif
+       
+       //FIXME
+       header->freq_scale = 0;
+       header->start_freq = 0;
+       header->start_tsc = 0;
+       header->start_monotonic = 0;
+       header->start_time_sec = 0;
+       header->start_time_usec = 0;
+}
+
+
+static inline void ltt_buffer_begin_callback(struct ltt_buf *buf,
+                     uint64_t tsc, unsigned int subbuf_idx)
+{
+       struct ltt_block_start_header *header =
+                       (struct ltt_block_start_header *)
+                       (buf->start + (subbuf_idx * buf->subbuf_size));
+       
+       header->begin.cycle_count = tsc;
+       header->begin.freq = 0; //ltt_frequency();
+
+       header->lost_size = 0xFFFFFFFF; // for debugging...
+       
+       header->buf_size = buf->subbuf_size;
+       
+       ltt_write_trace_header(&header->trace);
+
+}
+
+
+
+static inline void ltt_buffer_end_callback(struct ltt_buf *buf,
+                     uint64_t tsc, unsigned int offset, unsigned int subbuf_idx)
+{
+       struct ltt_block_start_header *header =
+                       (struct ltt_block_start_header *)
+                       (buf->start + (subbuf_idx * buf->subbuf_size));
+       /* offset is assumed to never be 0 here: never deliver a completely
+        * empty sub-buffer. */
+       /* The lost size is between 0 and subbuf_size - 1. */
+       header->lost_size = SUBBUF_OFFSET(buf->subbuf_size - offset,
+                       buf);
+       header->end.cycle_count = tsc;
+       header->end.freq = 0; //ltt_frequency();
+}
+
+
+static inline void ltt_deliver_callback(struct ltt_buf *buf,
+               unsigned int subbuf_idx,
+               void *subbuf)
+{
+       ltt_usertrace_fast_buffer_switch();
+}
 #endif //_LTT_USERTRACE_FAST_H
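
For concreteness, a worked instance of the lost_size computation in
ltt_buffer_end_callback() above (the numbers are illustrative):

/* With subbuf_size = 4096 and a write position whose offset within the
 * sub-buffer is 4000:
 *   lost_size = SUBBUF_OFFSET(4096 - 4000, buf) = 96
 * so 96 bytes at the end of the sub-buffer carry no events and the
 * reader skips them. The 0xFFFFFFFF written by the begin callback
 * flags a sub-buffer whose end callback never ran. */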