Convert buffering system to per-cpu
[ust.git] / libust / buffers.c
index 504a79da344b14431197576a42685efcefcfcaf5..13660a8910402ad45e163c524aa845d37025a6df 100644 (file)
@@ -20,6 +20,7 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
  */
 
+#include <unistd.h>
 #include <sys/mman.h>
 #include <sys/ipc.h>
 #include <sys/shm.h>
 static DEFINE_MUTEX(ust_buffers_channels_mutex);
 static LIST_HEAD(ust_buffers_channels);
 
+/* Number of configured CPUs (cached after first success); -1 on error.
+ * Uses _SC_NPROCESSORS_CONF, not _ONLN: per-cpu arrays are indexed by cpu
+ * id, and with offline CPUs an id can be >= the online count (overflow). */
+static int get_n_cpus(void)
+{
+       static int n_cpus = 0;
+       long result;    /* sysconf() returns long */
+
+       if(n_cpus)
+               return n_cpus;
+
+       result = sysconf(_SC_NPROCESSORS_CONF);
+       if(result < 1)  /* -1 is failure; 0 would give empty arrays */
+               return -1;
+
+       n_cpus = (int) result;
+       return n_cpus;
+}
+
 static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
                struct ust_channel *ltt_chan,
                struct ust_buffer *buf,
@@ -57,6 +77,7 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
                return -1;
        }
 
+       /* FIXME: should have matching call to shmdt */
        ptr = shmat(buf->shmid, NULL, 0);
        if(ptr == (void *) -1) {
                perror("shmat");
@@ -86,20 +107,19 @@ static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
        return -1;
 }
 
-static struct ust_buffer *ust_buffers_create_buf(struct ust_channel *channel)
+int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
 {
        int result;
+       struct ust_buffer *buf = channel->buf[cpu];
 
-       result = ust_buffers_alloc_buf(channel->buf, &channel->alloc_size);
+       buf->cpu = cpu;
+       result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
        if(result)
-               goto free_buf;
+               return -1;
 
-       ((struct ust_buffer *)channel->buf)->chan = channel;
+       buf->chan = channel;
        kref_get(&channel->kref);
-       return channel->buf;
-
-free_buf:
-       return NULL;
+       return 0;
 }
 
 static void ust_buffers_destroy_channel(struct kref *kref)
@@ -118,6 +138,7 @@ static void ust_buffers_destroy_buf(struct ust_buffer *buf)
                PERROR("munmap");
        }
 
+//ust//        chan->buf[buf->cpu] = NULL;
        free(buf);
        kref_put(&chan->kref, ust_buffers_destroy_channel);
 }
@@ -129,23 +150,21 @@ static void ust_buffers_remove_buf(struct kref *kref)
        ust_buffers_destroy_buf(buf);
 }
 
-static struct ust_buffer *ust_buffers_open_buf(struct ust_channel *chan)
+int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
 {
-       struct ust_buffer *buf = NULL;
-       int err;
-
-       buf = ust_buffers_create_buf(chan);
-       if (!buf)
-               return NULL;
+       int result;
 
-       kref_init(&buf->kref);
+       result = ust_buffers_create_buf(chan, cpu);
+       if (result == -1)
+               return -1;
 
-       err = ust_buffers_init_buffer(chan->trace, chan, buf, chan->subbuf_cnt);
+       kref_init(&chan->buf[cpu]->kref);
 
-       if (err)
-               return ERR_PTR(err);
+       result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
+       if(result == -1)
+               return -1;
 
-       return buf;
+       return 0;
 
        /* FIXME: decrementally destroy on error? */
 }
@@ -161,6 +180,9 @@ static void ust_buffers_close_buf(struct ust_buffer *buf)
 
 int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
 {
+       int i;
+       int result;
+
        if(subbuf_size == 0 || subbuf_cnt == 0)
                return -1;
 
@@ -169,18 +191,27 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_
        chan->subbuf_size = subbuf_size;
        chan->subbuf_size_order = get_count_order(subbuf_size);
        chan->alloc_size = FIX_SIZE(subbuf_size * subbuf_cnt);
+
        kref_init(&chan->kref);
 
        mutex_lock(&ust_buffers_channels_mutex);
-       chan->buf = ust_buffers_open_buf(chan);
-       if (!chan->buf)
-               goto error;
+       for(i=0; i<chan->n_cpus; i++) {
+               result = ust_buffers_open_buf(chan, i);
+               if (result == -1)
+                       goto error;
+       }
        list_add(&chan->list, &ust_buffers_channels);
        mutex_unlock(&ust_buffers_channels_mutex);
 
        return 0;
 
-       error:
+       /* Jump directly inside the loop to close the buffers that were already
+        * opened. */
+       for(; i>=0; i--) {
+               ust_buffers_close_buf(chan->buf[i]);
+error:
+       }
+
        kref_put(&chan->kref, ust_buffers_destroy_channel);
        mutex_unlock(&ust_buffers_channels_mutex);
        return -1;
@@ -188,12 +219,17 @@ int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_
 
 void ust_buffers_channel_close(struct ust_channel *chan)
 {
-       if (!chan)
+       int i;
+       if(!chan)
                return;
 
        mutex_lock(&ust_buffers_channels_mutex);
-       if (chan->buf)
-               ust_buffers_close_buf(chan->buf);
+       for(i=0; i<chan->n_cpus; i++) {
+       /* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
+        * initialize to NULL so we cannot use this check. Should we? */
+//ust//                if (chan->buf[i])
+                       ust_buffers_close_buf(chan->buf[i]);
+       }
 
        list_del(&chan->list);
        kref_put(&chan->kref, ust_buffers_destroy_channel);
@@ -216,6 +252,7 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
                len -= cpy;
                src += cpy;
                offset += cpy;
+
                WARN_ON(offset >= buf->buf_size);
 
                cpy = min_t(size_t, len, buf->buf_size - offset);
@@ -223,16 +260,6 @@ void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
        } while (unlikely(len != cpy));
 }
 
