From 14641debd03ba299bd06040cb62e0dbdef7fac81 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 27 Jun 2011 15:24:39 -0400 Subject: [PATCH] Port ring buffer to userspace, part 1 Signed-off-by: Mathieu Desnoyers --- include/ust/core.h | 28 +- libprio_heap/lttng_prio_heap.c | 206 ----- libprio_heap/lttng_prio_heap.h | 117 --- libringbuffer/Makefile.am | 6 +- libringbuffer/backend.h | 15 +- libringbuffer/backend_internal.h | 17 +- libringbuffer/backend_types.h | 2 - libringbuffer/frontend.h | 13 +- libringbuffer/frontend_internal.h | 21 +- libringbuffer/frontend_types.h | 72 +- .../{ring_buffer_vfs.c => ring_buffer_abi.c} | 0 libringbuffer/ring_buffer_backend.c | 114 +-- libringbuffer/ring_buffer_frontend.c | 20 +- libringbuffer/ring_buffer_iterator.c | 793 ------------------ libringbuffer/vatomic.h | 36 +- 15 files changed, 129 insertions(+), 1331 deletions(-) delete mode 100644 libprio_heap/lttng_prio_heap.c delete mode 100644 libprio_heap/lttng_prio_heap.h rename libringbuffer/{ring_buffer_vfs.c => ring_buffer_abi.c} (100%) delete mode 100644 libringbuffer/ring_buffer_iterator.c diff --git a/include/ust/core.h b/include/ust/core.h index eee405dd..8c1c490e 100644 --- a/include/ust/core.h +++ b/include/ust/core.h @@ -82,7 +82,25 @@ static inline long IS_ERR(const void *ptr) /* MALLOCATION */ -#define zmalloc(s) calloc(1, s) +#include + +static inline +void *zmalloc(size_t len) +{ + return calloc(1, len); +} + +static inline +void *malloc_align(size_t len) +{ + return malloc(ALIGN(len, CAA_CACHE_LINE_SIZE)); +} + +static inline +void *zmalloc_align(size_t len) +{ + return calloc(1, ALIGN(len, CAA_CACHE_LINE_SIZE)); +} /* MATH */ @@ -110,4 +128,12 @@ static __inline__ int get_count_order(unsigned int count) const typeof( ((type *)0)->member ) *__mptr = (ptr); \ (type *)( (char *)__mptr - offsetof(type,member) );}) +#ifndef inline_memcpy +#define inline_memcpy memcpy +#endif + +#ifndef __same_type +#define __same_type(a, b) __builtin_types_compatible_p(typeof(a), typeof(b)) +#endif + #endif /* UST_CORE_H */ diff --git a/libprio_heap/lttng_prio_heap.c b/libprio_heap/lttng_prio_heap.c deleted file mode 100644 index 5bbd0793..00000000 --- a/libprio_heap/lttng_prio_heap.c +++ /dev/null @@ -1,206 +0,0 @@ -/* - * lttng_prio_heap.c - * - * Priority heap containing pointers. Based on CLRS, chapter 6. - * - * Copyright 2011 - Mathieu Desnoyers - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - */ - -#include -#include "lttng_prio_heap.h" - -#ifdef DEBUG_HEAP -void lttng_check_heap(const struct lttng_ptr_heap *heap) -{ - size_t i; - - if (!heap->len) - return; - - for (i = 1; i < heap->len; i++) - WARN_ON_ONCE(!heap->gt(heap->ptrs[i], heap->ptrs[0])); -} -#endif - -static -size_t parent(size_t i) -{ - return (i -1) >> 1; -} - -static -size_t left(size_t i) -{ - return (i << 1) + 1; -} - -static -size_t right(size_t i) -{ - return (i << 1) + 2; -} - -/* - * Copy of heap->ptrs pointer is invalid after heap_grow. 
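[Annotation, not part of the patch: the heap above lays out a complete binary tree in a flat array using the standard CLRS 0-based index arithmetic. A standalone sketch exercising it:]

/* Sketch: 0-based binary-heap index arithmetic, as in the code above. */
#include <assert.h>
#include <stddef.h>

static size_t h_parent(size_t i) { return (i - 1) >> 1; }
static size_t h_left(size_t i)   { return (i << 1) + 1; }
static size_t h_right(size_t i)  { return (i << 1) + 2; }

int main(void)
{
	/* Node 3's children are 7 and 8; both map back to parent 3. */
	assert(h_left(3) == 7 && h_right(3) == 8);
	assert(h_parent(7) == 3 && h_parent(8) == 3);
	return 0;
}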
- */ -static -int heap_grow(struct lttng_ptr_heap *heap, size_t new_len) -{ - void **new_ptrs; - - if (heap->alloc_len >= new_len) - return 0; - - heap->alloc_len = max_t(size_t, new_len, heap->alloc_len << 1); - new_ptrs = kmalloc(heap->alloc_len * sizeof(void *), heap->gfpmask); - if (!new_ptrs) - return -ENOMEM; - if (heap->ptrs) - memcpy(new_ptrs, heap->ptrs, heap->len * sizeof(void *)); - kfree(heap->ptrs); - heap->ptrs = new_ptrs; - return 0; -} - -static -int heap_set_len(struct lttng_ptr_heap *heap, size_t new_len) -{ - int ret; - - ret = heap_grow(heap, new_len); - if (ret) - return ret; - heap->len = new_len; - return 0; -} - -int lttng_heap_init(struct lttng_ptr_heap *heap, size_t alloc_len, - gfp_t gfpmask, int gt(void *a, void *b)) -{ - heap->ptrs = NULL; - heap->len = 0; - heap->alloc_len = 0; - heap->gt = gt; - /* - * Minimum size allocated is 1 entry to ensure memory allocation - * never fails within heap_replace_max. - */ - return heap_grow(heap, max_t(size_t, 1, alloc_len)); -} - -void lttng_heap_free(struct lttng_ptr_heap *heap) -{ - kfree(heap->ptrs); -} - -static void heapify(struct lttng_ptr_heap *heap, size_t i) -{ - void **ptrs = heap->ptrs; - size_t l, r, largest; - - for (;;) { - void *tmp; - - l = left(i); - r = right(i); - if (l < heap->len && heap->gt(ptrs[l], ptrs[i])) - largest = l; - else - largest = i; - if (r < heap->len && heap->gt(ptrs[r], ptrs[largest])) - largest = r; - if (largest == i) - break; - tmp = ptrs[i]; - ptrs[i] = ptrs[largest]; - ptrs[largest] = tmp; - i = largest; - } - lttng_check_heap(heap); -} - -void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p) -{ - void *res; - - if (!heap->len) { - (void) heap_set_len(heap, 1); - heap->ptrs[0] = p; - lttng_check_heap(heap); - return NULL; - } - - /* Replace the current max and heapify */ - res = heap->ptrs[0]; - heap->ptrs[0] = p; - heapify(heap, 0); - return res; -} - -int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p) -{ - void **ptrs; - size_t pos; - int ret; - - ret = heap_set_len(heap, heap->len + 1); - if (ret) - return ret; - ptrs = heap->ptrs; - pos = heap->len - 1; - while (pos > 0 && heap->gt(p, ptrs[parent(pos)])) { - /* Move parent down until we find the right spot */ - ptrs[pos] = ptrs[parent(pos)]; - pos = parent(pos); - } - ptrs[pos] = p; - lttng_check_heap(heap); - return 0; -} - -void *lttng_heap_remove(struct lttng_ptr_heap *heap) -{ - switch (heap->len) { - case 0: - return NULL; - case 1: - (void) heap_set_len(heap, 0); - return heap->ptrs[0]; - } - /* Shrink, replace the current max by previous last entry and heapify */ - heap_set_len(heap, heap->len - 1); - /* len changed. previous last entry is at heap->len */ - return lttng_heap_replace_max(heap, heap->ptrs[heap->len]); -} - -void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p) -{ - size_t pos, len = heap->len; - - for (pos = 0; pos < len; pos++) - if (heap->ptrs[pos] == p) - goto found; - return NULL; -found: - if (heap->len == 1) { - (void) heap_set_len(heap, 0); - lttng_check_heap(heap); - return heap->ptrs[0]; - } - /* Replace p with previous last entry and heapify. */ - heap_set_len(heap, heap->len - 1); - /* len changed. 
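[Annotation, not part of the patch: lttng_heap_insert() above sifts the new element up by sliding parents down until the right slot is found. The same pattern on a bare array of longs, for illustration:]

/* Sketch: sift-up insertion into a max-heap of longs. */
#include <assert.h>
#include <stddef.h>

static void heap_push(long *ptrs, size_t *len, long v)
{
	size_t pos = (*len)++;

	/* Move parents down until we find the right spot. */
	while (pos > 0 && v > ptrs[(pos - 1) >> 1]) {
		ptrs[pos] = ptrs[(pos - 1) >> 1];
		pos = (pos - 1) >> 1;
	}
	ptrs[pos] = v;
}

int main(void)
{
	long heap[4];
	size_t len = 0;

	heap_push(heap, &len, 1);
	heap_push(heap, &len, 7);
	heap_push(heap, &len, 3);
	assert(heap[0] == 7);	/* largest element sits at the root */
	return 0;
}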
previous last entry is at heap->len */ - heap->ptrs[pos] = heap->ptrs[heap->len]; - heapify(heap, pos); - return p; -} diff --git a/libprio_heap/lttng_prio_heap.h b/libprio_heap/lttng_prio_heap.h deleted file mode 100644 index ea8dbb8f..00000000 --- a/libprio_heap/lttng_prio_heap.h +++ /dev/null @@ -1,117 +0,0 @@ -#ifndef _LTTNG_PRIO_HEAP_H -#define _LTTNG_PRIO_HEAP_H - -/* - * lttng_prio_heap.h - * - * Priority heap containing pointers. Based on CLRS, chapter 6. - * - * Copyright 2011 - Mathieu Desnoyers - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - */ - -#include - -struct lttng_ptr_heap { - size_t len, alloc_len; - void **ptrs; - int (*gt)(void *a, void *b); - gfp_t gfpmask; -}; - -#ifdef DEBUG_HEAP -void lttng_check_heap(const struct lttng_ptr_heap *heap); -#else -static inline -void lttng_check_heap(const struct lttng_ptr_heap *heap) -{ -} -#endif - -/** - * lttng_heap_maximum - return the largest element in the heap - * @heap: the heap to be operated on - * - * Returns the largest element in the heap, without performing any modification - * to the heap structure. Returns NULL if the heap is empty. - */ -static inline void *lttng_heap_maximum(const struct lttng_ptr_heap *heap) -{ - lttng_check_heap(heap); - return heap->len ? heap->ptrs[0] : NULL; -} - -/** - * lttng_heap_init - initialize the heap - * @heap: the heap to initialize - * @alloc_len: number of elements initially allocated - * @gfp: allocation flags - * @gt: function to compare the elements - * - * Returns -ENOMEM if out of memory. - */ -extern int lttng_heap_init(struct lttng_ptr_heap *heap, - size_t alloc_len, gfp_t gfpmask, - int gt(void *a, void *b)); - -/** - * lttng_heap_free - free the heap - * @heap: the heap to free - */ -extern void lttng_heap_free(struct lttng_ptr_heap *heap); - -/** - * lttng_heap_insert - insert an element into the heap - * @heap: the heap to be operated on - * @p: the element to add - * - * Insert an element into the heap. - * - * Returns -ENOMEM if out of memory. - */ -extern int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p); - -/** - * lttng_heap_remove - remove the largest element from the heap - * @heap: the heap to be operated on - * - * Returns the largest element in the heap. It removes this element from the - * heap. Returns NULL if the heap is empty. - */ -extern void *lttng_heap_remove(struct lttng_ptr_heap *heap); - -/** - * lttng_heap_cherrypick - remove a given element from the heap - * @heap: the heap to be operated on - * @p: the element - * - * Remove the given element from the heap. Return the element if present, else - * return NULL. This algorithm has a complexity of O(n), which is higher than - * O(log(n)) provided by the rest of this API. 
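[Annotation, not part of the patch: the gt() callback fully determines the heap order. The removed channel iterator (deleted later in this patch) exploited this by answering "greater" for the *earlier* timestamp, turning the max-heap into a min-heap on timestamps:]

/* Sketch: a gt() comparator that puts the lowest timestamp on top. */
#include <assert.h>

struct buf { unsigned long long timestamp; };

static int buf_is_higher(void *a, void *b)
{
	struct buf *bufa = a, *bufb = b;

	/* "Greater" means earlier: the oldest record rises to the root. */
	return bufa->timestamp < bufb->timestamp;
}

int main(void)
{
	struct buf early = { 10 }, late = { 20 };

	assert(buf_is_higher(&early, &late));
	return 0;
}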
- */ -extern void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p); - -/** - * lttng_heap_replace_max - replace the the largest element from the heap - * @heap: the heap to be operated on - * @p: the pointer to be inserted as topmost element replacement - * - * Returns the largest element in the heap. It removes this element from the - * heap. The heap is rebalanced only once after the insertion. Returns NULL if - * the heap is empty. - * - * This is the equivalent of calling heap_remove() and then heap_insert(), but - * it only rebalances the heap once. It never allocates memory. - */ -extern void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p); - -#endif /* _LTTNG_PRIO_HEAP_H */ diff --git a/libringbuffer/Makefile.am b/libringbuffer/Makefile.am index bc2c7a72..22660533 100644 --- a/libringbuffer/Makefile.am +++ b/libringbuffer/Makefile.am @@ -6,11 +6,7 @@ lib_LTLIBRARIES = libringbuffer.la libringbuffer_la_SOURCES = \ ring_buffer_backend.c \ ring_buffer_frontend.c \ - ring_buffer_iterator.c \ - ring_buffer_vfs.c \ - ring_buffer_splice.c \ - ring_buffer_mmap.c \ - ../libprio_heap/lttng_prio_heap.c + ring_buffer_abi.c libringbuffer_la_LDFLAGS = -no-undefined -version-info 0:0:0 diff --git a/libringbuffer/backend.h b/libringbuffer/backend.h index b50bae1a..61d2f327 100644 --- a/libringbuffer/backend.h +++ b/libringbuffer/backend.h @@ -14,6 +14,10 @@ * the reader in flight recorder mode. */ +#include + +#include "ust/core.h" + /* Internal helpers */ #include "backend_internal.h" #include "frontend_internal.h" @@ -25,10 +29,6 @@ extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, void *dest, size_t len); -extern int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb, - size_t offset, void __user *dest, - size_t len); - extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset, void *dest, size_t len); @@ -76,7 +76,7 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config, offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK); id = bufb->buf_wsb[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); @@ -124,9 +124,4 @@ unsigned long lib_ring_buffer_get_records_unread( return records_unread; } -ssize_t lib_ring_buffer_file_splice_read(struct file *in, loff_t *ppos, - struct pipe_inode_info *pipe, - size_t len, unsigned int flags); -loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin); - #endif /* _LINUX_RING_BUFFER_BACKEND_H */ diff --git a/libringbuffer/backend_internal.h b/libringbuffer/backend_internal.h index 2c3de67a..c5f3362e 100644 --- a/libringbuffer/backend_internal.h +++ b/libringbuffer/backend_internal.h @@ -11,6 +11,9 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include +#include + #include "config.h" #include "backend_types.h" #include "frontend_types.h" @@ -52,7 +55,7 @@ extern void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, * sampling and subbuffer ID exchange). 
*/ -#define HALF_ULONG_BITS (BITS_PER_LONG >> 1) +#define HALF_ULONG_BITS (CAA_BITS_PER_LONG >> 1) #define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1) #define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT) @@ -145,7 +148,7 @@ void subbuffer_id_set_noref_offset(const struct lib_ring_buffer_config *config, tmp |= offset << SB_ID_OFFSET_SHIFT; tmp |= SB_ID_NOREF_MASK; /* Volatile store, read concurrently by readers. */ - ACCESS_ONCE(*id) = tmp; + CMM_ACCESS_ONCE(*id) = tmp; } } @@ -300,7 +303,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, * Performing a volatile access to read the sb_pages, because we want to * read a coherent version of the pointer and the associated noref flag. */ - id = ACCESS_ONCE(bufb->buf_wsb[idx].id); + id = CMM_ACCESS_ONCE(bufb->buf_wsb[idx].id); for (;;) { /* This check is called on the fast path for each record. */ if (likely(!subbuffer_id_is_noref(config, id))) { @@ -314,7 +317,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, } new_id = id; subbuffer_id_clear_noref(config, &new_id); - new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id); + new_id = uatomic_cmpxchg(&bufb->buf_wsb[idx].id, id, new_id); if (likely(new_id == id)) break; id = new_id; @@ -350,7 +353,7 @@ void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *confi * Memory barrier that ensures counter stores are ordered before set * noref and offset. */ - smp_mb(); + cmm_smp_mb(); subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset); } @@ -369,7 +372,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, if (config->mode == RING_BUFFER_OVERWRITE) { /* * Exchange the target writer subbuffer with our own unused - * subbuffer. No need to use ACCESS_ONCE() here to read the + * subbuffer. No need to use CMM_ACCESS_ONCE() here to read the * old_wpage, because the value read will be confirmed by the * following cmpxchg(). */ @@ -387,7 +390,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, !subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id, consumed_count); - new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id, + new_id = uatomic_cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id, bufb->buf_rsb.id); if (unlikely(old_id != new_id)) return -EAGAIN; diff --git a/libringbuffer/backend_types.h b/libringbuffer/backend_types.h index 99de9f11..cfbe59c3 100644 --- a/libringbuffer/backend_types.h +++ b/libringbuffer/backend_types.h @@ -68,9 +68,7 @@ struct channel_backend { unsigned long num_subbuf; /* Number of sub-buffers for writer */ u64 start_tsc; /* Channel creation TSC value */ void *priv; /* Client-specific information */ - struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */ const struct lib_ring_buffer_config *config; /* Ring buffer configuration */ - cpumask_var_t cpumask; /* Allocated per-cpu buffers cpumask */ char name[NAME_MAX]; /* Channel name */ }; diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index 1eb4ae92..9d73da16 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -16,6 +16,9 @@ * Dual LGPL v2.1/GPL v2 license. 
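[Annotation, not part of the patch: the hunks above replace kernel ACCESS_ONCE()/cmpxchg() with liburcu's CMM_ACCESS_ONCE()/uatomic_cmpxchg(). The volatile-read plus compare-and-swap retry pattern, reduced to a bare flag word:]

#include <urcu/compiler.h>	/* CMM_ACCESS_ONCE() */
#include <urcu/uatomic.h>	/* uatomic_cmpxchg() */

static unsigned long clear_flag(unsigned long *word, unsigned long flag)
{
	unsigned long old, res;

	old = CMM_ACCESS_ONCE(*word);
	for (;;) {
		res = uatomic_cmpxchg(word, old, old & ~flag);
		if (res == old)
			return old;	/* our update was applied */
		old = res;		/* raced with another writer: retry */
	}
}

int main(void)
{
	unsigned long w = 0x3;

	clear_flag(&w, 0x1);
	return w == 0x2 ? 0 : 1;
}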
*/ +#include +#include + /* Internal helpers */ #include "frontend_internal.h" @@ -60,7 +63,7 @@ void *channel_destroy(struct channel *chan); #define for_each_channel_cpu(cpu, chan) \ for ((cpu) = -1; \ ({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask); \ - smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });) + cmm_smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });) extern struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, @@ -118,7 +121,7 @@ static inline unsigned long lib_ring_buffer_get_consumed(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf) { - return atomic_long_read(&buf->consumed); + return uatomic_read(&buf->consumed); } /* @@ -129,11 +132,11 @@ static inline int lib_ring_buffer_is_finalized(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf) { - int finalized = ACCESS_ONCE(buf->finalized); + int finalized = CMM_ACCESS_ONCE(buf->finalized); /* * Read finalized before counters. */ - smp_rmb(); + cmm_smp_rmb(); return finalized; } @@ -146,7 +149,7 @@ int lib_ring_buffer_channel_is_finalized(const struct channel *chan) static inline int lib_ring_buffer_channel_is_disabled(const struct channel *chan) { - return atomic_read(&chan->record_disabled); + return uatomic_read(&chan->record_disabled); } static inline diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index 37e046bc..f758a684 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -16,10 +16,11 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include + #include "config.h" #include "backend_types.h" #include "frontend_types.h" -#include "../lib_prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */ /* Buffer offset macros */ @@ -81,7 +82,7 @@ unsigned long subbuf_index(unsigned long offset, struct channel *chan) * last_tsc atomically. */ -#if (BITS_PER_LONG == 32) +#if (CAA_BITS_PER_LONG == 32) static inline void save_last_tsc(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, u64 tsc) @@ -154,7 +155,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf, unsigned long consumed_old, consumed_new; do { - consumed_old = atomic_long_read(&buf->consumed); + consumed_old = uatomic_read(&buf->consumed); /* * If buffer is in overwrite mode, push the reader consumed * count if the write position has reached it and we are not @@ -170,7 +171,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf, consumed_new = subbuf_align(consumed_old, chan); else return; - } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old, + } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old, consumed_new) != consumed_old)); } @@ -191,7 +192,7 @@ int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, { unsigned long consumed_old, consumed_idx, commit_count, write_offset; - consumed_old = atomic_long_read(&buf->consumed); + consumed_old = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed_old, chan); commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); /* @@ -354,7 +355,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * respect to writers coming into the subbuffer after * wrap around, and also order wrt concurrent readers. 
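[Annotation, not part of the patch: lib_ring_buffer_reserve_push_reader() above pushes the consumed counter forward with a cmpxchg loop. A minimal sketch on a plain counter — real ring-buffer counters wrap, so the '>=' test here is a simplification:]

#include <urcu/uatomic.h>

static void push_consumed(long *consumed, long target)
{
	long old;

	do {
		old = uatomic_read(consumed);
		if (old >= target)
			return;	/* a reader already moved past target */
	} while (uatomic_cmpxchg(consumed, old, target) != old);
}

int main(void)
{
	long consumed = 0;

	push_consumed(&consumed, 42);
	return consumed == 42 ? 0 : 1;
}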
*/ - smp_mb(); + cmm_smp_mb(); /* End of exclusive subbuffer access */ v_set(config, &buf->commit_cold[idx].cc_sb, commit_count); @@ -365,10 +366,10 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. */ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER - && atomic_long_read(&buf->active_readers) + && uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + //wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&chan->read_wait); } } @@ -419,6 +420,6 @@ extern int lib_ring_buffer_create(struct lib_ring_buffer *buf, extern void lib_ring_buffer_free(struct lib_ring_buffer *buf); /* Keep track of trap nesting inside ring buffer code */ -DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting); +extern __thread unsigned int lib_ring_buffer_nesting; #endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */ diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index f89d803c..c9f98cb7 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -16,9 +16,15 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include +#include +#include + +#include "ust/core.h" + +#include "usterr_signal_safe.h" #include "config.h" #include "backend_types.h" -#include "../lib_prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */ /* * A switch is done during tracing or as a final flush after tracing (so it @@ -26,24 +32,9 @@ */ enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH }; -/* channel-level read-side iterator */ -struct channel_iter { - /* Prio heap of buffers. Lowest timestamps at the top. */ - struct lttng_ptr_heap heap; /* Heap of struct lib_ring_buffer ptrs */ - struct list_head empty_head; /* Empty buffers linked-list head */ - int read_open; /* Opened for reading ? */ - u64 last_qs; /* Last quiescent state timestamp */ - u64 last_timestamp; /* Last timestamp (for WARN_ON) */ - int last_cpu; /* Last timestamp cpu */ - /* - * read() file operation state. - */ - unsigned long len_left; -}; - /* channel: collection of per-cpu ring buffers. */ struct channel { - atomic_t record_disabled; + int record_disabled; unsigned long commit_count_mask; /* * Commit count mask, removing * the MSBs corresponding to @@ -55,16 +46,9 @@ struct channel { unsigned long switch_timer_interval; /* Buffer flush (jiffies) */ unsigned long read_timer_interval; /* Reader wakeup (jiffies) */ - struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */ - struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */ - struct notifier_block hp_iter_notifier; /* hotplug iterator notifier */ - int cpu_hp_enable:1; /* Enable CPU hotplug notif. */ - int hp_iter_enable:1; /* Enable hp iter notif. 
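[Annotation, not part of the patch: the DECLARE_PER_CPU nesting counter becomes a per-thread counter in userspace via TLS. The helper names below are hypothetical, for illustration only:]

static __thread unsigned int lib_ring_buffer_nesting;

static void ring_buffer_enter(void)	/* hypothetical helper */
{
	lib_ring_buffer_nesting++;	/* entering instrumented code */
}

static int in_ring_buffer(void)		/* hypothetical helper */
{
	return lib_ring_buffer_nesting != 0;
}

int main(void)
{
	ring_buffer_enter();
	return in_ring_buffer() ? 0 : 1;
}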
*/ - wait_queue_head_t read_wait; /* reader wait queue */ - wait_queue_head_t hp_wait; /* CPU hotplug wait queue */ + //wait_queue_head_t read_wait; /* reader wait queue */ int finalized; /* Has channel been finalized */ - struct channel_iter iter; /* Channel read-side iterator */ - struct kref ref; /* Reference count */ + struct urcu_ref ref; /* Reference count */ }; /* Per-subbuffer commit counters used on the hot path */ @@ -78,35 +62,17 @@ struct commit_counters_cold { union v_atomic cc_sb; /* Incremented _once_ at sb switch */ }; -/* Per-buffer read iterator */ -struct lib_ring_buffer_iter { - u64 timestamp; /* Current record timestamp */ - size_t header_len; /* Current record header length */ - size_t payload_len; /* Current record payload length */ - - struct list_head empty_node; /* Linked list of empty buffers */ - unsigned long consumed, read_offset, data_size; - enum { - ITER_GET_SUBBUF = 0, - ITER_TEST_RECORD, - ITER_NEXT_RECORD, - ITER_PUT_SUBBUF, - } state; - int allocated:1; - int read_open:1; /* Opened for reading ? */ -}; - /* ring buffer state */ struct lib_ring_buffer { /* First 32 bytes cache-hot cacheline */ union v_atomic offset; /* Current offset in the buffer */ struct commit_counters_hot *commit_hot; /* Commit count per sub-buffer */ - atomic_long_t consumed; /* + long consumed; /* * Current offset in the buffer * standard atomic access (shared) */ - atomic_t record_disabled; + int record_disabled; /* End of first 32 bytes cacheline */ union v_atomic last_tsc; /* * Last timestamp written in the buffer. @@ -116,7 +82,7 @@ struct lib_ring_buffer { struct commit_counters_cold *commit_cold; /* Commit count per sub-buffer */ - atomic_long_t active_readers; /* + long active_readers; /* * Active readers count * standard atomic access (shared) */ @@ -126,12 +92,10 @@ struct lib_ring_buffer { union v_atomic records_lost_big; /* Events too big */ union v_atomic records_count; /* Number of records written */ union v_atomic records_overrun; /* Number of overwritten records */ - wait_queue_head_t read_wait; /* reader buffer-level wait queue */ + //wait_queue_head_t read_wait; /* reader buffer-level wait queue */ int finalized; /* buffer has been finalized */ - struct timer_list switch_timer; /* timer for periodical switch */ - struct timer_list read_timer; /* timer for read poll */ - raw_spinlock_t raw_tick_nohz_spinlock; /* nohz entry lock/trylock */ - struct lib_ring_buffer_iter iter; /* read-side iterator */ + //struct timer_list switch_timer; /* timer for periodical switch */ + //struct timer_list read_timer; /* timer for read poll */ unsigned long get_subbuf_consumed; /* Read-side consumed */ unsigned long prod_snapshot; /* Producer count snapshot */ unsigned long cons_snapshot; /* Consumer count snapshot */ @@ -157,14 +121,14 @@ void *channel_get_private(struct channel *chan) int _____ret = unlikely(cond); \ if (_____ret) { \ if (__same_type(*(c), struct channel_backend)) \ - __chan = container_of((void *) (c), \ + __chan = caa_container_of((void *) (c), \ struct channel, \ backend); \ else if (__same_type(*(c), struct channel)) \ __chan = (void *) (c); \ else \ BUG_ON(1); \ - atomic_inc(&__chan->record_disabled); \ + uatomic_inc(&__chan->record_disabled); \ WARN_ON(1); \ } \ _____ret; \ diff --git a/libringbuffer/ring_buffer_vfs.c b/libringbuffer/ring_buffer_abi.c similarity index 100% rename from libringbuffer/ring_buffer_vfs.c rename to libringbuffer/ring_buffer_abi.c diff --git a/libringbuffer/ring_buffer_backend.c b/libringbuffer/ring_buffer_backend.c index 
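[Annotation, not part of the patch: struct channel now counts references with liburcu's <urcu/ref.h> instead of kref. A sketch of the lifecycle — note the release callback below uses the liburcu signature, while the patch at this stage still passes a kref-typed pointer into channel_release(); the free() body is illustrative:]

#include <stdlib.h>
#include <urcu/ref.h>
#include <urcu/compiler.h>	/* caa_container_of() */

struct channel {
	struct urcu_ref ref;
};

static void channel_release(struct urcu_ref *ref)
{
	struct channel *chan = caa_container_of(ref, struct channel, ref);

	free(chan);
}

int main(void)
{
	struct channel *chan = calloc(1, sizeof(*chan));

	urcu_ref_init(&chan->ref);	/* refcount = 1 */
	urcu_ref_get(&chan->ref);	/* refcount = 2 */
	urcu_ref_put(&chan->ref, channel_release);
	urcu_ref_put(&chan->ref, channel_release);	/* frees chan */
	return 0;
}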
5ff38f0d..861acf74 100644 --- a/libringbuffer/ring_buffer_backend.c +++ b/libringbuffer/ring_buffer_backend.c @@ -6,6 +6,10 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include + +#include "ust/core.h" + #include "config.h" #include "backend.h" #include "frontend.h" @@ -32,7 +36,7 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config void **virt; unsigned long i; - num_pages = size >> PAGE_SHIFT; + num_pages = size >> get_count_order(PAGE_SIZE); num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf); subbuf_size = chanb->subbuf_size; num_subbuf_alloc = num_subbuf; @@ -42,22 +46,15 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config num_subbuf_alloc++; } - pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + pages = malloc_align(sizeof(*pages) * num_pages); if (unlikely(!pages)) goto pages_error; - virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + virt = malloc_align(sizeof(*virt) * num_pages); if (unlikely(!virt)) goto virt_error; - bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array) - * num_subbuf_alloc, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + bufb->array = malloc_align(sizeof(*bufb->array) * num_subbuf_alloc); if (unlikely(!bufb->array)) goto array_error; @@ -73,22 +70,18 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config /* Allocate backend pages array elements */ for (i = 0; i < num_subbuf_alloc; i++) { bufb->array[i] = - kzalloc_node(ALIGN( + zmalloc_align( sizeof(struct lib_ring_buffer_backend_pages) + sizeof(struct lib_ring_buffer_backend_page) - * num_pages_per_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + * num_pages_per_subbuf); if (!bufb->array[i]) goto free_array; } /* Allocate write-side subbuffer table */ - bufb->buf_wsb = kzalloc_node(ALIGN( + bufb->buf_wsb = zmalloc_align( sizeof(struct lib_ring_buffer_backend_subbuffer) - * num_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0))); + * num_subbuf); if (unlikely(!bufb->buf_wsb)) goto free_array; @@ -116,11 +109,6 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config } } - /* - * If kmalloc ever uses vmalloc underneath, make sure the buffer pages - * will not fault. 
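[Annotation, not part of the patch: the hunks below replace PAGE_SHIFT with get_count_order(PAGE_SIZE). get_count_order() returns ceil(log2(count)), so for the power-of-two PAGE_SIZE it yields exactly the old shift. A standalone equivalent of the fls-based helper added to ust/core.h (count must be non-zero):]

#include <assert.h>

static int get_count_order(unsigned int count)
{
	int order = 32 - __builtin_clz(count);

	if ((count & (count - 1)) == 0)
		order--;	/* count is already a power of two */
	return order;
}

int main(void)
{
	assert(get_count_order(4096) == 12);	/* 4 kB pages: shift of 12 */
	assert(get_count_order(5) == 3);	/* next power of two is 8 */
	assert(8192u >> get_count_order(4096) == 2);	/* two pages */
	return 0;
}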
- */ - wrapper_vmalloc_sync_all(); kfree(virt); kfree(pages); return 0; @@ -146,7 +134,7 @@ int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb, { const struct lib_ring_buffer_config *config = chanb->config; - bufb->chan = container_of(chanb, struct channel, backend); + bufb->chan = caa_container_of(chanb, struct channel, backend); bufb->cpu = cpu; return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size, @@ -209,7 +197,7 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) */ void channel_backend_reset(struct channel_backend *chanb) { - struct channel *chan = container_of(chanb, struct channel, backend); + struct channel *chan = caa_container_of(chanb, struct channel, backend); const struct lib_ring_buffer_config *config = chanb->config; /* @@ -235,7 +223,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, void *hcpu) { unsigned int cpu = (unsigned long)hcpu; - struct channel_backend *chanb = container_of(nb, struct channel_backend, + struct channel_backend *chanb = caa_container_of(nb, struct channel_backend, cpu_hp_notifier); const struct lib_ring_buffer_config *config = chanb->config; struct lib_ring_buffer *buf; @@ -289,7 +277,7 @@ int channel_backend_init(struct channel_backend *chanb, const struct lib_ring_buffer_config *config, void *priv, size_t subbuf_size, size_t num_subbuf) { - struct channel *chan = container_of(chanb, struct channel, backend); + struct channel *chan = caa_container_of(chanb, struct channel, backend); unsigned int i; int ret; @@ -468,7 +456,7 @@ void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset, src += pagecpy; offset += pagecpy; sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); /* * Underlying layer should never ask for writes across @@ -512,7 +500,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, orig_len = len; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); if (unlikely(!len)) return 0; for (;;) { @@ -529,7 +517,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, break; dest += pagecpy; offset += pagecpy; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); /* * Underlying layer should never ask for reads across * subbuffers. @@ -540,60 +528,6 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, } EXPORT_SYMBOL_GPL(lib_ring_buffer_read); -/** - * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace - * @bufb : buffer backend - * @offset : offset within the buffer - * @dest : destination userspace address - * @len : length to copy to destination - * - * Should be protected by get_subbuf/put_subbuf. - * access_ok() must have been performed on dest addresses prior to call this - * function. - * Returns -EFAULT on error, 0 if ok. 
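[Annotation, not part of the patch: lib_ring_buffer_read() above copies page by page, clamping each memcpy to the end of the current page. The same loop, reduced to a flat page table without the subbuffer indirection:]

#include <string.h>

#define PAGE_SIZE	4096UL
#define PAGE_MASK	(~(PAGE_SIZE - 1))

static void read_pages(char **pages, size_t offset, char *dest, size_t len)
{
	while (len) {
		size_t in_page = offset & ~PAGE_MASK;
		size_t pagecpy = PAGE_SIZE - in_page;

		if (pagecpy > len)
			pagecpy = len;	/* clamp to what is left */
		memcpy(dest, pages[offset / PAGE_SIZE] + in_page, pagecpy);
		dest += pagecpy;
		offset += pagecpy;
		len -= pagecpy;
	}
}

int main(void)
{
	char page0[PAGE_SIZE], page1[PAGE_SIZE];
	char *pages[] = { page0, page1 };
	char out[10] = { 0 };

	memcpy(page0 + PAGE_SIZE - 4, "end-", 4);
	memcpy(page1, "begin", 5);
	/* Read 9 bytes straddling the page boundary. */
	read_pages(pages, PAGE_SIZE - 4, out, 9);
	return strcmp(out, "end-begin") ? 1 : 0;
}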
- */ -int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb, - size_t offset, void __user *dest, size_t len) -{ - struct channel_backend *chanb = &bufb->chan->backend; - const struct lib_ring_buffer_config *config = chanb->config; - size_t index; - ssize_t pagecpy, orig_len; - struct lib_ring_buffer_backend_pages *rpages; - unsigned long sb_bindex, id; - - orig_len = len; - offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; - if (unlikely(!len)) - return 0; - for (;;) { - pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); - id = bufb->buf_rsb.id; - sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; - CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE - && subbuffer_id_is_noref(config, id)); - if (__copy_to_user(dest, - rpages->p[index].virt + (offset & ~PAGE_MASK), - pagecpy)) - return -EFAULT; - len -= pagecpy; - if (likely(!len)) - break; - dest += pagecpy; - offset += pagecpy; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; - /* - * Underlying layer should never ask for reads across - * subbuffers. - */ - CHAN_WARN_ON(chanb, offset >= chanb->buf_size); - } - return 0; -} -EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user); - /** * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer. * @bufb : buffer backend @@ -616,7 +550,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); orig_offset = offset; for (;;) { id = bufb->buf_rsb.id; @@ -636,7 +570,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse len -= pagecpy; } offset += strpagelen; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); if (strpagelen < pagelen) break; /* @@ -670,7 +604,7 @@ struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); rpages = bufb->array[sb_bindex]; @@ -701,7 +635,7 @@ void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb, unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); rpages = bufb->array[sb_bindex]; @@ -732,7 +666,7 @@ void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb, offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT; + index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); id = bufb->buf_wsb[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); rpages = bufb->array[sb_bindex]; diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index a3f14f3d..5ceda871 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -38,6 +38,8 @@ * Dual LGPL v2.1/GPL v2 license. 
*/ +#include + #include "config.h" #include "backend.h" #include "frontend.h" @@ -158,7 +160,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, struct channel_backend *chanb, int cpu) { const struct lib_ring_buffer_config *config = chanb->config; - struct channel *chan = container_of(chanb, struct channel, backend); + struct channel *chan = caa_container_of(chanb, struct channel, backend); void *priv = chanb->priv; unsigned int num_subbuf; size_t subbuf_header_size; @@ -391,7 +393,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, void *hcpu) { unsigned int cpu = (unsigned long)hcpu; - struct channel *chan = container_of(nb, struct channel, + struct channel *chan = caa_container_of(nb, struct channel, cpu_hp_notifier); struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); const struct lib_ring_buffer_config *config = chan->backend.config; @@ -447,7 +449,7 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, unsigned long val, void *data) { - struct channel *chan = container_of(nb, struct channel, + struct channel *chan = caa_container_of(nb, struct channel, tick_nohz_notifier); const struct lib_ring_buffer_config *config = chan->backend.config; struct lib_ring_buffer *buf; @@ -687,7 +689,7 @@ EXPORT_SYMBOL_GPL(channel_create); static void channel_release(struct kref *kref) { - struct channel *chan = container_of(kref, struct channel, ref); + struct channel *chan = caa_container_of(kref, struct channel, ref); channel_free(chan); } @@ -729,7 +731,7 @@ void *channel_destroy(struct channel *chan) * Perform flush before writing to finalized. */ smp_wmb(); - ACCESS_ONCE(buf->finalized) = 1; + CMM_ACCESS_ONCE(buf->finalized) = 1; wake_up_interruptible(&buf->read_wait); } } else { @@ -743,10 +745,10 @@ void *channel_destroy(struct channel *chan) * Perform flush before writing to finalized. */ smp_wmb(); - ACCESS_ONCE(buf->finalized) = 1; + CMM_ACCESS_ONCE(buf->finalized) = 1; wake_up_interruptible(&buf->read_wait); } - ACCESS_ONCE(chan->finalized) = 1; + CMM_ACCESS_ONCE(chan->finalized) = 1; wake_up_interruptible(&chan->hp_wait); wake_up_interruptible(&chan->read_wait); kref_put(&chan->ref, channel_release); @@ -820,7 +822,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, int finalized; retry: - finalized = ACCESS_ONCE(buf->finalized); + finalized = CMM_ACCESS_ONCE(buf->finalized); /* * Read finalized before counters. */ @@ -908,7 +910,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, int finalized; retry: - finalized = ACCESS_ONCE(buf->finalized); + finalized = CMM_ACCESS_ONCE(buf->finalized); /* * Read finalized before counters. */ diff --git a/libringbuffer/ring_buffer_iterator.c b/libringbuffer/ring_buffer_iterator.c deleted file mode 100644 index ff8e582b..00000000 --- a/libringbuffer/ring_buffer_iterator.c +++ /dev/null @@ -1,793 +0,0 @@ -/* - * ring_buffer_iterator.c - * - * (C) Copyright 2010 - Mathieu Desnoyers - * - * Ring buffer and channel iterators. Get each event of a channel in order. Uses - * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic - * complexity for the "get next event" operation. - * - * Author: - * Mathieu Desnoyers - * - * Dual LGPL v2.1/GPL v2 license. - */ - -#include "iterator.h" - -/* - * Safety factor taking into account internal kernel interrupt latency. - * Assuming 250ms worse-case latency. - */ -#define MAX_SYSTEM_LATENCY 250 - -/* - * Maximum delta expected between trace clocks. At most 1 jiffy delta. 
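[Annotation, not part of the patch: the finalized flag above is published with a write barrier and observed with a matching read barrier, so the reader never sees finalized == 1 before the final flush's counter updates. A sketch with the liburcu primitives — note channel_destroy() at this stage of the port still calls the kernel-named smp_wmb():]

#include <urcu/arch.h>		/* cmm_smp_wmb(), cmm_smp_rmb() */
#include <urcu/compiler.h>	/* CMM_ACCESS_ONCE() */

static int finalized;
static unsigned long records;

static void writer_finalize(void)	/* cf. channel_destroy() */
{
	records++;		/* final flush updates the counters... */
	cmm_smp_wmb();		/* ...which must be visible before: */
	CMM_ACCESS_ONCE(finalized) = 1;
}

static int reader_sees_end(void)	/* cf. lib_ring_buffer_is_finalized() */
{
	int f = CMM_ACCESS_ONCE(finalized);

	cmm_smp_rmb();		/* read finalized before counters */
	return f && records != 0;
}

int main(void)
{
	writer_finalize();
	return reader_sees_end() ? 0 : 1;
}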
- */ -#define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000) - -/** - * lib_ring_buffer_get_next_record - Get the next record in a buffer. - * @chan: channel - * @buf: buffer - * - * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if - * buffer is empty and finalized. The buffer must already be opened for reading. - */ -ssize_t lib_ring_buffer_get_next_record(struct channel *chan, - struct lib_ring_buffer *buf) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer_iter *iter = &buf->iter; - int ret; - -restart: - switch (iter->state) { - case ITER_GET_SUBBUF: - ret = lib_ring_buffer_get_next_subbuf(buf); - if (ret && !ACCESS_ONCE(buf->finalized) - && config->alloc == RING_BUFFER_ALLOC_GLOBAL) { - /* - * Use "pull" scheme for global buffers. The reader - * itself flushes the buffer to "pull" data not visible - * to readers yet. Flush current subbuffer and re-try. - * - * Per-CPU buffers rather use a "push" scheme because - * the IPI needed to flush all CPU's buffers is too - * costly. In the "push" scheme, the reader waits for - * the writer periodic deferrable timer to flush the - * buffers (keeping track of a quiescent state - * timestamp). Therefore, the writer "pushes" data out - * of the buffers rather than letting the reader "pull" - * data from the buffer. - */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - ret = lib_ring_buffer_get_next_subbuf(buf); - } - if (ret) - return ret; - iter->consumed = buf->cons_snapshot; - iter->data_size = lib_ring_buffer_get_read_data_size(config, buf); - iter->read_offset = iter->consumed; - /* skip header */ - iter->read_offset += config->cb.subbuffer_header_size(); - iter->state = ITER_TEST_RECORD; - goto restart; - case ITER_TEST_RECORD: - if (iter->read_offset - iter->consumed >= iter->data_size) { - iter->state = ITER_PUT_SUBBUF; - } else { - CHAN_WARN_ON(chan, !config->cb.record_get); - config->cb.record_get(config, chan, buf, - iter->read_offset, - &iter->header_len, - &iter->payload_len, - &iter->timestamp); - iter->read_offset += iter->header_len; - subbuffer_consume_record(config, &buf->backend); - iter->state = ITER_NEXT_RECORD; - return iter->payload_len; - } - goto restart; - case ITER_NEXT_RECORD: - iter->read_offset += iter->payload_len; - iter->state = ITER_TEST_RECORD; - goto restart; - case ITER_PUT_SUBBUF: - lib_ring_buffer_put_next_subbuf(buf); - iter->state = ITER_GET_SUBBUF; - goto restart; - default: - CHAN_WARN_ON(chan, 1); /* Should not happen */ - return -EPERM; - } -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record); - -static int buf_is_higher(void *a, void *b) -{ - struct lib_ring_buffer *bufa = a; - struct lib_ring_buffer *bufb = b; - - /* Consider lowest timestamps to be at the top of the heap */ - return (bufa->iter.timestamp < bufb->iter.timestamp); -} - -static -void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config, - struct channel *chan) -{ - struct lttng_ptr_heap *heap = &chan->iter.heap; - struct lib_ring_buffer *buf, *tmp; - ssize_t len; - - list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head, - iter.empty_node) { - len = lib_ring_buffer_get_next_record(chan, buf); - - /* - * Deal with -EAGAIN and -ENODATA. - * len >= 0 means record contains data. - * -EBUSY should never happen, because we support only one - * reader. - */ - switch (len) { - case -EAGAIN: - /* Keep node in empty list */ - break; - case -ENODATA: - /* - * Buffer is finalized. 
Don't add to list of empty - * buffer, because it has no more data to provide, ever. - */ - list_del(&buf->iter.empty_node); - break; - case -EBUSY: - CHAN_WARN_ON(chan, 1); - break; - default: - /* - * Insert buffer into the heap, remove from empty buffer - * list. - */ - CHAN_WARN_ON(chan, len < 0); - list_del(&buf->iter.empty_node); - CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf)); - } - } -} - -static -void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config, - struct channel *chan) -{ - u64 timestamp_qs; - unsigned long wait_msecs; - - /* - * No need to wait if no empty buffers are present. - */ - if (list_empty(&chan->iter.empty_head)) - return; - - timestamp_qs = config->cb.ring_buffer_clock_read(chan); - /* - * We need to consider previously empty buffers. - * Do a get next buf record on each of them. Add them to - * the heap if they have data. If at least one of them - * don't have data, we need to wait for - * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the - * buffers have been switched either by the timer or idle entry) and - * check them again, adding them if they have data. - */ - lib_ring_buffer_get_empty_buf_records(config, chan); - - /* - * No need to wait if no empty buffers are present. - */ - if (list_empty(&chan->iter.empty_head)) - return; - - /* - * We need to wait for the buffer switch timer to run. If the - * CPU is idle, idle entry performed the switch. - * TODO: we could optimize further by skipping the sleep if all - * empty buffers belong to idle or offline cpus. - */ - wait_msecs = jiffies_to_msecs(chan->switch_timer_interval); - wait_msecs += MAX_SYSTEM_LATENCY; - msleep(wait_msecs); - lib_ring_buffer_get_empty_buf_records(config, chan); - /* - * Any buffer still in the empty list here cannot possibly - * contain an event with a timestamp prior to "timestamp_qs". - * The new quiescent state timestamp is the one we grabbed - * before waiting for buffer data. It is therefore safe to - * ignore empty buffers up to last_qs timestamp for fusion - * merge. - */ - chan->iter.last_qs = timestamp_qs; -} - -/** - * channel_get_next_record - Get the next record in a channel. - * @chan: channel - * @ret_buf: the buffer in which the event is located (output) - * - * Returns the size of new current event, -EAGAIN if all buffers are empty, - * -ENODATA if all buffers are empty and finalized. The channel must already be - * opened for reading. - */ - -ssize_t channel_get_next_record(struct channel *chan, - struct lib_ring_buffer **ret_buf) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - struct lttng_ptr_heap *heap; - ssize_t len; - - if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) { - *ret_buf = channel_get_ring_buffer(config, chan, 0); - return lib_ring_buffer_get_next_record(chan, *ret_buf); - } - - heap = &chan->iter.heap; - - /* - * get next record for topmost buffer. - */ - buf = lttng_heap_maximum(heap); - if (buf) { - len = lib_ring_buffer_get_next_record(chan, buf); - /* - * Deal with -EAGAIN and -ENODATA. - * len >= 0 means record contains data. - */ - switch (len) { - case -EAGAIN: - buf->iter.timestamp = 0; - list_add(&buf->iter.empty_node, &chan->iter.empty_head); - /* Remove topmost buffer from the heap */ - CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf); - break; - case -ENODATA: - /* - * Buffer is finalized. Remove buffer from heap and - * don't add to list of empty buffer, because it has no - * more data to provide, ever. 
- */ - CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf); - break; - case -EBUSY: - CHAN_WARN_ON(chan, 1); - break; - default: - /* - * Reinsert buffer into the heap. Note that heap can be - * partially empty, so we need to use - * lttng_heap_replace_max(). - */ - CHAN_WARN_ON(chan, len < 0); - CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf); - break; - } - } - - buf = lttng_heap_maximum(heap); - if (!buf || buf->iter.timestamp > chan->iter.last_qs) { - /* - * Deal with buffers previously showing no data. - * Add buffers containing data to the heap, update - * last_qs. - */ - lib_ring_buffer_wait_for_qs(config, chan); - } - - *ret_buf = buf = lttng_heap_maximum(heap); - if (buf) { - /* - * If this warning triggers, you probably need to check your - * system interrupt latency. Typical causes: too many printk() - * output going to a serial console with interrupts off. - * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward. - * Observed on SMP KVM setups with trace_clock(). - */ - if (chan->iter.last_timestamp - > (buf->iter.timestamp + MAX_CLOCK_DELTA)) { - printk(KERN_WARNING "ring_buffer: timestamps going " - "backward. Last time %llu ns, cpu %d, " - "current time %llu ns, cpu %d, " - "delta %llu ns.\n", - chan->iter.last_timestamp, chan->iter.last_cpu, - buf->iter.timestamp, buf->backend.cpu, - chan->iter.last_timestamp - buf->iter.timestamp); - CHAN_WARN_ON(chan, 1); - } - chan->iter.last_timestamp = buf->iter.timestamp; - chan->iter.last_cpu = buf->backend.cpu; - return buf->iter.payload_len; - } else { - /* Heap is empty */ - if (list_empty(&chan->iter.empty_head)) - return -ENODATA; /* All buffers finalized */ - else - return -EAGAIN; /* Temporarily empty */ - } -} -EXPORT_SYMBOL_GPL(channel_get_next_record); - -static -void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf) -{ - if (buf->iter.allocated) - return; - - buf->iter.allocated = 1; - if (chan->iter.read_open && !buf->iter.read_open) { - CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0); - buf->iter.read_open = 1; - } - - /* Add to list of buffers without any current record */ - if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU) - list_add(&buf->iter.empty_node, &chan->iter.empty_head); -} - -#ifdef CONFIG_HOTPLUG_CPU -static -int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - struct channel *chan = container_of(nb, struct channel, - hp_iter_notifier); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (!chan->hp_iter_enable) - return NOTIFY_DONE; - - CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); - - switch (action) { - case CPU_DOWN_FAILED: - case CPU_DOWN_FAILED_FROZEN: - case CPU_ONLINE: - case CPU_ONLINE_FROZEN: - lib_ring_buffer_iterator_init(chan, buf); - return NOTIFY_OK; - default: - return NOTIFY_DONE; - } -} -#endif - -int channel_iterator_init(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - int cpu, ret; - - INIT_LIST_HEAD(&chan->iter.empty_head); - ret = lttng_heap_init(&chan->iter.heap, - num_possible_cpus(), - GFP_KERNEL, buf_is_higher); - if (ret) - return ret; - /* - * In case of non-hotplug cpu, if the ring-buffer is allocated - * in early initcall, it will not be notified of secondary cpus. 
- * In that off case, we need to allocate for all possible cpus. - */ -#ifdef CONFIG_HOTPLUG_CPU - chan->hp_iter_notifier.notifier_call = - channel_iterator_cpu_hotplug; - chan->hp_iter_notifier.priority = 10; - register_cpu_notifier(&chan->hp_iter_notifier); - get_online_cpus(); - for_each_online_cpu(cpu) { - buf = per_cpu_ptr(chan->backend.buf, cpu); - lib_ring_buffer_iterator_init(chan, buf); - } - chan->hp_iter_enable = 1; - put_online_cpus(); -#else - for_each_possible_cpu(cpu) { - buf = per_cpu_ptr(chan->backend.buf, cpu); - lib_ring_buffer_iterator_init(chan, buf); - } -#endif - } else { - buf = channel_get_ring_buffer(config, chan, 0); - lib_ring_buffer_iterator_init(chan, buf); - } - return 0; -} - -void channel_iterator_unregister_notifiers(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - chan->hp_iter_enable = 0; - unregister_cpu_notifier(&chan->hp_iter_notifier); - } -} - -void channel_iterator_free(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_heap_free(&chan->iter.heap); -} - -int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf) -{ - struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = chan->backend.config; - CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR); - return lib_ring_buffer_open_read(buf); -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open); - -/* - * Note: Iterators must not be mixed with other types of outputs, because an - * iterator can leave the buffer in "GET" state, which is not consistent with - * other types of output (mmap, splice, raw data read). - */ -void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf) -{ - lib_ring_buffer_release_read(buf); -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release); - -int channel_iterator_open(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - int ret = 0, cpu; - - CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR); - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - get_online_cpus(); - /* Allow CPU hotplug to keep track of opened reader */ - chan->iter.read_open = 1; - for_each_channel_cpu(cpu, chan) { - buf = channel_get_ring_buffer(config, chan, cpu); - ret = lib_ring_buffer_iterator_open(buf); - if (ret) - goto error; - buf->iter.read_open = 1; - } - put_online_cpus(); - } else { - buf = channel_get_ring_buffer(config, chan, 0); - ret = lib_ring_buffer_iterator_open(buf); - } - return ret; -error: - /* Error should always happen on CPU 0, hence no close is required. 
*/ - CHAN_WARN_ON(chan, cpu != 0); - put_online_cpus(); - return ret; -} -EXPORT_SYMBOL_GPL(channel_iterator_open); - -void channel_iterator_release(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - int cpu; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - get_online_cpus(); - for_each_channel_cpu(cpu, chan) { - buf = channel_get_ring_buffer(config, chan, cpu); - if (buf->iter.read_open) { - lib_ring_buffer_iterator_release(buf); - buf->iter.read_open = 0; - } - } - chan->iter.read_open = 0; - put_online_cpus(); - } else { - buf = channel_get_ring_buffer(config, chan, 0); - lib_ring_buffer_iterator_release(buf); - } -} -EXPORT_SYMBOL_GPL(channel_iterator_release); - -void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf) -{ - struct channel *chan = buf->backend.chan; - - if (buf->iter.state != ITER_GET_SUBBUF) - lib_ring_buffer_put_next_subbuf(buf); - buf->iter.state = ITER_GET_SUBBUF; - /* Remove from heap (if present). */ - if (lttng_heap_cherrypick(&chan->iter.heap, buf)) - list_add(&buf->iter.empty_node, &chan->iter.empty_head); - buf->iter.timestamp = 0; - buf->iter.header_len = 0; - buf->iter.payload_len = 0; - buf->iter.consumed = 0; - buf->iter.read_offset = 0; - buf->iter.data_size = 0; - /* Don't reset allocated and read_open */ -} - -void channel_iterator_reset(struct channel *chan) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - int cpu; - - /* Empty heap, put into empty_head */ - while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL) - list_add(&buf->iter.empty_node, &chan->iter.empty_head); - - for_each_channel_cpu(cpu, chan) { - buf = channel_get_ring_buffer(config, chan, cpu); - lib_ring_buffer_iterator_reset(buf); - } - /* Don't reset read_open */ - chan->iter.last_qs = 0; - chan->iter.last_timestamp = 0; - chan->iter.last_cpu = 0; - chan->iter.len_left = 0; -} - -/* - * Ring buffer payload extraction read() implementation. - */ -static -ssize_t channel_ring_buffer_file_read(struct file *filp, - char __user *user_buf, - size_t count, - loff_t *ppos, - struct channel *chan, - struct lib_ring_buffer *buf, - int fusionmerge) -{ - const struct lib_ring_buffer_config *config = chan->backend.config; - size_t read_count = 0, read_offset; - ssize_t len; - - might_sleep(); - if (!access_ok(VERIFY_WRITE, user_buf, count)) - return -EFAULT; - - /* Finish copy of previous record */ - if (*ppos != 0) { - if (read_count < count) { - len = chan->iter.len_left; - read_offset = *ppos; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU - && fusionmerge) - buf = lttng_heap_maximum(&chan->iter.heap); - CHAN_WARN_ON(chan, !buf); - goto skip_get_next; - } - } - - while (read_count < count) { - size_t copy_len, space_left; - - if (fusionmerge) - len = channel_get_next_record(chan, &buf); - else - len = lib_ring_buffer_get_next_record(chan, buf); -len_test: - if (len < 0) { - /* - * Check if buffer is finalized (end of file). - */ - if (len == -ENODATA) { - /* A 0 read_count will tell about end of file */ - goto nodata; - } - if (filp->f_flags & O_NONBLOCK) { - if (!read_count) - read_count = -EAGAIN; - goto nodata; - } else { - int error; - - /* - * No data available at the moment, return what - * we got. - */ - if (read_count) - goto nodata; - - /* - * Wait for returned len to be >= 0 or -ENODATA. 
- */ - if (fusionmerge) - error = wait_event_interruptible( - chan->read_wait, - ((len = channel_get_next_record(chan, - &buf)), len != -EAGAIN)); - else - error = wait_event_interruptible( - buf->read_wait, - ((len = lib_ring_buffer_get_next_record( - chan, buf)), len != -EAGAIN)); - CHAN_WARN_ON(chan, len == -EBUSY); - if (error) { - read_count = error; - goto nodata; - } - CHAN_WARN_ON(chan, len < 0 && len != -ENODATA); - goto len_test; - } - } - read_offset = buf->iter.read_offset; -skip_get_next: - space_left = count - read_count; - if (len <= space_left) { - copy_len = len; - chan->iter.len_left = 0; - *ppos = 0; - } else { - copy_len = space_left; - chan->iter.len_left = len - copy_len; - *ppos = read_offset + copy_len; - } - if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset, - &user_buf[read_count], - copy_len)) { - /* - * Leave the len_left and ppos values at their current - * state, as we currently have a valid event to read. - */ - return -EFAULT; - } - read_count += copy_len; - }; - return read_count; - -nodata: - *ppos = 0; - chan->iter.len_left = 0; - return read_count; -} - -/** - * lib_ring_buffer_file_read - Read buffer record payload. - * @filp: file structure pointer. - * @buffer: user buffer to read data into. - * @count: number of bytes to read. - * @ppos: file read position. - * - * Returns a negative value on error, or the number of bytes read on success. - * ppos is used to save the position _within the current record_ between calls - * to read(). - */ -static -ssize_t lib_ring_buffer_file_read(struct file *filp, - char __user *user_buf, - size_t count, - loff_t *ppos) -{ - struct inode *inode = filp->f_dentry->d_inode; - struct lib_ring_buffer *buf = inode->i_private; - struct channel *chan = buf->backend.chan; - - return channel_ring_buffer_file_read(filp, user_buf, count, ppos, - chan, buf, 0); -} - -/** - * channel_file_read - Read channel record payload. - * @filp: file structure pointer. - * @buffer: user buffer to read data into. - * @count: number of bytes to read. - * @ppos: file read position. - * - * Returns a negative value on error, or the number of bytes read on success. - * ppos is used to save the position _within the current record_ between calls - * to read(). 
- */ -static -ssize_t channel_file_read(struct file *filp, - char __user *user_buf, - size_t count, - loff_t *ppos) -{ - struct inode *inode = filp->f_dentry->d_inode; - struct channel *chan = inode->i_private; - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - return channel_ring_buffer_file_read(filp, user_buf, count, - ppos, chan, NULL, 1); - else { - struct lib_ring_buffer *buf = - channel_get_ring_buffer(config, chan, 0); - return channel_ring_buffer_file_read(filp, user_buf, count, - ppos, chan, buf, 0); - } -} - -static -int lib_ring_buffer_file_open(struct inode *inode, struct file *file) -{ - struct lib_ring_buffer *buf = inode->i_private; - int ret; - - ret = lib_ring_buffer_iterator_open(buf); - if (ret) - return ret; - - file->private_data = buf; - ret = nonseekable_open(inode, file); - if (ret) - goto release_iter; - return 0; - -release_iter: - lib_ring_buffer_iterator_release(buf); - return ret; -} - -static -int lib_ring_buffer_file_release(struct inode *inode, struct file *file) -{ - struct lib_ring_buffer *buf = inode->i_private; - - lib_ring_buffer_iterator_release(buf); - return 0; -} - -static -int channel_file_open(struct inode *inode, struct file *file) -{ - struct channel *chan = inode->i_private; - int ret; - - ret = channel_iterator_open(chan); - if (ret) - return ret; - - file->private_data = chan; - ret = nonseekable_open(inode, file); - if (ret) - goto release_iter; - return 0; - -release_iter: - channel_iterator_release(chan); - return ret; -} - -static -int channel_file_release(struct inode *inode, struct file *file) -{ - struct channel *chan = inode->i_private; - - channel_iterator_release(chan); - return 0; -} - -const struct file_operations channel_payload_file_operations = { - .open = channel_file_open, - .release = channel_file_release, - .read = channel_file_read, - .llseek = lib_ring_buffer_no_llseek, -}; -EXPORT_SYMBOL_GPL(channel_payload_file_operations); - -const struct file_operations lib_ring_buffer_payload_file_operations = { - .open = lib_ring_buffer_file_open, - .release = lib_ring_buffer_file_release, - .read = lib_ring_buffer_file_read, - .llseek = lib_ring_buffer_no_llseek, -}; -EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations); diff --git a/libringbuffer/vatomic.h b/libringbuffer/vatomic.h index eea6d049..831ea043 100644 --- a/libringbuffer/vatomic.h +++ b/libringbuffer/vatomic.h @@ -9,6 +9,9 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include +#include + /* * Same data type (long) accessed differently depending on configuration. * v field is for non-atomic access (protected by mutual exclusion). @@ -18,46 +21,37 @@ * atomic_long_t is used for globally shared buffers. 
*/ union v_atomic { - local_t l; - atomic_long_t a; + long a; /* accessed through uatomic */ long v; }; static inline long v_read(const struct lib_ring_buffer_config *config, union v_atomic *v_a) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU) - return local_read(&v_a->l); - else - return atomic_long_read(&v_a->a); + assert(config->sync != RING_BUFFER_SYNC_PER_CPU); + return uatomic_read(&v_a->a); } static inline void v_set(const struct lib_ring_buffer_config *config, union v_atomic *v_a, long v) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU) - local_set(&v_a->l, v); - else - atomic_long_set(&v_a->a, v); + assert(config->sync != RING_BUFFER_SYNC_PER_CPU); + uatomic_set(&v_a->a, v); } static inline void v_add(const struct lib_ring_buffer_config *config, long v, union v_atomic *v_a) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU) - local_add(v, &v_a->l); - else - atomic_long_add(v, &v_a->a); + assert(config->sync != RING_BUFFER_SYNC_PER_CPU); + uatomic_add(&v_a->a, v); } static inline void v_inc(const struct lib_ring_buffer_config *config, union v_atomic *v_a) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU) - local_inc(&v_a->l); - else - atomic_long_inc(&v_a->a); + assert(config->sync != RING_BUFFER_SYNC_PER_CPU); + uatomic_inc(&v_a->a); } /* @@ -73,10 +67,8 @@ static inline long v_cmpxchg(const struct lib_ring_buffer_config *config, union v_atomic *v_a, long old, long _new) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU) - return local_cmpxchg(&v_a->l, old, _new); - else - return atomic_long_cmpxchg(&v_a->a, old, _new); + assert(config->sync != RING_BUFFER_SYNC_PER_CPU); + return uatomic_cmpxchg(&v_a->a, old, _new); } #endif /* _LINUX_RING_BUFFER_VATOMIC_H */ -- 2.34.1
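[Annotation, not part of the patch: with the local_t/atomic_long_t split gone, every atomic access to union v_atomic funnels through liburcu's uatomic on the 'a' field, e.g.:]

#include <urcu/uatomic.h>

union v_atomic {
	long a;	/* accessed through uatomic */
	long v;	/* non-atomic, under mutual exclusion */
};

int main(void)
{
	union v_atomic count = { .a = 0 };

	uatomic_inc(&count.a);
	uatomic_add(&count.a, 2);
	return uatomic_read(&count.a) == 3 ? 0 : 1;
}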