X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Ffrontend_internal.h;h=1fb08dfdec90d3f25e5f7ce85f39c010386badf4;hb=49deb55988d873a8b8084ca75644b7d61c7a047c;hp=6a1d3a6cb523fd36cf9e69c7d28fb1c4852872e9;hpb=a6352fd40a2090fd883a6c369144bf405c9e9ec4;p=ust.git

diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h
index 6a1d3a6..1fb08df 100644
--- a/libringbuffer/frontend_internal.h
+++ b/libringbuffer/frontend_internal.h
@@ -18,7 +18,7 @@
 #include <urcu/compiler.h>
 
-#include "config.h"
+#include <ust/ringbuffer-config.h>
 #include "backend_types.h"
 #include "frontend_types.h"
 #include "shm.h"
@@ -144,7 +144,8 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx);
 
 extern
 void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
-				 enum switch_mode mode);
+				 enum switch_mode mode,
+				 struct shm_handle *handle);
 
 /* Buffer write helpers */
 
@@ -180,22 +181,24 @@ static inline
 void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
 					  struct lib_ring_buffer *buf,
 					  unsigned long commit_count,
-					  unsigned long idx)
+					  unsigned long idx,
+					  struct shm_handle *handle)
 {
 	if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
-		v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count);
+		v_set(config, &shmp_index(handle, buf->commit_hot, idx)->seq, commit_count);
 }
 
 static inline
 int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
 				 struct lib_ring_buffer *buf,
-				 struct channel *chan)
+				 struct channel *chan,
+				 struct shm_handle *handle)
 {
 	unsigned long consumed_old, consumed_idx, commit_count, write_offset;
 
 	consumed_old = uatomic_read(&buf->consumed);
 	consumed_idx = subbuf_index(consumed_old, chan);
-	commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
+	commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb);
 	/*
 	 * No memory barrier here, since we are only interested
 	 * in a statistically correct polling result. The next poll will
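The pattern running through the whole patch is the same: shmp(ptr)[idx] array dereferences become shmp_index(handle, ptr, idx), and every frontend helper grows a struct shm_handle * parameter. The buffer structures now live in a shared-memory area that each process (tracer and consumer) maps at a different virtual address, so pointers stored inside the area are kept as table references and resolved per process through the handle. A minimal sketch of that resolution scheme in plain C; the struct layouts below are hypothetical stand-ins, not the actual ust.git definitions:

#include <stddef.h>

/* Hypothetical stand-ins for the shared-memory reference types. */
struct shm_ref {
	size_t index;		/* which mapped shm object */
	size_t offset;		/* byte offset inside that object */
};

struct shm_object {
	char *memory_map;	/* this process's mapping base address */
	size_t memory_map_size;
};

struct shm_handle {
	struct shm_object *table;
	size_t table_size;
};

/*
 * Turn a shared-memory reference into a pointer that is valid in the
 * calling process.  The same reference resolves to different virtual
 * addresses in the tracer and in the consumer, which is why raw
 * pointers must never be stored inside the shared-memory area.
 */
static inline char *shm_deref(struct shm_handle *handle, struct shm_ref *ref)
{
	struct shm_object *obj;

	if (ref->index >= handle->table_size)
		return NULL;
	obj = &handle->table[ref->index];
	if (ref->offset >= obj->memory_map_size)
		return NULL;
	return obj->memory_map + ref->offset;
}

Bounds-checking the index and offset before dereferencing, as in this sketch, keeps a corrupted reference from resolving to memory outside the mapped area.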
@@ -239,9 +242,10 @@ int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config,
 static inline
 unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
 					    struct lib_ring_buffer *buf,
-					    unsigned long idx)
+					    unsigned long idx,
+					    struct shm_handle *handle)
 {
-	return subbuffer_get_data_size(config, &buf->backend, idx);
+	return subbuffer_get_data_size(config, &buf->backend, idx, handle);
 }
 
 /*
@@ -252,7 +256,8 @@ unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config
 static inline
 int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
 				      struct lib_ring_buffer *buf,
-				      struct channel *chan)
+				      struct channel *chan,
+				      struct shm_handle *handle)
 {
 	unsigned long offset, idx, commit_count;
 
@@ -270,7 +275,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi
 	do {
 		offset = v_read(config, &buf->offset);
 		idx = subbuf_index(offset, chan);
-		commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc);
+		commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->cc);
 	} while (offset != v_read(config, &buf->offset));
 
 	return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
@@ -283,7 +288,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 				   struct channel *chan,
 				   unsigned long offset,
 				   unsigned long commit_count,
-				   unsigned long idx)
+				   unsigned long idx,
+				   struct shm_handle *handle)
 {
 	unsigned long old_commit_count = commit_count
 					 - chan->backend.subbuf_size;
@@ -318,7 +324,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 	 * The subbuffer size is at least 2 bytes (minimum size: 1 page).
 	 * This guarantees that old_commit_count + 1 != commit_count.
 	 */
-	if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb,
+	if (likely(v_cmpxchg(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 			     old_commit_count, old_commit_count + 1)
 		   == old_commit_count)) {
 		/*
@@ -330,17 +336,20 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		tsc = config->cb.ring_buffer_clock_read(chan);
 		v_add(config,
 		      subbuffer_get_records_count(config,
-						  &buf->backend, idx),
+						  &buf->backend,
+						  idx, handle),
 		      &buf->records_count);
 		v_add(config,
 		      subbuffer_count_records_overrun(config,
 						      &buf->backend,
-						      idx),
+						      idx, handle),
 		      &buf->records_overrun);
 		config->cb.buffer_end(buf, tsc, idx,
 				      lib_ring_buffer_get_data_size(config,
 								    buf,
-								    idx));
+								    idx,
+								    handle),
+				      handle);
 
 		/*
 		 * Set noref flag and offset for this subbuffer id.
@@ -348,7 +357,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		 * are ordered before set noref and offset.
 		 */
 		lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
-						 buf_trunc_val(offset, chan));
+						 buf_trunc_val(offset, chan), handle);
 
 		/*
 		 * Order set_noref and record counter updates before the
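The v_cmpxchg in the hunk at old line 318 is the heart of delivery: every committer that completes the same subbuffer races on a single compare-and-swap of the cold commit counter, and exactly one wins the right to run the accounting above. Because the subbuffer size is at least 2, old_commit_count + 1 can never be a legitimate commit count, so the intermediate value doubles as a "delivery in progress" marker. A standalone sketch of that election, with a plain GCC builtin standing in for the config-dispatched v_cmpxchg():

#include <stdbool.h>

/*
 * Delivery election sketch: concurrent committers all attempt the
 * same transition; __sync_val_compare_and_swap returns the value it
 * found in *cc_sb, so only the single winner sees old_commit_count.
 */
static bool try_become_deliverer(unsigned long *cc_sb,
				 unsigned long old_commit_count)
{
	/*
	 * old_commit_count + 1 is never a valid commit count, so it
	 * safely marks the subbuffer as being delivered until the
	 * winner stores the real commit_count back (the v_set in the
	 * next hunk), ending the exclusive access.
	 */
	return __sync_val_compare_and_swap(cc_sb, old_commit_count,
					   old_commit_count + 1)
	       == old_commit_count;
}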
@@ -358,19 +367,44 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
 		 */
 		cmm_smp_mb();
 		/* End of exclusive subbuffer access */
-		v_set(config, &shmp(buf->commit_cold)[idx].cc_sb,
+		v_set(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 		      commit_count);
 		lib_ring_buffer_vmcore_check_deliver(config, buf,
-						 commit_count, idx);
+						 commit_count, idx, handle);
 
 		/*
 		 * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
 		 */
 		if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
 		    && uatomic_read(&buf->active_readers)
-		    && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-			//wake_up_interruptible(&buf->read_wait);
-			//wake_up_interruptible(&chan->read_wait);
+		    && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
+			int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
+
+			if (wakeup_fd >= 0) {
+				int ret;
+				/*
+				 * Wake-up the other end by
+				 * writing a null byte in the
+				 * pipe (non-blocking).
+				 * Important note: Because
+				 * writing into the pipe is
+				 * non-blocking (and therefore
+				 * we allow dropping wakeup
+				 * data, as long as there is
+				 * wakeup data present in the
+				 * pipe buffer to wake up the
+				 * consumer), the consumer
+				 * should perform the following
+				 * sequence for waiting:
+				 * 1) empty the pipe (reads).
+				 * 2) check if there is data in
+				 *    the buffer.
+				 * 3) wait on the pipe (poll).
+				 */
+				do {
+					ret = write(wakeup_fd, "", 1);
+				} while (ret == -1L && errno == EINTR);
+			}
 		}
 	}
 
@@ -392,7 +426,8 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c
 				   unsigned long idx,
 				   unsigned long buf_offset,
 				   unsigned long commit_count,
-				   size_t slot_size)
+				   size_t slot_size,
+				   struct shm_handle *handle)
 {
 	unsigned long offset, commit_seq_old;
 
@@ -410,16 +445,18 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c
 	if (unlikely(subbuf_offset(offset - commit_count, chan)))
 		return;
 
-	commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq);
+	commit_seq_old = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->seq);
 	while ((long) (commit_seq_old - commit_count) < 0)
-		commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq,
+		commit_seq_old = v_cmpxchg(config, &shmp_index(handle, buf->commit_hot, idx)->seq,
 					   commit_seq_old, commit_count);
 }
 
 extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
 				  struct channel_backend *chanb, int cpu,
-				  struct shm_header *shm_header);
-extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
+				  struct shm_handle *handle,
+				  struct shm_object *shmobj);
+extern void lib_ring_buffer_free(struct lib_ring_buffer *buf,
+				 struct shm_handle *handle);
 
 /* Keep track of trap nesting inside ring buffer code */
 extern __thread unsigned int lib_ring_buffer_nesting;
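The comment added in the wake-up hunk prescribes the matching consumer-side sequence: drain the pipe, check for data, then poll. A sketch of such a wait loop, assuming the read end of the wakeup pipe has been set non-blocking; data_pending() is a placeholder for the consumer's actual buffer-state check, not an ust.git API:

#include <errno.h>
#include <poll.h>
#include <unistd.h>

static void wait_for_data(int wakeup_fd, int (*data_pending)(void))
{
	struct pollfd fds = { .fd = wakeup_fd, .events = POLLIN };
	char drain[64];

	for (;;) {
		/*
		 * 1) Empty the pipe.  Writer-side wakeup bytes may be
		 * dropped, so their count carries no meaning: drain
		 * everything (read end assumed O_NONBLOCK).
		 */
		while (read(wakeup_fd, drain, sizeof(drain)) > 0)
			;
		/*
		 * 2) Check for buffer data only after draining: any
		 * wakeup byte written from this point on is still in
		 * the pipe and keeps the poll below from sleeping
		 * through it.
		 */
		if (data_pending())
			return;
		/* 3) Wait on the pipe. */
		if (poll(&fds, 1, -1) < 0 && errno != EINTR)
			return;
	}
}

Draining before checking is what closes the race: a wakeup written between steps 1 and 3 is guaranteed to remain in the pipe, so poll() cannot block while undelivered data sits in the buffer.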