X-Git-Url: https://git.lttng.org/?p=ust.git;a=blobdiff_plain;f=libust%2Fbuffers.h;h=a2f17a9f78d01e83f3e1ee2a5d4e7eb40e92f081;hp=c6b13d076ecbc26d445efec8c5cbd96775cc732b;hb=e5bc3b0f4d6c0407492ebdea863483925393e1bc;hpb=204141ee9da22a244c9095287f4f1c513784b171

diff --git a/libust/buffers.h b/libust/buffers.h
index c6b13d0..a2f17a9 100644
--- a/libust/buffers.h
+++ b/libust/buffers.h
@@ -12,10 +12,12 @@
 #include
 #include
 #include "channels.h"
-#include "buffers.h"
+#include "tracerconst.h"
+#include "tracercore.h"
+#include "header-inline.h"
+#include
 
-/* Return the size of the minimum number of pages that can contain x. */
-#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
+/***** SHOULD BE REMOVED ***** */
 
 /*
  * BUFFER_TRUNC zeroes the subbuffer offset and the subbuffer number parts of
@@ -37,10 +39,17 @@
  */
 #define UST_CHANNEL_VERSION 8
 
+/**************************************/
+
+struct commit_counters {
+        local_t cc;
+        local_t cc_sb;                  /* Incremented _once_ at sb switch */
+};
+
 struct ust_buffer {
         /* First 32 bytes cache-hot cacheline */
         local_t offset;                 /* Current offset in the buffer */
-        local_t *commit_count;          /* Commit count per sub-buffer */
+        struct commit_counters *commit_count;   /* Commit count per sub-buffer */
         atomic_long_t consumed;         /*
                                          * Current offset in the buffer
                                          * standard atomic access (shared)
@@ -64,7 +73,12 @@ struct ust_buffer {
         /* the reading end of the pipe */
         int data_ready_fd_read;
 
+        unsigned int finalized;
+//ust//        struct timer_list switch_timer;         /* timer for periodical switch */
+        unsigned long switch_timer_interval;    /* 0 = unset */
+
         struct ust_channel *chan;
+
         struct kref kref;
         void *buf_data;
         size_t buf_size;
@@ -75,19 +89,22 @@ struct ust_buffer {
         local_t commit_seq[0] ____cacheline_aligned;
 } ____cacheline_aligned;
 
-extern void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
-        const void *src, size_t len, ssize_t cpy);
-
 /*
- * Return the address where a given offset is located.
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
+ * A sub-buffer switch can be forced either while tracing is active
+ * (FORCE_ACTIVE) or as a final flush once tracing has stopped (FORCE_FLUSH,
+ * which does not start writing into a new sub-buffer).
  */
-extern void *ust_buffers_offset_address(struct ust_buffer *buf,
-        size_t offset);
+enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
+
+extern int ltt_reserve_slot_lockless_slow(struct ust_trace *trace,
+        struct ust_channel *ltt_channel, void **transport_data,
+        size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
+        unsigned int *rflags, int largest_align, int cpu);
+
+extern void ltt_force_switch_lockless_slow(struct ust_buffer *buf,
+        enum force_switch_mode mode);
+
 
-/* FIXME: lttng has a version for systems with inefficient unaligned access */
 static __inline__ void ust_buffers_do_copy(void *dest, const void *src, size_t len)
 {
         union {
@@ -99,23 +116,405 @@ static __inline__ void ust_buffers_do_copy(void *dest, const void *src, size_t l
         } u = { .src = src };
 
         switch (len) {
-        case 0: break;
-        case 1: *(u8 *)dest = *u.src8;
+        case 0:         break;
+        case 1:         *(u8 *)dest = *u.src8;
                 break;
-        case 2: *(u16 *)dest = *u.src16;
+        case 2:         *(u16 *)dest = *u.src16;
                 break;
-        case 4: *(u32 *)dest = *u.src32;
+        case 4:         *(u32 *)dest = *u.src32;
                 break;
-        case 8: *(u64 *)dest = *u.src64;
+        case 8:         *(u64 *)dest = *u.src64;
                 break;
         default:
                 memcpy(dest, src, len);
         }
 }
 
-/* FIXME: there is both a static inline and a '_' non static inline version ?? */
+static __inline__ void *ust_buffers_offset_address(struct ust_buffer *buf, size_t offset)
+{
+        return ((char *)buf->buf_data)+offset;
+}
+
+/*
+ * Last TSC comparison functions. Check if the current TSC overflows
+ * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
+ * atomically.
+ */
+
+/* FIXME: does this test work properly? */
+#if (BITS_PER_LONG == 32)
+static __inline__ void save_last_tsc(struct ust_buffer *ltt_buf,
+                u64 tsc)
+{
+        ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
+}
+
+static __inline__ int last_tsc_overflow(struct ust_buffer *ltt_buf,
+                u64 tsc)
+{
+        unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);
+
+        if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
+                return 1;
+        else
+                return 0;
+}
+#else
+static __inline__ void save_last_tsc(struct ust_buffer *ltt_buf,
+                u64 tsc)
+{
+        ltt_buf->last_tsc = (unsigned long)tsc;
+}
+
+static __inline__ int last_tsc_overflow(struct ust_buffer *ltt_buf,
+                u64 tsc)
+{
+        if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
+                return 1;
+        else
+                return 0;
+}
+#endif
+
+static __inline__ void ltt_reserve_push_reader(
+                struct ust_channel *rchan,
+                struct ust_buffer *buf,
+                long offset)
+{
+        long consumed_old, consumed_new;
+
+        do {
+                consumed_old = atomic_long_read(&buf->consumed);
+                /*
+                 * If the buffer is in overwrite mode, push the reader consumed
+                 * count if the write position has reached it and we are not
+                 * at the first iteration (don't push the reader farther than
+                 * the writer). This operation can be done concurrently by many
+                 * writers in the same buffer; the writer whose write position
+                 * is in the farthest sub-buffer of the buffer is the one that
+                 * wins this loop.
+                 * If the buffer is not in overwrite mode, pushing the reader
+                 * only happens if a sub-buffer is corrupted.
+                 */
+                if (unlikely((SUBBUF_TRUNC(offset, buf->chan)
+                              - SUBBUF_TRUNC(consumed_old, buf->chan))
+                             >= rchan->alloc_size))
+                        consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
+                else
+                        return;
+        } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+                                              consumed_new) != consumed_old));
+}
+
+static __inline__ void ltt_vmcore_check_deliver(
+                struct ust_buffer *buf,
+                long commit_count, long idx)
+{
+        local_set(&buf->commit_seq[idx], commit_count);
+}
+
+static __inline__ void ltt_check_deliver(struct ust_channel *chan,
+                struct ust_buffer *buf,
+                long offset, long commit_count, long idx)
+{
+        long old_commit_count = commit_count - chan->subbuf_size;
+
+        /* Check if all commits have been done */
+        if (unlikely((BUFFER_TRUNC(offset, chan)
+                      >> chan->n_subbufs_order)
+                     - (old_commit_count
+                        & chan->commit_count_mask) == 0)) {
+                /*
+                 * If we succeeded in updating the cc_sb, we are delivering
+                 * the subbuffer. Deals with concurrent updates of the "cc"
+                 * value without adding an add_return atomic operation to the
+                 * fast path.
+                 */
+                if (likely(local_cmpxchg(&buf->commit_count[idx].cc_sb,
+                                         old_commit_count, commit_count)
+                           == old_commit_count)) {
+                        int result;
+
+                        /*
+                         * Set noref flag for this subbuffer.
+                         */
+//ust//                 ltt_set_noref_flag(rchan, buf, idx);
+                        ltt_vmcore_check_deliver(buf, commit_count, idx);
+
+                        /* wakeup consumer */
+                        result = write(buf->data_ready_fd_write, "1", 1);
+                        if (result == -1) {
+                                PERROR("write (in ltt_relay_buffer_flush)");
+                                ERR("this should never happen!");
+                        }
+                }
+        }
+}
+
+static __inline__ int ltt_poll_deliver(struct ust_channel *chan, struct ust_buffer *buf)
+{
+        long consumed_old, consumed_idx, commit_count, write_offset;
+
+        consumed_old = atomic_long_read(&buf->consumed);
+        consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
+        commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
+        /*
+         * No memory barrier here, since we are only interested
+         * in a statistically correct polling result. The next poll will
+         * get the data if we are racing. The mb() that ensures correct
+         * memory order is in get_subbuf.
+         */
+        write_offset = local_read(&buf->offset);
+
+        /*
+         * Check that the subbuffer we are trying to consume has already
+         * been fully committed.
+         */
+
+        if (((commit_count - chan->subbuf_size)
+             & chan->commit_count_mask)
+            - (BUFFER_TRUNC(consumed_old, buf->chan)
+               >> chan->n_subbufs_order)
+            != 0)
+                return 0;
+
+        /*
+         * Check that we are not about to read the same subbuffer in
+         * which the writer head is.
+         */
+        if ((SUBBUF_TRUNC(write_offset, buf->chan)
+             - SUBBUF_TRUNC(consumed_old, buf->chan))
+            == 0)
+                return 0;
+
+        return 1;
+
+}
+
+/*
+ * Returns 0 if the reserve is OK, or 1 if the slow path must be taken.
+ */
+static __inline__ int ltt_relay_try_reserve(
+                struct ust_channel *chan,
+                struct ust_buffer *buf,
+                size_t data_size,
+                u64 *tsc, unsigned int *rflags, int largest_align,
+                long *o_begin, long *o_end, long *o_old,
+                size_t *before_hdr_pad, size_t *size)
+{
+        *o_begin = local_read(&buf->offset);
+        *o_old = *o_begin;
+
+        *tsc = trace_clock_read64();
+
+//ust// #ifdef CONFIG_LTT_VMCORE
+//ust//        prefetch(&buf->commit_count[SUBBUF_INDEX(*o_begin, rchan)]);
+//ust//        prefetch(&buf->commit_seq[SUBBUF_INDEX(*o_begin, rchan)]);
+//ust// #else
+//ust//        prefetchw(&buf->commit_count[SUBBUF_INDEX(*o_begin, rchan)]);
+//ust// #endif
+        if (last_tsc_overflow(buf, *tsc))
+                *rflags = LTT_RFLAG_ID_SIZE_TSC;
+
+        if (unlikely(SUBBUF_OFFSET(*o_begin, buf->chan) == 0))
+                return 1;
+
+        *size = ust_get_header_size(chan,
+                        *o_begin, data_size,
+                        before_hdr_pad, *rflags);
+        *size += ltt_align(*o_begin + *size, largest_align) + data_size;
+        if (unlikely((SUBBUF_OFFSET(*o_begin, buf->chan) + *size)
+                     > buf->chan->subbuf_size))
+                return 1;
+
+        /*
+         * Event fits in the current buffer and we are not on a switch
+         * boundary. It's safe to write.
+         */
+        *o_end = *o_begin + *size;
+
+        if (unlikely((SUBBUF_OFFSET(*o_end, buf->chan)) == 0))
+                /*
+                 * The offset_end will fall at the very beginning of the next
+                 * subbuffer.
+                 */
+                return 1;
+
+        return 0;
+}
+
+static __inline__ int ltt_reserve_slot(struct ust_trace *trace,
+                struct ust_channel *chan, void **transport_data,
+                size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
+                unsigned int *rflags, int largest_align, int cpu)
+{
+        struct ust_buffer *buf = chan->buf[cpu];
+        long o_begin, o_end, o_old;
+        size_t before_hdr_pad;
+
+        /*
+         * Perform retryable operations.
+         */
+        /* FIXME: make this really per cpu? */
+        if (unlikely(LOAD_SHARED(ltt_nesting) > 4)) {
+                DBG("Dropping event because nesting is too deep.");
+                local_inc(&buf->events_lost);
+                return -EPERM;
+        }
+
+        if (unlikely(ltt_relay_try_reserve(chan, buf,
+                        data_size, tsc, rflags,
+                        largest_align, &o_begin, &o_end, &o_old,
+                        &before_hdr_pad, slot_size)))
+                goto slow_path;
+
+        if (unlikely(local_cmpxchg(&buf->offset, o_old, o_end) != o_old))
+                goto slow_path;
+
+        /*
+         * Atomically update last_tsc. This update races against concurrent
+         * atomic updates, but the race will always cause supplementary full TSC
+         * events, never the opposite (missing a full TSC event when it would be
+         * needed).
+         */
+        save_last_tsc(buf, *tsc);
+
+        /*
+         * Push the reader if necessary.
+         */
+        ltt_reserve_push_reader(chan, buf, o_end - 1);
+
+        /*
+         * Clear noref flag for this subbuffer.
+         */
+//ust//        ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(o_end - 1, chan));
+
+        *buf_offset = o_begin + before_hdr_pad;
+        return 0;
+slow_path:
+        return ltt_reserve_slot_lockless_slow(trace, chan,
+                        transport_data, data_size, slot_size, buf_offset, tsc,
+                        rflags, largest_align, cpu);
+}
+
+/*
+ * Force a sub-buffer switch for a per-cpu buffer. This operation is
+ * completely reentrant: it can be called while tracing is active with
+ * absolutely no lock held.
+ *
+ * Note, however, that as a local_cmpxchg is used for some atomic
+ * operations, this function must be called from the CPU which owns the buffer
+ * for an ACTIVE flush.
+ */
+static __inline__ void ltt_force_switch(struct ust_buffer *buf,
+                enum force_switch_mode mode)
+{
+        return ltt_force_switch_lockless_slow(buf, mode);
+}
+
+/*
+ * For flight recording. Must be called after relay_commit.
+ * This function increments the sub-buffer's commit_seq counter each time the
+ * commit count reaches back the reserve offset (modulo subbuffer size). It is
+ * useful for crash dump.
+ */
+//ust// #ifdef CONFIG_LTT_VMCORE
+static __inline__ void ltt_write_commit_counter(struct ust_channel *chan,
+                struct ust_buffer *buf, long idx, long buf_offset,
+                long commit_count, size_t data_size)
+{
+        long offset;
+        long commit_seq_old;
+
+        offset = buf_offset + data_size;
+
+        /*
+         * SUBBUF_OFFSET includes commit_count_mask. We can simply
+         * compare the offsets within the subbuffer without caring about
+         * buffer full/empty mismatch because offset is never zero here
+         * (subbuffer header and event headers have non-zero length).
+         */
+        if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan)))
+                return;
+
+        commit_seq_old = local_read(&buf->commit_seq[idx]);
+        while (commit_seq_old < commit_count)
+                commit_seq_old = local_cmpxchg(&buf->commit_seq[idx],
+                                               commit_seq_old, commit_count);
+
+        DBG("commit_seq for channel %s_%d, subbuf %ld is now %ld", buf->chan->channel_name, buf->cpu, idx, commit_count);
+}
+//ust// #else
+//ust// static __inline__ void ltt_write_commit_counter(struct ust_buffer *buf,
+//ust//                long idx, long buf_offset, long commit_count, size_t data_size)
+//ust// {
+//ust// }
+//ust// #endif
+
+/*
+ * Atomic unordered slot commit. Increments the commit count in the
+ * specified sub-buffer, and delivers it if necessary.
+ *
+ * Parameters:
+ *
+ * @chan : channel structure
+ * @buf : buffer structure
+ * @buf_offset : offset following the event header.
+ * @data_size : size of the event data.
+ * @slot_size : size of the reserved slot.
+ */
+static __inline__ void ltt_commit_slot(
+                struct ust_channel *chan,
+                struct ust_buffer *buf, long buf_offset,
+                size_t data_size, size_t slot_size)
+{
+        long offset_end = buf_offset;
+        long endidx = SUBBUF_INDEX(offset_end - 1, chan);
+        long commit_count;
+
+#ifdef LTT_NO_IPI_BARRIER
+        smp_wmb();
+#else
+        /*
+         * Must write slot data before incrementing commit count.
+         * This compiler barrier is upgraded into a smp_mb() by the IPI
+         * sent by get_subbuf().
+         */
+        barrier();
+#endif
+        local_add(slot_size, &buf->commit_count[endidx].cc);
+        /*
+         * Commit count read can race with concurrent OOO commit count updates.
+         * This is only needed for ltt_check_deliver (for non-polling delivery
+         * only) and for ltt_write_commit_counter. The race can only cause the
+         * counter to be read with the same value more than once, which could
+         * cause:
+         * - Multiple delivery for the same sub-buffer (which is handled
+         *   gracefully by the reader code) if the value is for a full
+         *   sub-buffer. It's important that we can never miss a sub-buffer
+         *   delivery. Re-reading the value after the local_add ensures this.
+         * - Reading a commit_count with a higher value than what was actually
+         *   added to it for the ltt_write_commit_counter call (again caused by
+         *   a concurrent committer). It does not matter, because this function
+         *   is interested in the fact that the commit count reaches back the
+         *   reserve offset for a specific sub-buffer, which is completely
+         *   independent of the order.
+         */
+        commit_count = local_read(&buf->commit_count[endidx].cc);
+
+        ltt_check_deliver(chan, buf, offset_end - 1, commit_count, endidx);
+        /*
+         * Update data_size for each commit. It's needed only for extracting
+         * ltt buffers from vmcore, after crash.
+         */
+        ltt_write_commit_counter(chan, buf, endidx, buf_offset, commit_count, data_size);
+}
+
+void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
+        const void *src, size_t len, ssize_t cpy);
+
 static __inline__ int ust_buffers_write(struct ust_buffer *buf, size_t offset,
-        const void *src, size_t len)
+                const void *src, size_t len)
 {
         size_t cpy;
         size_t buf_offset = BUFFER_OFFSET(offset, buf->chan);
@@ -124,24 +523,13 @@ static __inline__ int ust_buffers_write(struct ust_buffer *buf, size_t offset,
         cpy = min_t(size_t, len, buf->buf_size - buf_offset);
         ust_buffers_do_copy(buf->buf_data + buf_offset, src, cpy);
-        
+
         if (unlikely(len != cpy))
                 _ust_buffers_write(buf, buf_offset, src, len, cpy);
         return len;
 }
 
-int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t n_subbufs);
-extern void ust_buffers_channel_close(struct ust_channel *chan);
-
-extern int ust_buffers_do_get_subbuf(struct ust_buffer *buf, long *pconsumed_old);
-
-extern int ust_buffers_do_put_subbuf(struct ust_buffer *buf, u32 uconsumed_old);
-
-extern void init_ustrelay_transport(void);
-
-/*static*/ /* inline */ notrace void ltt_commit_slot(
-        struct ust_channel *channel,
-        void **transport_data, long buf_offset,
-        size_t data_size, size_t slot_size);
+int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed);
+int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old);
 
 #endif /* _UST_BUFFERS_H */
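
For reviewers, the fast path introduced above is meant to be used as a reserve/commit pair around the payload copy. The sketch below is illustrative only and is not part of the patch: it assumes a caller that already holds valid trace, chan and cpu values, it elides the event-header serialization step the real tracer performs between reserve and commit (which would normally advance buf_offset past the event header), and it arbitrarily picks sizeof(long) as the largest alignment. Only functions declared in this header are used.

/*
 * Illustrative sketch, not part of the patch: pairing the reserve/commit
 * fast path declared in buffers.h. The probe name and the way trace, chan
 * and cpu are obtained are hypothetical.
 */
static void example_trace_payload(struct ust_trace *trace,
                struct ust_channel *chan, int cpu,
                const void *payload, size_t payload_size)
{
        void *transport_data = NULL;
        size_t slot_size;
        long buf_offset;
        u64 tsc;
        unsigned int rflags = 0;

        /* Reserve a slot in the per-cpu buffer; the slow path is taken
         * internally when the fast path cannot be used. */
        if (ltt_reserve_slot(trace, chan, &transport_data, payload_size,
                        &slot_size, &buf_offset, &tsc, &rflags,
                        sizeof(long), cpu) < 0)
                return;         /* event dropped (e.g. nesting too deep) */

        /* Copy the payload at the reserved offset (the real tracer writes
         * the event header first and advances buf_offset accordingly). */
        ust_buffers_write(chan->buf[cpu], buf_offset, payload, payload_size);

        /* Commit: adds slot_size to the sub-buffer commit count and delivers
         * the sub-buffer to the reader once it is fully committed. */
        ltt_commit_slot(chan, chan->buf[cpu], buf_offset, payload_size,
                        slot_size);
}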