add libtracing Makefile.am
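
This part of the commit reworks libtracing/relay.c for userspace tracing: trace
buffers are now allocated as System V shared memory segments instead of private
anonymous mmap()s, struct ltt_channel_buf_struct is no longer defined in this
file and is itself placed in a shared memory page by the new
ltt_chan_alloc_ltt_buf(), a per-buffer pipe replaces the wakeup_readers flag so
the consumer can be notified that data is ready, ltt_do_get_subbuf() and
ltt_do_put_subbuf() are exported (no longer static), and the relay transport is
registered from a library constructor.

The following is a minimal standalone sketch (not the UST code itself) of the
System V shared memory pattern used by relay_alloc_buf() and
ltt_chan_alloc_ltt_buf(): create a segment keyed on the pid, attach it, then
immediately mark it for removal so that it is destroyed automatically once
every user has detached.

#include <stdio.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int main(void)
{
        size_t size = 4096;     /* one page, for illustration */
        int shmid;
        void *ptr;

        shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
        if (shmid == -1) {
                perror("shmget");
                return 1;
        }

        ptr = shmat(shmid, NULL, 0);
        if (ptr == (void *) -1) {
                perror("shmat");
                shmctl(shmid, IPC_RMID, NULL);
                return 1;
        }

        /* Mark for destruction now; the segment persists until the last
         * attached process detaches or exits. */
        if (shmctl(shmid, IPC_RMID, NULL) == -1) {
                perror("shmctl");
                return 1;
        }

        /* ... use ptr as the buffer ... */

        shmdt(ptr);
        return 0;
}

The data-ready pipe works as follows: ltt_deliver() and ltt_relay_buffer_flush()
write one byte to data_ready_fd_write whenever a sub-buffer is delivered, and
ltt_relay_finish_buffer() closes the write end. A hypothetical consumer loop on
the read end (the function and its argument are illustrative, not part of this
patch) could look like:

#include <stdio.h>
#include <unistd.h>

void consume_buffer(int data_ready_fd_read)
{
        char c;
        ssize_t n;

        for (;;) {
                n = read(data_ready_fd_read, &c, 1);
                if (n == 0)
                        break;  /* write end closed: buffer finished */
                if (n == -1) {
                        perror("read");
                        break;
                }
                /* one sub-buffer is ready: fetch it with ltt_do_get_subbuf()
                 * and release it with ltt_do_put_subbuf() */
        }
}
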
diff --git a/libtracing/relay.c b/libtracing/relay.c
index d78fa72a7ae2ac248cc07e309ba1c48236fcfbc2..6bb5a35d63d6aaed2497e0920287801b9e49d4a3 100644
--- a/libtracing/relay.c
+++ b/libtracing/relay.c
 //ust// #include <linux/cpu.h>
 //ust// #include <linux/splice.h>
 //ust// #include <linux/bitops.h>
-#include <sys/mman.h>
 #include "kernelcompat.h"
+#include <sys/mman.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
 #include "list.h"
 #include "relay.h"
 #include "channels.h"
@@ -90,24 +92,47 @@ static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf);
 
 static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
 {
-       unsigned int n_pages;
-       struct buf_page *buf_page, *n;
+//ust//        unsigned int n_pages;
+//ust//        struct buf_page *buf_page, *n;
 
-       void *result;
+       void *ptr;
+       int result;
 
        *size = PAGE_ALIGN(*size);
 
-       /* Maybe do read-ahead */
-       result = mmap(NULL, *size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
-       if(result == MAP_FAILED) {
-               PERROR("mmap");
+       result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
+       if(buf->shmid == -1) {
+               PERROR("shmget");
+               return -1;
+       }
+
+       ptr = shmat(buf->shmid, NULL, 0);
+       if(ptr == (void *) -1) {
+               perror("shmat");
+               goto destroy_shmem;
+       }
+
+       /* Mark the shared memory for destruction right away; it will only be
+        * destroyed once all users have detached.
+        */
+       result = shmctl(buf->shmid, IPC_RMID, NULL);
+       if(result == -1) {
+               perror("shmctl");
                return -1;
        }
 
-       buf->buf_data = result;
+       buf->buf_data = ptr;
        buf->buf_size = *size;
 
        return 0;
+
+       destroy_shmem:
+       result = shmctl(buf->shmid, IPC_RMID, NULL);
+       if(result == -1) {
+               perror("shmctl");
+       }
+
+       return -1;
 }
 
 /**
@@ -181,7 +206,7 @@ static void relay_destroy_buf(struct rchan_buf *buf)
 static void relay_remove_buf(struct kref *kref)
 {
        struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
-       buf->chan->cb->remove_buf_file(buf);
+//ust//        buf->chan->cb->remove_buf_file(buf);
        relay_destroy_buf(buf);
 }
 
@@ -748,37 +773,6 @@ void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset)
 #define printk_dbg(fmt, args...)
 #endif
 
-/* LTTng lockless logging buffer info */
-struct ltt_channel_buf_struct {
-       /* First 32 bytes cache-hot cacheline */
-       local_t offset;                 /* Current offset in the buffer */
-       local_t *commit_count;          /* Commit count per sub-buffer */
-       atomic_long_t consumed;         /*
-                                        * Current offset in the buffer
-                                        * standard atomic access (shared)
-                                        */
-       unsigned long last_tsc;         /*
-                                        * Last timestamp written in the buffer.
-                                        */
-       /* End of first 32 bytes cacheline */
-       atomic_long_t active_readers;   /*
-                                        * Active readers count
-                                        * standard atomic access (shared)
-                                        */
-       local_t events_lost;
-       local_t corrupted_subbuffers;
-       spinlock_t full_lock;           /*
-                                        * buffer full condition spinlock, only
-                                        * for userspace tracing blocking mode
-                                        * synchronization with reader.
-                                        */
-//ust//        wait_queue_head_t write_wait;   /*
-//ust//                                         * Wait queue for blocking user space
-//ust//                                         * writers
-//ust//                                         */
-       atomic_t wakeup_readers;        /* Boolean : wakeup readers waiting ? */
-} ____cacheline_aligned;
-
 /*
  * Last TSC comparison functions. Check if the current TSC overflows
  * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
@@ -879,14 +873,33 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
 
 }
 
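+/* Optional hook, registered via relay_set_wake_consumer(), called to wake a
+ * waiting consumer; the second argument indicates whether the buffer is
+ * finished. */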
+void (*wake_consumer)(void *, int) = NULL;
+
+void relay_set_wake_consumer(void (*wake)(void *, int))
+{
+       wake_consumer = wake;
+}
+
+void relay_wake_consumer(void *arg, int finished)
+{
+       if(wake_consumer)
+               wake_consumer(arg, finished);
+}
+
 static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
                void *subbuf)
 {
        struct ltt_channel_struct *channel =
                (struct ltt_channel_struct *)buf->chan->private_data;
        struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+       int result;
 
-       atomic_set(&ltt_buf->wakeup_readers, 1);
+       result = write(ltt_buf->data_ready_fd_write, "1", 1);
+       if(result == -1) {
+               PERROR("write (in ltt_deliver)");
+               ERR("this should never happen!");
+       }
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 1);
 }
 
 static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf)
@@ -1041,7 +1054,7 @@ static notrace void ltt_buf_unfull(struct rchan_buf *buf,
 //ust//        return mask;
 //ust// }
 
-static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
+int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
 {
        struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
        long consumed_old, consumed_idx, commit_count, write_offset;
@@ -1081,7 +1094,7 @@ static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc
        return 0;
 }
 
-static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
+int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
 {
        long consumed_new, consumed_old;
 
@@ -1090,14 +1103,14 @@ static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc
        consumed_old = consumed_old | uconsumed_old;
        consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
 
-       spin_lock(&ltt_buf->full_lock);
+//ust//        spin_lock(&ltt_buf->full_lock);
        if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
                                consumed_new)
            != consumed_old) {
                /* We have been pushed by the writer : the last
                 * buffer read _is_ corrupted! It can also
                 * happen if this is a buffer we never got. */
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
                return -EIO;
        } else {
                /* tell the client that buffer is now unfull */
@@ -1106,7 +1119,7 @@ static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc
                index = SUBBUF_INDEX(consumed_old, buf->chan);
                data = BUFFER_OFFSET(consumed_old, buf->chan);
                ltt_buf_unfull(buf, index, data);
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
        }
        return 0;
 }
@@ -1447,11 +1460,13 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
 {
        struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf;
        unsigned int j;
+       int fds[2];
+       int result;
 
-       ltt_buf->commit_count =
-               zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs);
-       if (!ltt_buf->commit_count)
-               return -ENOMEM;
+//ust//        ltt_buf->commit_count =
+//ust//                zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs);
+//ust//        if (!ltt_buf->commit_count)
+//ust//                return -ENOMEM;
        kref_get(&trace->kref);
        kref_get(&trace->ltt_transport_kref);
        kref_get(&ltt_chan->kref);
@@ -1461,8 +1476,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        for (j = 0; j < n_subbufs; j++)
                local_set(&ltt_buf->commit_count[j], 0);
 //ust//        init_waitqueue_head(&ltt_buf->write_wait);
-       atomic_set(&ltt_buf->wakeup_readers, 0);
-       spin_lock_init(&ltt_buf->full_lock);
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 0);
+//ust//        spin_lock_init(&ltt_buf->full_lock);
 
        ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
 
@@ -1471,6 +1486,14 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        local_set(&ltt_buf->events_lost, 0);
        local_set(&ltt_buf->corrupted_subbuffers, 0);
 
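+       /* Data-ready notification pipe: the tracer writes one byte per
+        * delivered sub-buffer; closing the write end tells the consumer the
+        * buffer is finished (see ltt_relay_finish_buffer). */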
+       result = pipe(fds);
+       if(result == -1) {
+               PERROR("pipe");
+               return -1;
+       }
+       ltt_buf->data_ready_fd_read = fds[0];
+       ltt_buf->data_ready_fd_write = fds[1];
+
        return 0;
 }
 
@@ -1482,13 +1505,56 @@ static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan)
        kref_put(&ltt_chan->trace->ltt_transport_kref,
                ltt_release_transport);
        ltt_relay_print_buffer_errors(ltt_chan);
-       kfree(ltt_buf->commit_count);
-       ltt_buf->commit_count = NULL;
+//ust//        kfree(ltt_buf->commit_count);
+//ust//        ltt_buf->commit_count = NULL;
        kref_put(&ltt_chan->kref, ltt_relay_release_channel);
        kref_put(&trace->kref, ltt_release_trace);
 //ust//        wake_up_interruptible(&trace->kref_wq);
 }
 
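+/* Allocate the channel's ltt_channel_buf_struct in a System V shared memory
+ * page so that other processes can attach to it; the segment is marked for
+ * destruction immediately and goes away once all users have detached. */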
+static int ltt_chan_alloc_ltt_buf(struct ltt_channel_struct *ltt_chan)
+{
+       void *ptr;
+       int result;
+
+       /* Get one page */
+       /* FIXME: increase size if we have a commit_count array that overflows the page */
+       size_t size = PAGE_ALIGN(1);
+
+       result = ltt_chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
+       if(ltt_chan->buf_shmid == -1) {
+               PERROR("shmget");
+               return -1;
+       }
+
+       ptr = shmat(ltt_chan->buf_shmid, NULL, 0);
+       if(ptr == (void *) -1) {
+               perror("shmat");
+               goto destroy_shmem;
+       }
+
+       /* Mark the shared memory for destruction right away; it will only be
+        * destroyed once all users have detached.
+        */
+       result = shmctl(ltt_chan->buf_shmid, IPC_RMID, NULL);
+       if(result == -1) {
+               perror("shmctl");
+               return -1;
+       }
+
+       ltt_chan->buf = ptr;
+
+       return 0;
+
+       destroy_shmem:
+       result = shmctl(ltt_chan->buf_shmid, IPC_RMID, NULL);
+       if(result == -1) {
+               perror("shmctl");
+       }
+
+       return -1;
+}
+
 /*
  * Create channel.
  */
@@ -1523,7 +1589,10 @@ static int ltt_relay_create_channel(const char *trace_name,
        ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
        ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
 //ust//        ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
-       ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct));
+
+       ltt_chan_alloc_ltt_buf(ltt_chan);
+
+//ust//        ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct));
        if (!ltt_chan->buf)
                goto alloc_error;
        ltt_chan->trans_channel_data = ltt_relay_open(tmpname,
@@ -1578,8 +1647,19 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
  */
 static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
 {
+       struct ltt_channel_struct *channel =
+               (struct ltt_channel_struct *)buf->chan->private_data;
+       struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+       int result;
+
        buf->finalized = 1;
        ltt_force_switch(buf, FORCE_FLUSH);
+
+       result = write(ltt_buf->data_ready_fd_write, "1", 1);
+       if(result == -1) {
+               PERROR("write (in ltt_relay_buffer_flush)");
+               ERR("this should never happen!");
+       }
 }
 
 static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
@@ -1601,11 +1681,20 @@ static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
 static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel)
 {
        struct rchan *rchan = ltt_channel->trans_channel_data;
+       int result;
 
        if (rchan->buf) {
                struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
                ltt_relay_buffer_flush(rchan->buf);
 //ust//                ltt_relay_wake_writers(ltt_buf);
+               /* closing the pipe tells the consumer the buffer is finished */
+
+               //result = write(ltt_buf->data_ready_fd_write, "D", 1);
+               //if(result == -1) {
+               //      PERROR("write (in ltt_relay_finish_buffer)");
+               //      ERR("this should never happen!");
+               //}
+               close(ltt_buf->data_ready_fd_write);
        }
 }
 
@@ -1937,7 +2026,7 @@ static inline void ltt_reserve_switch_old_subbuf(
  * sub-buffer before this code gets executed, caution.  The commit makes sure
  * that this code is executed before the deliver of this sub-buffer.
  */
-static inline void ltt_reserve_switch_new_subbuf(
+static /*inline*/ void ltt_reserve_switch_new_subbuf(
                struct ltt_channel_struct *ltt_channel,
                struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
                struct rchan_buf *buf,
@@ -2149,14 +2238,13 @@ static notrace void ltt_force_switch(struct rchan_buf *buf,
  * fill the subbuffer completely (so the subbuf index stays in the previous
  * subbuffer).
  */
-#ifdef CONFIG_LTT_VMCORE
-static inline void ltt_write_commit_counter(struct rchan_buf *buf,
+//ust// #ifdef CONFIG_LTT_VMCORE
+static /*inline*/ void ltt_write_commit_counter(struct rchan_buf *buf,
                long buf_offset, size_t slot_size)
 {
        struct ltt_channel_struct *ltt_channel =
                (struct ltt_channel_struct *)buf->chan->private_data;
-       struct ltt_channel_buf_struct *ltt_buf =
-                       percpu_ptr(ltt_channel->buf, buf->cpu);
+       struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
        struct ltt_subbuffer_header *header;
        long offset, subbuf_idx, commit_count;
        uint32_t lost_old, lost_new;
@@ -2183,12 +2271,12 @@ static inline void ltt_write_commit_counter(struct rchan_buf *buf,
                }
        }
 }
-#else
-static inline void ltt_write_commit_counter(struct rchan_buf *buf,
-               long buf_offset, size_t slot_size)
-{
-}
-#endif
+//ust// #else
+//ust// static inline void ltt_write_commit_counter(struct rchan_buf *buf,
+//ust//                long buf_offset, size_t slot_size)
+//ust// {
+//ust// }
+//ust// #endif
 
 /*
  * Atomic unordered slot commit. Increments the commit count in the
@@ -2227,6 +2315,8 @@ static notrace void ltt_relay_commit_slot(
         * ltt buffers from vmcore, after crash.
         */
        ltt_write_commit_counter(buf, buf_offset, slot_size);
+
+       DBG("committed slot; commit count is now %ld", commit_count);
 }
 
 /*
@@ -2375,9 +2465,14 @@ static struct ltt_transport ust_relay_transport = {
 //ust//        return 0;
 //ust// }
 
-void init_ustrelay_transport(void)
+static char initialized = 0;
+
+void __attribute__((constructor)) init_ustrelay_transport(void)
 {
-       ltt_transport_register(&ust_relay_transport);
+       if(!initialized) {
+               ltt_transport_register(&ust_relay_transport);
+               initialized = 1;
+       }
 }
 
 static void __exit ltt_relay_exit(void)