X-Git-Url: http://git.lttng.org/?p=ust.git;a=blobdiff_plain;f=libust%2Fbuffers.c;h=8ecebb9355060bb752d7f66f9f0f4f0895ac747f;hp=e5a1db3962e542e44be1f43453579328a4dc3bad;hb=HEAD;hpb=ef15e552c145311c21bb89a1f5e9c6c040e1737f

diff --git a/libust/buffers.c b/libust/buffers.c
index e5a1db3..8ecebb9 100644
--- a/libust/buffers.c
+++ b/libust/buffers.c
@@ -3,7 +3,7 @@
  * LTTng userspace tracer buffering system
  *
  * Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
- * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ * Copyright (C) 2008-2011 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -20,6 +20,13 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+/*
+ * Note: this code does not support the ref/noref flag and reader-owned
+ * subbuffer scheme. Therefore, flight recorder mode uses a mechanism
+ * where the reader can read corrupted data (and detect this), thus
+ * returning -EIO.
+ */
+
 #include 
 #include 
 #include 
@@ -33,7 +40,7 @@
 #include "channels.h"
 #include "tracer.h"
 #include "tracercore.h"
-#include "usterr.h"
+#include "usterr_signal_safe.h"
 
 struct ltt_reserve_switch_offsets {
 	long begin, end, old;
@@ -45,6 +52,9 @@ struct ltt_reserve_switch_offsets {
 static DEFINE_MUTEX(ust_buffers_channels_mutex);
 static CDS_LIST_HEAD(ust_buffers_channels);
 
+static void ltt_force_switch(struct ust_buffer *buf,
+		enum force_switch_mode mode);
+
 static int get_n_cpus(void)
 {
 	int result;
@@ -146,7 +156,13 @@ static void ltt_buffer_begin(struct ust_buffer *buf,
 	header->cycle_count_begin = tsc;
 	header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
 	header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
-	/* FIXME: add memory barrier? */
+	/*
+	 * No memory barrier needed to order data_data/sb_size vs commit count
+	 * update, because commit count update contains a compiler barrier that
+	 * ensures the order of the writes are OK from a program POV. It only
+	 * matters for crash dump recovery which is not executed concurrently,
+	 * so memory write order does not matter.
+	 */
 	ltt_write_trace_header(channel->trace, header);
 }
 
@@ -254,12 +270,8 @@ unmap_buf:
 	return -1;
 }
 
-static void ltt_relay_print_buffer_errors(struct ust_channel *chan, int cpu);
-
 static void close_buf(struct ust_buffer *buf)
 {
-	struct ust_channel *chan = buf->chan;
-	int cpu = buf->cpu;
 	int result;
 
 	result = shmdt(buf->buf_data);
@@ -267,8 +279,6 @@ static void close_buf(struct ust_buffer *buf)
 		PERROR("shmdt");
 	}
 
-	free(buf->commit_count);
-
 	result = close(buf->data_ready_fd_read);
 	if (result < 0) {
 		PERROR("close");
@@ -278,9 +288,6 @@ static void close_buf(struct ust_buffer *buf)
 	if (result < 0 && errno != EBADF) {
 		PERROR("close");
 	}
-
-	/* FIXME: This spews out errors, are they real?:
-	 * ltt_relay_print_buffer_errors(chan, cpu); */
 }
 
 
@@ -319,13 +326,10 @@ static int open_channel(struct ust_channel *chan, size_t subbuf_size,
 
 	return 0;
 
-	/* Jump directly inside the loop to close the buffers that were already
-	 * opened. */
-	for(; i>=0; i--) {
-		close_buf(chan->buf[i]);
+	/* Error handling */
 error:
-		do {} while(0);
-	}
+	for(i--; i >= 0; i--)
+		close_buf(chan->buf[i]);
 
 	pthread_mutex_unlock(&ust_buffers_channels_mutex);
 	return -1;
@@ -338,10 +342,14 @@ static void close_channel(struct ust_channel *chan)
 		return;
 
 	pthread_mutex_lock(&ust_buffers_channels_mutex);
-	for(i=0; i<chan->n_cpus; i++) {
-		/* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
-		 * initialize to NULL so we cannot use this check. Should we? */
-//ust//		if (chan->buf[i])
+	/*
+	 * checking for chan->buf[i] being NULL or not is useless in
+	 * practice because we allocate buffers for all possible cpus.
+	 * However, should we decide to change this and only allocate
+	 * for online cpus, this check becomes useful.
+	 */
+	for (i=0; i<chan->n_cpus; i++) {
+		if (chan->buf[i])
 			close_buf(chan->buf[i]);
 	}
 
@@ -350,11 +358,6 @@ static void close_channel(struct ust_channel *chan)
 	pthread_mutex_unlock(&ust_buffers_channels_mutex);
 }
 
-static void ltt_force_switch(struct ust_buffer *buf,
-		enum force_switch_mode mode);
-
-
-
 /*
  * offset is assumed to never be 0 here : never deliver a completely empty
  * subbuffer. The lost size is between 0 and subbuf_size-1.
@@ -368,7 +371,6 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
 			subbuf_idx * buf->chan->subbuf_size);
 	u32 data_size = SUBBUF_OFFSET(offset - 1, buf->chan) + 1;
 
-	header->data_size = data_size;
 	header->sb_size = PAGE_ALIGN(data_size);
 	header->cycle_count_end = tsc;
 	header->events_lost = uatomic_read(&buf->events_lost);
@@ -376,6 +378,13 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
 	if(unlikely(header->events_lost > 0)) {
 		DBG("Some events (%d) were lost in %s_%d", header->events_lost, buf->chan->channel_name, buf->cpu);
 	}
+	/*
+	 * Makes sure data_size write happens after write of the rest of the
+	 * buffer end data, because data_size is used to identify a completely
+	 * written subbuffer in a crash dump.
+	 */
+	cmm_barrier();
+	header->data_size = data_size;
 }
 
 /*
@@ -412,51 +421,13 @@ int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed)
 	 * data and the write offset. Correct consumed offset ordering
 	 * wrt commit count is insured by the use of cmpxchg to update
 	 * the consumed offset.
-	 * smp_call_function_single can fail if the remote CPU is offline,
-	 * this is OK because then there is no wmb to execute there.
-	 * If our thread is executing on the same CPU as the on the buffers
-	 * belongs to, we don't have to synchronize it at all. If we are
-	 * migrated, the scheduler will take care of the memory cmm_barriers.
-	 * Normally, smp_call_function_single() should ensure program order when
-	 * executing the remote function, which implies that it surrounds the
-	 * function execution with :
-	 * smp_mb()
-	 * send IPI
-	 * csd_lock_wait
-	 * recv IPI
-	 * smp_mb()
-	 * exec. function
-	 * smp_mb()
-	 * csd unlock
-	 * smp_mb()
-	 *
-	 * However, smp_call_function_single() does not seem to clearly execute
-	 * such barriers. It depends on spinlock semantic to provide the barrier
-	 * before executing the IPI and, when busy-looping, csd_lock_wait only
-	 * executes smp_mb() when it has to wait for the other CPU.
-	 *
-	 * I don't trust this code. Therefore, let's add the smp_mb() sequence
-	 * required ourself, even if duplicated. It has no performance impact
-	 * anyway.
-	 *
-	 * smp_mb() is needed because cmm_smp_rmb() and cmm_smp_wmb() only order read vs
-	 * read and write vs write. They do not ensure core synchronization. We
-	 * really have to ensure total order between the 3 cmm_barriers running on
-	 * the 2 CPUs.
 	 */
-//ust// #ifdef LTT_NO_IPI_BARRIER
+
 	/*
 	 * Local rmb to match the remote wmb to read the commit count before the
 	 * buffer data and the write offset.
 	 */
 	cmm_smp_rmb();
-//ust// #else
-//ust//	if (raw_smp_processor_id() != buf->cpu) {
-//ust//		smp_mb();	/* Total order with IPI handler smp_mb() */
-//ust//		smp_call_function_single(buf->cpu, remote_mb, NULL, 1);
-//ust//		smp_mb();	/* Total order with IPI handler smp_mb() */
-//ust//	}
-//ust// #endif
 	write_offset = uatomic_read(&buf->offset);
 
 	/*
@@ -479,12 +450,6 @@ int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed)
 			== 0) {
 		return -EAGAIN;
 	}
-
-	/* FIXME: is this ok to disable the reading feature? */
-//ust//	retval = update_read_sb_index(buf, consumed_idx);
-//ust//	if (retval)
-//ust//		return retval;
-
 	*consumed = consumed_old;
 
 	return 0;
@@ -499,14 +464,12 @@ int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old)
 	consumed_old = consumed_old | uconsumed_old;
 	consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
-//ust//	spin_lock(&ltt_buf->full_lock);
 	if (uatomic_cmpxchg(&buf->consumed, consumed_old,
 			consumed_new) != consumed_old) {
 		/* We have been pushed by the writer : the last
 		 * buffer read _is_ corrupted! It can also
 		 * happen if this is a buffer we never got. */
-//ust//		spin_unlock(&ltt_buf->full_lock);
 		return -EIO;
 	} else {
 		/* tell the client that buffer is now unfull */
 		int index;
 		long data;
@@ -515,83 +478,10 @@ int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old)
 		index = SUBBUF_INDEX(consumed_old, buf->chan);
 		data = BUFFER_OFFSET(consumed_old, buf->chan);
 		ltt_buf_unfull(buf, index, data);
-//ust//		spin_unlock(&ltt_buf->full_lock);
 	}
 	return 0;
 }
 
