X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libust%2Fbuffers.h;h=4017964c7ea7410967ef610e7d6477163d7f608f;hb=8161463975e218e0833d31ab1577a7ceb9e8e9f3;hp=bff3ed54ba2f2eb4c06192c283f6aa3f6e68bd13;hpb=b73a4c471dc987ea8548632dffb3c7050de77dd0;p=ust.git diff --git a/libust/buffers.h b/libust/buffers.h index bff3ed5..4017964 100644 --- a/libust/buffers.h +++ b/libust/buffers.h @@ -1,23 +1,40 @@ /* * buffers.h + * LTTng userspace tracer buffering system * * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca) * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca) * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef _UST_BUFFERS_H #define _UST_BUFFERS_H -#include #include + +#include +#include + +#include "usterr_signal_safe.h" #include "channels.h" #include "tracerconst.h" #include "tracercore.h" #include "header-inline.h" -#include -/***** SHOULD BE REMOVED ***** */ +/***** FIXME: SHOULD BE REMOVED ***** */ /* * BUFFER_TRUNC zeroes the subbuffer offset and the subbuffer number parts of @@ -42,28 +59,22 @@ /**************************************/ struct commit_counters { - local_t cc; - local_t cc_sb; /* Incremented _once_ at sb switch */ + long cc; /* ATOMIC */ + long cc_sb; /* ATOMIC - Incremented _once_ at sb switch */ }; struct ust_buffer { /* First 32 bytes cache-hot cacheline */ - local_t offset; /* Current offset in the buffer */ + long offset; /* Current offset in the buffer *atomic* */ struct commit_counters *commit_count; /* Commit count per sub-buffer */ - atomic_long_t consumed; /* - * Current offset in the buffer - * standard atomic access (shared) - */ + long consumed; /* Current offset in the buffer *atomic* access (shared) */ unsigned long last_tsc; /* * Last timestamp written in the buffer. */ /* End of first 32 bytes cacheline */ - atomic_long_t active_readers; /* - * Active readers count - * standard atomic access (shared) - */ - local_t events_lost; - local_t corrupted_subbuffers; + long active_readers; /* ATOMIC - Active readers count standard atomic access (shared) */ + long events_lost; /* ATOMIC */ + long corrupted_subbuffers; /* *ATOMIC* */ /* one byte is written to this pipe when data is available, in order to wake the consumer */ /* portability: Single byte writes must be as quick as possible. The kernel-side @@ -72,6 +83,11 @@ struct ust_buffer { int data_ready_fd_write; /* the reading end of the pipe */ int data_ready_fd_read; + /* + * List of buffers with an open pipe, used for fork and forced subbuffer + * switch. + */ + struct cds_list_head open_buffers_list; unsigned int finalized; //ust// struct timer_list switch_timer; /* timer for periodical switch */ @@ -79,14 +95,14 @@ struct ust_buffer { struct ust_channel *chan; - struct kref kref; + struct urcu_ref urcu_ref; void *buf_data; size_t buf_size; int shmid; unsigned int cpu; /* commit count per subbuffer; must be at end of struct */ - local_t commit_seq[0] ____cacheline_aligned; + long commit_seq[0]; /* ATOMIC */ } ____cacheline_aligned; /* @@ -96,10 +112,12 @@ struct ust_buffer { */ enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH }; -extern int ltt_reserve_slot_lockless_slow(struct ust_trace *trace, - struct ust_channel *ltt_channel, void **transport_data, - size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc, - unsigned int *rflags, int largest_align, int cpu); +extern int ltt_reserve_slot_lockless_slow(struct ust_channel *chan, + struct ust_trace *trace, size_t data_size, + int largest_align, int cpu, + struct ust_buffer **ret_buf, + size_t *slot_size, long *buf_offset, + u64 *tsc, unsigned int *rflags); extern void ltt_force_switch_lockless_slow(struct ust_buffer *buf, enum force_switch_mode mode); @@ -184,7 +202,7 @@ static __inline__ void ltt_reserve_push_reader( long consumed_old, consumed_new; do { - consumed_old = atomic_long_read(&buf->consumed); + consumed_old = uatomic_read(&buf->consumed); /* * If buffer is in overwrite mode, push the reader consumed * count if the write position has reached it and we are not @@ -202,7 +220,7 @@ static __inline__ void ltt_reserve_push_reader( consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan); else return; - } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old, + } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old, consumed_new) != consumed_old)); } @@ -210,7 +228,7 @@ static __inline__ void ltt_vmcore_check_deliver( struct ust_buffer *buf, long commit_count, long idx) { - local_set(&buf->commit_seq[idx], commit_count); + uatomic_set(&buf->commit_seq[idx], commit_count); } static __inline__ void ltt_check_deliver(struct ust_channel *chan, @@ -230,7 +248,7 @@ static __inline__ void ltt_check_deliver(struct ust_channel *chan, * value without adding a add_return atomic operation to the * fast path. */ - if (likely(local_cmpxchg(&buf->commit_count[idx].cc_sb, + if (likely(uatomic_cmpxchg(&buf->commit_count[idx].cc_sb, old_commit_count, commit_count) == old_commit_count)) { int result; @@ -255,16 +273,16 @@ static __inline__ int ltt_poll_deliver(struct ust_channel *chan, struct ust_buff { long consumed_old, consumed_idx, commit_count, write_offset; - consumed_old = atomic_long_read(&buf->consumed); + consumed_old = uatomic_read(&buf->consumed); consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan); - commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb); + commit_count = uatomic_read(&buf->commit_count[consumed_idx].cc_sb); /* - * No memory barrier here, since we are only interested + * No memory cmm_barrier here, since we are only interested * in a statistically correct polling result. The next poll will * get the data is we are racing. The mb() that ensures correct * memory order is in get_subbuf. */ - write_offset = local_read(&buf->offset); + write_offset = uatomic_read(&buf->offset); /* * Check that the subbuffer we are trying to consume has been @@ -302,7 +320,7 @@ static __inline__ int ltt_relay_try_reserve( long *o_begin, long *o_end, long *o_old, size_t *before_hdr_pad, size_t *size) { - *o_begin = local_read(&buf->offset); + *o_begin = uatomic_read(&buf->offset); *o_old = *o_begin; *tsc = trace_clock_read64(); @@ -343,21 +361,24 @@ static __inline__ int ltt_relay_try_reserve( return 0; } -static __inline__ int ltt_reserve_slot(struct ust_trace *trace, - struct ust_channel *chan, void **transport_data, - size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc, - unsigned int *rflags, int largest_align, int cpu) +static __inline__ int ltt_reserve_slot(struct ust_channel *chan, + struct ust_trace *trace, size_t data_size, + int largest_align, int cpu, + struct ust_buffer **ret_buf, + size_t *slot_size, long *buf_offset, u64 *tsc, + unsigned int *rflags) { - struct ust_buffer *buf = chan->buf[cpu]; + struct ust_buffer *buf = *ret_buf = chan->buf[cpu]; long o_begin, o_end, o_old; size_t before_hdr_pad; /* * Perform retryable operations. */ - /* FIXME: make this rellay per cpu? */ - if (unlikely(__get_cpu_var(ltt_nesting) > 4)) { - local_inc(&buf->events_lost); + /* FIXME: make this really per cpu? */ + if (unlikely(CMM_LOAD_SHARED(ltt_nesting) > 4)) { + DBG("Dropping event because nesting is too deep."); + uatomic_inc(&buf->events_lost); return -EPERM; } @@ -367,7 +388,7 @@ static __inline__ int ltt_reserve_slot(struct ust_trace *trace, &before_hdr_pad, slot_size))) goto slow_path; - if (unlikely(local_cmpxchg(&buf->offset, o_old, o_end) != o_old)) + if (unlikely(uatomic_cmpxchg(&buf->offset, o_old, o_end) != o_old)) goto slow_path; /* @@ -391,19 +412,16 @@ static __inline__ int ltt_reserve_slot(struct ust_trace *trace, *buf_offset = o_begin + before_hdr_pad; return 0; slow_path: - return ltt_reserve_slot_lockless_slow(trace, chan, - transport_data, data_size, slot_size, buf_offset, tsc, - rflags, largest_align, cpu); + return ltt_reserve_slot_lockless_slow(chan, trace, data_size, + largest_align, cpu, ret_buf, + slot_size, buf_offset, tsc, + rflags); } /* * Force a sub-buffer switch for a per-cpu buffer. This operation is * completely reentrant : can be called while tracing is active with * absolutely no lock held. - * - * Note, however, that as a local_cmpxchg is used for some atomic - * operations, this function must be called from the CPU which owns the buffer - * for a ACTIVE flush. */ static __inline__ void ltt_force_switch(struct ust_buffer *buf, enum force_switch_mode mode) @@ -413,14 +431,14 @@ static __inline__ void ltt_force_switch(struct ust_buffer *buf, /* * for flight recording. must be called after relay_commit. - * This function decrements de subbuffer's lost_size each time the commit count - * reaches back the reserve offset (module subbuffer size). It is useful for - * crash dump. + * This function increments the subbuffers's commit_seq counter each time the + * commit count reaches back the reserve offset (module subbuffer size). It is + * useful for crash dump. */ -#ifdef CONFIG_LTT_VMCORE -static __inline__ void ltt_write_commit_counter(struct rchan_buf *buf, - struct ltt_channel_buf_struct *ltt_buf, - long idx, long buf_offset, long commit_count, size_t data_size) +//ust// #ifdef CONFIG_LTT_VMCORE +static __inline__ void ltt_write_commit_counter(struct ust_channel *chan, + struct ust_buffer *buf, long idx, long buf_offset, + long commit_count, size_t data_size) { long offset; long commit_seq_old; @@ -436,17 +454,19 @@ static __inline__ void ltt_write_commit_counter(struct rchan_buf *buf, if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan))) return; - commit_seq_old = local_read(<t_buf->commit_seq[idx]); + commit_seq_old = uatomic_read(&buf->commit_seq[idx]); while (commit_seq_old < commit_count) - commit_seq_old = local_cmpxchg(<t_buf->commit_seq[idx], + commit_seq_old = uatomic_cmpxchg(&buf->commit_seq[idx], commit_seq_old, commit_count); + + DBG("commit_seq for channel %s_%d, subbuf %ld is now %ld", buf->chan->channel_name, buf->cpu, idx, commit_count); } -#else -static __inline__ void ltt_write_commit_counter(struct ust_buffer *buf, - long idx, long buf_offset, long commit_count, size_t data_size) -{ -} -#endif +//ust// #else +//ust// static __inline__ void ltt_write_commit_counter(struct ust_buffer *buf, +//ust// long idx, long buf_offset, long commit_count, size_t data_size) +//ust// { +//ust// } +//ust// #endif /* * Atomic unordered slot commit. Increments the commit count in the @@ -469,17 +489,9 @@ static __inline__ void ltt_commit_slot( long endidx = SUBBUF_INDEX(offset_end - 1, chan); long commit_count; -#ifdef LTT_NO_IPI_BARRIER - smp_wmb(); -#else - /* - * Must write slot data before incrementing commit count. - * This compiler barrier is upgraded into a smp_mb() by the IPI - * sent by get_subbuf(). - */ - barrier(); -#endif - local_add(slot_size, &buf->commit_count[endidx].cc); + cmm_smp_wmb(); + + uatomic_add(&buf->commit_count[endidx].cc, slot_size); /* * commit count read can race with concurrent OOO commit count updates. * This is only needed for ltt_check_deliver (for non-polling delivery @@ -489,7 +501,7 @@ static __inline__ void ltt_commit_slot( * - Multiple delivery for the same sub-buffer (which is handled * gracefully by the reader code) if the value is for a full * sub-buffer. It's important that we can never miss a sub-buffer - * delivery. Re-reading the value after the local_add ensures this. + * delivery. Re-reading the value after the uatomic_add ensures this. * - Reading a commit_count with a higher value that what was actually * added to it for the ltt_write_commit_counter call (again caused by * a concurrent committer). It does not matter, because this function @@ -497,36 +509,108 @@ static __inline__ void ltt_commit_slot( * reserve offset for a specific sub-buffer, which is completely * independent of the order. */ - commit_count = local_read(&buf->commit_count[endidx].cc); + commit_count = uatomic_read(&buf->commit_count[endidx].cc); ltt_check_deliver(chan, buf, offset_end - 1, commit_count, endidx); /* - * Update lost_size for each commit. It's needed only for extracting + * Update data_size for each commit. It's needed only for extracting * ltt buffers from vmcore, after crash. */ - ltt_write_commit_counter(buf, endidx, buf_offset, commit_count, data_size); + ltt_write_commit_counter(chan, buf, endidx, buf_offset, commit_count, data_size); } -void _ust_buffers_write(struct ust_buffer *buf, size_t offset, - const void *src, size_t len, ssize_t cpy); +void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset, + size_t len, size_t copied, int terminated); static __inline__ int ust_buffers_write(struct ust_buffer *buf, size_t offset, const void *src, size_t len) { - size_t cpy; size_t buf_offset = BUFFER_OFFSET(offset, buf->chan); assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt); + assert(buf_offset + len + <= buf->chan->subbuf_size*buf->chan->subbuf_cnt); + + ust_buffers_do_copy(buf->buf_data + buf_offset, src, len); + + return len; +} + +/* + * ust_buffers_do_memset - write character into dest. + * @dest: destination + * @src: source character + * @len: length to write + */ +static __inline__ +void ust_buffers_do_memset(void *dest, char src, size_t len) +{ + /* + * What we really want here is an __inline__ memset, but we + * don't have constants, so gcc generally uses a function call. + */ + for (; len > 0; len--) + *(u8 *)dest++ = src; +} + +/* + * ust_buffers_do_strncpy - copy a string up to a certain number of bytes + * @dest: destination + * @src: source + * @len: max. length to copy + * @terminated: output string ends with \0 (output) + * + * returns the number of bytes copied. Does not finalize with \0 if len is + * reached. + */ +static __inline__ +size_t ust_buffers_do_strncpy(void *dest, const void *src, size_t len, + int *terminated) +{ + size_t orig_len = len; + + *terminated = 0; + /* + * What we really want here is an __inline__ strncpy, but we + * don't have constants, so gcc generally uses a function call. + */ + for (; len > 0; len--) { + *(u8 *)dest = CMM_LOAD_SHARED(*(const u8 *)src); + /* Check with dest, because src may be modified concurrently */ + if (*(const u8 *)dest == '\0') { + len--; + *terminated = 1; + break; + } + dest++; + src++; + } + return orig_len - len; +} - cpy = min_t(size_t, len, buf->buf_size - buf_offset); - ust_buffers_do_copy(buf->buf_data + buf_offset, src, cpy); +static __inline__ +int ust_buffers_strncpy(struct ust_buffer *buf, size_t offset, const void *src, + size_t len) +{ + size_t buf_offset = BUFFER_OFFSET(offset, buf->chan); + ssize_t copied; + int terminated; - if (unlikely(len != cpy)) - _ust_buffers_write(buf, buf_offset, src, len, cpy); + assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt); + assert(buf_offset + len + <= buf->chan->subbuf_size*buf->chan->subbuf_cnt); + + copied = ust_buffers_do_strncpy(buf->buf_data + buf_offset, + src, len, &terminated); + if (unlikely(copied < len || !terminated)) + _ust_buffers_strncpy_fixup(buf, offset, len, copied, + terminated); return len; } -int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed); -int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old); +extern int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed); +extern int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old); + +extern void init_ustrelay_transport(void); #endif /* _UST_BUFFERS_H */