-/**
- * ltt_buffers_offset_address - get address of a location within the buffer
- * @buf : buffer
- * @offset : offset within the buffer.
- *
- * Return the address where a given offset is located.
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
- */
 void *ltt_buffers_offset_address(struct ust_buffer *buf, size_t offset)
 {
        return ((char *)buf->buf_data)+offset;
@@ -289,7 +316,7 @@ static inline int last_tsc_overflow(struct ust_buffer *ltt_buf,
  */
 enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
 
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan);
+static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);
 
 static void ltt_force_switch(struct ust_buffer *buf,
                enum force_switch_mode mode);
@@ -449,9 +476,9 @@ int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old)
 
 static void ltt_relay_print_subbuffer_errors(
                struct ust_channel *channel,
-               long cons_off)
+               long cons_off, int cpu)
 {
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
        long cons_idx, commit_count, write_offset;
 
        cons_idx = SUBBUF_INDEX(cons_off, channel);
@@ -477,9 +504,9 @@ static void ltt_relay_print_subbuffer_errors(
 }
 
 static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
-               struct ust_channel *channel)
+               struct ust_channel *channel, int cpu)
 {
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
        long cons_off;
 
        /*
@@ -494,13 +521,13 @@ static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
                                      channel)
                         - cons_off) > 0;
                        cons_off = SUBBUF_ALIGN(cons_off, channel))
-               ltt_relay_print_subbuffer_errors(channel, cons_off);
+               ltt_relay_print_subbuffer_errors(channel, cons_off, cpu);
 }
 
-static void ltt_relay_print_buffer_errors(struct ust_channel *channel)
+static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu)
 {
        struct ltt_trace_struct *trace = channel->trace;
-       struct ust_buffer *ltt_buf = channel->buf;
+       struct ust_buffer *ltt_buf = channel->buf[cpu];
 
        if (local_read(&ltt_buf->events_lost))
                ERR("channel %s: %ld events lost",
@@ -511,7 +538,7 @@ static void ltt_relay_print_buffer_errors(struct ust_channel *channel)
                        channel->channel_name,
                        local_read(&ltt_buf->corrupted_subbuffers));
 
-       ltt_relay_print_errors(trace, channel);
+       ltt_relay_print_errors(trace, channel, cpu);
 }
 
 static void ltt_relay_release_channel(struct kref *kref)
@@ -616,14 +643,14 @@ static int ust_buffers_init_buffer(struct ltt_trace_struct *trace,
 }
 
 /* FIXME: use this function */
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan)
+static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
 {
        struct ltt_trace_struct *trace = ltt_chan->trace;
-       struct ust_buffer *ltt_buf = ltt_chan->buf;
+       struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];
 
        kref_put(&ltt_chan->trace->ltt_transport_kref,
                ltt_release_transport);
-       ltt_relay_print_buffer_errors(ltt_chan);
+       ltt_relay_print_buffer_errors(ltt_chan, cpu);
 //ust//        free(ltt_buf->commit_seq);
        kfree(ltt_buf->commit_count);
        ltt_buf->commit_count = NULL;
@@ -632,47 +659,59 @@ static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan)
 //ust//        wake_up_interruptible(&trace->kref_wq);
 }
 