-static void ltt_relay_print_subbuffer_errors(
-		struct ust_channel *channel,
-		long cons_off, int cpu)
-{
-	struct ust_buffer *ltt_buf = channel->buf[cpu];
-	long cons_idx, commit_count, commit_count_sb, write_offset;
-
-	cons_idx = SUBBUF_INDEX(cons_off, channel);
-	commit_count = uatomic_read(&ltt_buf->commit_count[cons_idx].cc);
-	commit_count_sb = uatomic_read(&ltt_buf->commit_count[cons_idx].cc_sb);
-
-	/*
-	 * No need to order commit_count and write_offset reads because we
-	 * execute after trace is stopped when there are no readers left.
-	 */
-	write_offset = uatomic_read(&ltt_buf->offset);
-	WARN( "LTT : unread channel %s offset is %ld "
-		"and cons_off : %ld (cpu %d)\n",
-		channel->channel_name, write_offset, cons_off, cpu);
-	/* Check each sub-buffer for non filled commit count */
-	if (((commit_count - channel->subbuf_size) & channel->commit_count_mask)
-	    - (BUFFER_TRUNC(cons_off, channel) >> channel->n_subbufs_order) != 0) {
-		ERR("LTT : %s : subbuffer %lu has non filled "
-			"commit count [cc, cc_sb] [%lu,%lu].\n",
-			channel->channel_name, cons_idx, commit_count, commit_count_sb);
-	}
-	ERR("LTT : %s : commit count : %lu, subbuf size %zd\n",
-			channel->channel_name, commit_count,
-			channel->subbuf_size);
-}
-
-static void ltt_relay_print_errors(struct ust_trace *trace,
-		struct ust_channel *channel, int cpu)
-{
-	struct ust_buffer *ltt_buf = channel->buf[cpu];
-	long cons_off;
-
-	/*
-	 * Can be called in the error path of allocation when
-	 * trans_channel_data is not yet set.
- */ - if (!channel) - return; - -//ust// for (cons_off = 0; cons_off < rchan->alloc_size; -//ust// cons_off = SUBBUF_ALIGN(cons_off, rchan)) -//ust// ust_buffers_print_written(ltt_chan, cons_off, cpu); - for (cons_off = uatomic_read(<t_buf->consumed); - (SUBBUF_TRUNC(uatomic_read(<t_buf->offset), - channel) - - cons_off) > 0; - cons_off = SUBBUF_ALIGN(cons_off, channel)) - ltt_relay_print_subbuffer_errors(channel, cons_off, cpu); -} - -static void ltt_relay_print_buffer_errors(struct ust_channel *channel, int cpu) -{ - struct ust_trace *trace = channel->trace; - struct ust_buffer *ltt_buf = channel->buf[cpu]; - - if (uatomic_read(<t_buf->events_lost)) - ERR("channel %s: %ld events lost (cpu %d)", - channel->channel_name, - uatomic_read(<t_buf->events_lost), cpu); - if (uatomic_read(<t_buf->corrupted_subbuffers)) - ERR("channel %s : %ld corrupted subbuffers (cpu %d)", - channel->channel_name, - uatomic_read(<t_buf->corrupted_subbuffers), cpu); - - ltt_relay_print_errors(trace, channel, cpu); -} - static int map_buf_structs(struct ust_channel *chan) { void *ptr; @@ -655,6 +545,7 @@ static int unmap_buf_structs(struct ust_channel *chan) PERROR("shmdt"); } } + return 0; } /* @@ -723,29 +614,14 @@ static void remove_channel(struct ust_channel *chan) free(chan->buf_struct_shmids); free(chan->buf); - } static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel) { -//ust// unsigned int i; -//ust// struct rchan *rchan = ltt_channel->trans_channel_data; -//ust// -//ust// for_each_possible_cpu(i) { -//ust// struct ltt_channel_buf_struct *ltt_buf = -//ust// percpu_ptr(ltt_channel->buf, i); -//ust// -//ust// if (uatomic_read(<t_buf->wakeup_readers) == 1) { -//ust// uatomic_set(<t_buf->wakeup_readers, 0); -//ust// wake_up_interruptible(&rchan->buf[i]->read_wait); -//ust// } -//ust// } } static void ltt_relay_finish_buffer(struct ust_channel *channel, unsigned int cpu) { -// int result; - if (channel->buf[cpu]) { struct ust_buffer *buf = channel->buf[cpu]; ltt_force_switch(buf, FORCE_FLUSH); @@ -760,7 +636,7 @@ static void finish_channel(struct ust_channel *channel) { unsigned int i; - for(i=0; in_cpus; i++) { + for (i=0; in_cpus; i++) { ltt_relay_finish_buffer(channel, i); } } @@ -986,14 +862,12 @@ void ltt_force_switch_lockless_slow(struct ust_buffer *buf, */ if (mode == FORCE_ACTIVE) { ltt_reserve_push_reader(chan, buf, offsets.end - 1); -//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.end - 1, chan)); } /* * Switch old subbuffer if needed. */ if (offsets.end_switch_old) { -//ust// ltt_clear_noref_flag(rchan, buf, SUBBUF_INDEX(offsets.old - 1, rchan)); ltt_reserve_switch_old_subbuf(chan, buf, &offsets, &tsc); } @@ -1171,16 +1045,10 @@ int ltt_reserve_slot_lockless_slow(struct ust_channel *chan, */ ltt_reserve_push_reader(chan, buf, offsets.end - 1); - /* - * Clear noref flag for this subbuffer. - */ -//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.end - 1, chan)); - /* * Switch old subbuffer if needed. */ if (unlikely(offsets.end_switch_old)) { -//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.old - 1, chan)); ltt_reserve_switch_old_subbuf(chan, buf, &offsets, tsc); DBG("Switching %s_%d", chan->channel_name, cpu); } @@ -1242,6 +1110,10 @@ size_t ltt_write_event_header_slow(struct ust_channel *channel, case LTT_RFLAG_ID: header.id_time = 31 << LTT_TSC_BITS; break; + default: + WARN_ON_ONCE(1); + header.id_time = 0; + break; } header.id_time |= (u32)tsc & LTT_TSC_MASK;