X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Ffrontend_internal.h;h=a6b96c1a098ab089199cbc9d06f59549bcc4328f;hb=824f40b81426c6ac82685251018dae00947786a9;hp=3bd5721eb261df7d26199d8ce5c35470a779127f;hpb=852c29366c62c78f78af261f0287afcd92eae6b5;p=lttng-ust.git

diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h
index 3bd5721e..a6b96c1a 100644
--- a/libringbuffer/frontend_internal.h
+++ b/libringbuffer/frontend_internal.h
@@ -16,10 +16,12 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
-#include "../../wrapper/ringbuffer/config.h"
-#include "../../wrapper/ringbuffer/backend_types.h"
-#include "../../wrapper/ringbuffer/frontend_types.h"
-#include "../../lib/prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
+#include 
+
+#include 
+#include "backend_types.h"
+#include "frontend_types.h"
+#include "shm.h"
 
 /* Buffer offset macros */
 
@@ -81,7 +83,7 @@ unsigned long subbuf_index(unsigned long offset, struct channel *chan)
  * last_tsc atomically.
  */
 
-#if (BITS_PER_LONG == 32)
+#if (CAA_BITS_PER_LONG == 32)
 static inline
 void save_last_tsc(const struct lib_ring_buffer_config *config,
 		   struct lib_ring_buffer *buf, u64 tsc)
@@ -142,7 +144,8 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx);
 
 extern
 void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
-				 enum switch_mode mode);
+				 enum switch_mode mode,
+				 struct shm_handle *handle);
 
 /* Buffer write helpers */
 
@@ -154,7 +157,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
 	unsigned long consumed_old, consumed_new;
 
 	do {
-		consumed_old = atomic_long_read(&buf->consumed);
+		consumed_old = uatomic_read(&buf->consumed);
 		/*
 		 * If buffer is in overwrite mode, push the reader consumed
 		 * count if the write position has reached it and we are not
@@ -170,7 +173,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
 			consumed_new = subbuf_align(consumed_old, chan);
 		else
 			return;
-	} while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+	} while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
 					      consumed_new) != consumed_old));
 }
 
@@ -178,22 +181,24 @@ static inline
 void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
 					  struct lib_ring_buffer *buf,
 					  unsigned long commit_count,
-					  unsigned long idx)
+					  unsigned long idx,
+					  struct shm_handle *handle)
 {
 	if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
-		v_set(config, &buf->commit_hot[idx].seq, commit_count);
+		v_set(config, &shmp_index(handle, buf->commit_hot, idx)->seq, commit_count);
 }
 
 static inline
 int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
 				 struct lib_ring_buffer *buf,
-				 struct channel *chan)
+				 struct channel *chan,
+				 struct shm_handle *handle)
 {
 	unsigned long consumed_old, consumed_idx, commit_count, write_offset;
 
-	consumed_old = atomic_long_read(&buf->consumed);
+	consumed_old = uatomic_read(&buf->consumed);
 	consumed_idx = subbuf_index(consumed_old, chan);
-	commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
+	commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
 	/*
 	 * No memory barrier here, since we are only interested
 	 * in a statistically correct polling result. The next poll will
@@ -237,9 +242,10 @@ int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config,
 static inline
 unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
 					    struct lib_ring_buffer *buf,
-					    unsigned long idx)
+					    unsigned long idx,
+					    struct shm_handle *handle)
 {
-	return subbuffer_get_data_size(config, &buf->backend, idx);
+	return subbuffer_get_data_size(config, &buf->backend, idx, handle);
 }
 
 /*
@@ -250,7 +256,8 @@ unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config
 static inline
 int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
 				      struct lib_ring_buffer *buf,
-				      struct channel *chan)
+				      struct channel *chan,
+				      struct shm_handle *handle)
 {
 	unsigned long offset, idx, commit_count;
 
@@ -268,7 +275,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi
 	do {
 		offset = v_read(config, &buf->offset);
 		idx = subbuf_index(offset, chan);
-		commit_count = v_read(config, &buf->commit_hot[idx].cc);
+		commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->cc);
 	} while (offset != v_read(config, &buf->offset));
 
 	return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
@@ -281,7 +288,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 				   struct channel *chan,
 				   unsigned long offset,
 				   unsigned long commit_count,
-				   unsigned long idx)
+				   unsigned long idx,
+				   struct shm_handle *handle)
 {
 	unsigned long old_commit_count = commit_count
 					 - chan->backend.subbuf_size;
@@ -316,7 +324,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 	 * The subbuffer size is least 2 bytes (minimum size: 1 page).
 	 * This guarantees that old_commit_count + 1 != commit_count.
 	 */
-	if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
+	if (likely(v_cmpxchg(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 			     old_commit_count, old_commit_count + 1)
 		   == old_commit_count)) {
 		/*
@@ -328,17 +336,20 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		tsc = config->cb.ring_buffer_clock_read(chan);
 		v_add(config,
 		      subbuffer_get_records_count(config,
-						  &buf->backend, idx),
+						  &buf->backend,
+						  idx, handle),
 		      &buf->records_count);
 		v_add(config,
 		      subbuffer_count_records_overrun(config,
 						      &buf->backend,
-						      idx),
+						      idx, handle),
 		      &buf->records_overrun);
 		config->cb.buffer_end(buf, tsc, idx,
 				      lib_ring_buffer_get_data_size(config,
 								    buf,
-								    idx));
+								    idx,
+								    handle),
+				      handle);
 
 		/*
 		 * Set noref flag and offset for this subbuffer id.
@@ -346,7 +357,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		 * are ordered before set noref and offset.
 		 */
 		lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
-						 buf_trunc_val(offset, chan));
+						 buf_trunc_val(offset, chan), handle);
 
 		/*
 		 * Order set_noref and record counter updates before the
@@ -354,21 +365,47 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		 * respect to writers coming into the subbuffer after
 		 * wrap around, and also order wrt concurrent readers.
 		 */
-		smp_mb();
+		cmm_smp_mb();
 		/* End of exclusive subbuffer access */
-		v_set(config, &buf->commit_cold[idx].cc_sb,
+		v_set(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 		      commit_count);
 		lib_ring_buffer_vmcore_check_deliver(config, buf,
-						     commit_count, idx);
+						     commit_count, idx, handle);
 
 		/*
 		 * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
*/ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER - && atomic_long_read(&buf->active_readers) - && lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + && (uatomic_read(&buf->active_readers) + || uatomic_read(&buf->active_shadow_readers)) + && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { + int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref); + + if (wakeup_fd >= 0) { + int ret; + /* + * Wake-up the other end by + * writing a null byte in the + * pipe (non-blocking). + * Important note: Because + * writing into the pipe is + * non-blocking (and therefore + * we allow dropping wakeup + * data, as long as there is + * wakeup data present in the + * pipe buffer to wake up the + * consumer), the consumer + * should perform the following + * sequence for waiting: + * 1) empty the pipe (reads). + * 2) check if there is data in + * the buffer. + * 3) wait on the pipe (poll). + */ + do { + ret = write(wakeup_fd, "", 1); + } while (ret == -1L && errno == EINTR); + } } } @@ -390,7 +427,8 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c unsigned long idx, unsigned long buf_offset, unsigned long commit_count, - size_t slot_size) + size_t slot_size, + struct shm_handle *handle) { unsigned long offset, commit_seq_old; @@ -408,17 +446,20 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c if (unlikely(subbuf_offset(offset - commit_count, chan))) return; - commit_seq_old = v_read(config, &buf->commit_hot[idx].seq); + commit_seq_old = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->seq); while ((long) (commit_seq_old - commit_count) < 0) - commit_seq_old = v_cmpxchg(config, &buf->commit_hot[idx].seq, + commit_seq_old = v_cmpxchg(config, &shmp_index(handle, buf->commit_hot, idx)->seq, commit_seq_old, commit_count); } extern int lib_ring_buffer_create(struct lib_ring_buffer *buf, - struct channel_backend *chanb, int cpu); -extern void lib_ring_buffer_free(struct lib_ring_buffer *buf); + struct channel_backend *chanb, int cpu, + struct shm_handle *handle, + struct shm_object *shmobj); +extern void lib_ring_buffer_free(struct lib_ring_buffer *buf, + struct shm_handle *handle); /* Keep track of trap nesting inside ring buffer code */ -DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting); +extern __thread unsigned int lib_ring_buffer_nesting; #endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */
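
Note on the recurring change in this patch: every direct array access such as &buf->commit_hot[idx] becomes shmp_index(handle, buf->commit_hot, idx). The buffer structures now live in shared memory that the tracer and the consumer map at different base addresses, so array fields are kept as handle-relative references and translated to local pointers on each access through the struct shm_handle added to these function signatures. The snippet below is only a rough sketch of that idea, not the actual lttng-ust shmp()/shmp_index() implementation; the names shm_ref_sketch and shm_deref_sketch are invented for illustration.

#include <stddef.h>

/*
 * Sketch only: store an offset from the start of the mapping instead of a
 * raw pointer, so two processes mapping the same shared memory at
 * different base addresses can both resolve it.  The real shmp_index()
 * used in this patch resolves such references through the shm_handle.
 */
struct shm_ref_sketch {
	size_t offset;		/* offset from the mapping base */
};

static inline void *
shm_deref_sketch(char *base, struct shm_ref_sketch ref,
		 size_t index, size_t elem_size)
{
	/* Equivalent of &array[index], expressed relative to "base". */
	return base + ref.offset + index * elem_size;
}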
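The long comment added in lib_ring_buffer_check_deliver() prescribes a wait sequence for the consumer: 1) empty the wakeup pipe, 2) check whether the buffer has data, 3) wait on the pipe with poll(). A minimal consumer-side sketch of that sequence follows. It is an illustration, not code from this patch: buffer_has_data is a hypothetical callback standing in for whatever check the consumer performs (for instance lib_ring_buffer_poll_deliver()), and the read end of the pipe is assumed to be non-blocking so the drain loop cannot block.

#include <errno.h>
#include <poll.h>
#include <unistd.h>

/*
 * Hypothetical consumer-side wait loop following the sequence documented
 * in the comment above:
 *   1) empty the pipe (reads),
 *   2) check if there is data in the buffer,
 *   3) wait on the pipe (poll).
 */
static void wait_for_data(int wakeup_fd, int (*buffer_has_data)(void))
{
	char drain[16];
	struct pollfd fds = { .fd = wakeup_fd, .events = POLLIN };

	for (;;) {
		ssize_t len;

		/* 1) Drain pending wakeup bytes (fd assumed non-blocking). */
		do {
			len = read(wakeup_fd, drain, sizeof(drain));
		} while (len > 0 || (len < 0 && errno == EINTR));

		/* 2) Data already available: no need to sleep. */
		if (buffer_has_data())
			return;

		/* 3) Sleep until the writer puts a byte in the pipe. */
		while (poll(&fds, 1, -1) < 0 && errno == EINTR)
			continue;
	}
}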