-static void ltt_chan_alloc_ltt_buf(struct ust_channel *chan)
+static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
 {
        void *ptr;
        int result;
+       size_t size;
+       int i;
 
-       /* Get one page */
-       /* FIXME: increase size if we have a seq_commit array that overflows the page */
-       size_t size = PAGE_ALIGN(1);
+       size = PAGE_ALIGN(1);
 
-       result = chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
-       if(chan->buf_shmid == -1) {
-               PERROR("shmget");
-               return;
-       }
+       for(i=0; i<chan->n_cpus; i++) {
 
-       ptr = shmat(chan->buf_shmid, NULL, 0);
-       if(ptr == (void *) -1) {
-               perror("shmat");
-               goto destroy_shmem;
-       }
+               result = chan->buf_struct_shmids[i] = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
+               if(result == -1) {
+                       PERROR("shmget");
+                       goto destroy_previous;
+               }
 
-       /* Already mark the shared memory for destruction. This will occur only
-         * when all users have detached.
-        */
-       result = shmctl(chan->buf_shmid, IPC_RMID, NULL);
-       if(result == -1) {
-               perror("shmctl");
-               return;
+               /* FIXME: should have matching call to shmdt */
+               ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
+               if(ptr == (void *) -1) {
+                       perror("shmat");
+                       goto destroy_shm;
+               }
+
+               /* Already mark the shared memory for destruction. This will occur only
+                * when all users have detached.
+                */
+               result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
+               if(result == -1) {
+                       perror("shmctl");
+                       goto destroy_previous;
+               }
+
+               chan->buf[i] = ptr;
        }
 
-       chan->buf = ptr;
+       return 0;
 
-       return;
+       /* Jumping inside this loop occurs from within the other loop above with i as
+        * counter, so it unallocates the structures for the cpu = current_i down to
+        * zero. */
+       for(; i>=0; i--) {
+               destroy_shm:
+               result = shmctl(chan->buf_struct_shmids[i], IPC_RMID, NULL);
+               if(result == -1) {
+                       perror("shmctl");
+               }
 
-       destroy_shmem:
-       result = shmctl(chan->buf_shmid, IPC_RMID, NULL);
-       if(result == -1) {
-               perror("shmctl");
+               destroy_previous:
+               continue;
        }
 
-       return;
+       return -1;
 }
 
 /*
@@ -682,7 +721,6 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s
        const char *channel_name, struct ust_channel *ltt_chan,
        unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
 {
-       int err = 0;
        int result;
 
        kref_init(&ltt_chan->kref);
@@ -693,29 +731,40 @@ static int ust_buffers_create_channel(const char *trace_name, struct ltt_trace_s
        ltt_chan->overwrite = overwrite;
        ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
        ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
+       ltt_chan->n_cpus = get_n_cpus();
 //ust//        ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
+       ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *));
+       if(ltt_chan->buf == NULL) {
+               goto error;
+       }
+       ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int));
+       if(ltt_chan->buf_struct_shmids == NULL)
+               goto free_buf;
 
-       ltt_chan_alloc_ltt_buf(ltt_chan);
+       result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
+       if(result != 0) {
+               goto free_buf_struct_shmids;
+       }
 
-//ust//        ltt_chan->buf = malloc(sizeof(struct ltt_channel_buf_struct));
-       if (!ltt_chan->buf)
-               goto alloc_error;
-       /* FIXME: handle error of this call */
        result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
-       if (result == -1) {
+       if (result != 0) {
                ERR("Cannot open channel for trace %s", trace_name);
-               goto relay_open_error;
+               goto unalloc_buf_structs;
        }
 
-       err = 0;
-       goto end;
+       return 0;
+
+unalloc_buf_structs:
+       /* FIXME: put a call here to unalloc the buf structs! */
+
+free_buf_struct_shmids:
+       free(ltt_chan->buf_struct_shmids);
 
-relay_open_error:
-//ust//        percpu_free(ltt_chan->buf);
-alloc_error:
-       err = EPERM;
-end:
-       return err;
+free_buf:
+       free(ltt_chan->buf);
+
+error:
+       return -1;
 }
 
 /*
@@ -754,12 +803,12 @@ static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
 //ust//        }
 }
 
-static void ltt_relay_finish_buffer(struct ust_channel *channel)
+static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu)
 {
 //     int result;
 
-       if (channel->buf) {
-               struct ust_buffer *buf = channel->buf;
+       if (channel->buf[cpu]) {
+               struct ust_buffer *buf = channel->buf[cpu];
                ltt_relay_buffer_flush(buf);
 //ust//                ltt_relay_wake_writers(ltt_buf);
                /* closing the pipe tells the consumer the buffer is finished */
@@ -776,10 +825,11 @@ static void ltt_relay_finish_buffer(struct ust_channel *channel)
 
 static void ltt_relay_finish_channel(struct ust_channel *channel)
 {
-//ust//        unsigned int i;
+       unsigned int i;
 
-//ust//        for_each_possible_cpu(i)
-               ltt_relay_finish_buffer(channel);
+       for(i=0; i<channel->n_cpus; i++) {
+               ltt_relay_finish_buffer(channel, i);
+       }
 }
 
 static void ltt_relay_remove_channel(struct ust_channel *channel)
@@ -1182,9 +1232,9 @@ static inline void ltt_reserve_end_switch_current(
 static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
                struct ust_channel *channel, void **transport_data,
                size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
-               unsigned int *rflags, int largest_align)
+               unsigned int *rflags, int largest_align, int cpu)
 {
-       struct ust_buffer *buf = *transport_data = channel->buf;
+       struct ust_buffer *buf = *transport_data = channel->buf[cpu];
        struct ltt_reserve_switch_offsets offsets;
 
        offsets.reserve_commit_diff = 0;
This page took 0.028188 seconds and 4 git commands to generate.