X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=libustconsumer%2Flowlevel.c;h=a54a8db0c85e8fec893ca0bae4a2c195b7185cc6;hb=30ffe2794fc413035208cdd2a7a061bc208e210f;hp=730dd110f9d0eb0760eedc76c4cec84dc50edaf6;hpb=fbae86d664c12e450d3cb702b602701d37781b41;p=ust.git diff --git a/libustconsumer/lowlevel.c b/libustconsumer/lowlevel.c index 730dd11..a54a8db 100644 --- a/libustconsumer/lowlevel.c +++ b/libustconsumer/lowlevel.c @@ -22,7 +22,7 @@ #include "ust/ustconsumer.h" #include "buffers.h" #include "tracer.h" -#include "usterr.h" +#include "usterr_signal_safe.h" /* This truncates to an offset in the buffer. */ #define USTD_BUFFER_TRUNC(offset, bufinfo) \ @@ -31,6 +31,66 @@ #define LTT_MAGIC_NUMBER 0x00D6B7ED #define LTT_REV_MAGIC_NUMBER 0xEDB7D600 + +static void ltt_relay_print_subbuffer_errors( + struct buffer_info *buf, + long cons_off, int cpu) +{ + struct ust_buffer *ust_buf = buf->bufstruct_mem; + long cons_idx, commit_count, commit_count_mask, write_offset; + + cons_idx = SUBBUF_INDEX(cons_off, buf); + commit_count = uatomic_read(&ust_buf->commit_seq[cons_idx]); + commit_count_mask = (~0UL >> get_count_order(buf->n_subbufs)); + + /* + * No need to order commit_count and write_offset reads because we + * execute after trace is stopped when there are no readers left. + */ + write_offset = uatomic_read(&ust_buf->offset); + WARN( "LTT : unread channel %s offset is %ld " + "and cons_off : %ld (cpu %d)\n", + buf->channel, write_offset, cons_off, cpu); + /* Check each sub-buffer for non filled commit count */ + if (((commit_count - buf->subbuf_size) & commit_count_mask) + - (BUFFER_TRUNC(cons_off, buf) >> get_count_order(buf->n_subbufs)) != 0) { + ERR("LTT : %s : subbuffer %lu has non filled " + "commit count [seq] [%lu].\n", + buf->channel, cons_idx, commit_count); + } + ERR("LTT : %s : commit count : %lu, subbuf size %d\n", + buf->channel, commit_count, + buf->subbuf_size); +} + +static void ltt_relay_print_errors(struct buffer_info *buf, int cpu) +{ + struct ust_buffer *ust_buf = buf->bufstruct_mem; + long cons_off; + + for (cons_off = uatomic_read(&ust_buf->consumed); + (SUBBUF_TRUNC(uatomic_read(&ust_buf->offset), buf) + - cons_off) > 0; + cons_off = SUBBUF_ALIGN(cons_off, buf)) + ltt_relay_print_subbuffer_errors(buf, cons_off, cpu); +} + +static void ltt_relay_print_buffer_errors(struct buffer_info *buf, int cpu) +{ + struct ust_buffer *ust_buf = buf->bufstruct_mem; + + if (uatomic_read(&ust_buf->events_lost)) + ERR("channel %s: %ld events lost (cpu %d)", + buf->channel, + uatomic_read(&ust_buf->events_lost), cpu); + if (uatomic_read(&ust_buf->corrupted_subbuffers)) + ERR("channel %s : %ld corrupted subbuffers (cpu %d)", + buf->channel, + uatomic_read(&ust_buf->corrupted_subbuffers), cpu); + + ltt_relay_print_errors(buf, cpu); +} + /* Returns the size of a subbuffer size. This is the size that * will need to be written to disk. * @@ -64,81 +124,84 @@ size_t subbuffer_data_size(void *subbuf) void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf) { - struct ust_buffer *ustbuf = buf->bufstruct_mem; - - long write_offset = uatomic_read(&ustbuf->offset); - long consumed_offset = uatomic_read(&ustbuf->consumed); - - long i_subbuf; + struct ust_buffer *ust_buf = buf->bufstruct_mem; + unsigned long n_subbufs_order = get_count_order(buf->n_subbufs); + unsigned long commit_seq_mask = (~0UL >> n_subbufs_order); + unsigned long cons_off; + int ret; DBG("processing dead buffer (%s)", buf->name); - DBG("consumed offset is %ld (%s)", consumed_offset, buf->name); - DBG("write offset is %ld (%s)", write_offset, buf->name); - - /* First subbuf that we need to consume now. It is not modulo'd. - * Consumed_offset is the next byte to consume. */ - long first_subbuf = consumed_offset / buf->subbuf_size; - /* Last subbuf that we need to consume now. It is not modulo'd. - * Write_offset is the next place to write so write_offset-1 is the - * last place written. */ - long last_subbuf = (write_offset - 1) / buf->subbuf_size; - - DBG("first_subbuf=%ld", first_subbuf); - DBG("last_subbuf=%ld", last_subbuf); - - if(last_subbuf - first_subbuf >= buf->n_subbufs) { - DBG("an overflow has occurred, nothing can be recovered"); - return; - } - - /* Iterate on subbuffers to recover. */ - for(i_subbuf = first_subbuf % buf->n_subbufs; ; i_subbuf++, i_subbuf %= buf->n_subbufs) { - /* commit_seq is the offset in the buffer of the end of the last sequential commit. - * Bytes beyond this limit cannot be recovered. This is a free-running counter. */ - long commit_seq = uatomic_read(&ustbuf->commit_seq[i_subbuf]); - - unsigned long valid_length = buf->subbuf_size; - long n_subbufs_order = get_count_order(buf->n_subbufs); - long commit_seq_mask = (~0UL >> n_subbufs_order); - - struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size); - - if((commit_seq & commit_seq_mask) == 0) { - /* There is nothing to do. */ - /* FIXME: is this needed? */ - break; - } + DBG("consumed offset is %ld (%s)", uatomic_read(&ust_buf->consumed), + buf->name); + DBG("write offset is %ld (%s)", uatomic_read(&ust_buf->offset), + buf->name); + + /* + * Iterate on subbuffers to recover, including the one the writer + * just wrote data into. Using write position - 1 since the writer + * position points into the position that is going to be written. + */ + for (cons_off = uatomic_read(&ust_buf->consumed); + (long) (SUBBUF_TRUNC(uatomic_read(&ust_buf->offset) - 1, buf) + - cons_off) >= 0; + cons_off = SUBBUF_ALIGN(cons_off, buf)) { + /* + * commit_seq is the offset in the buffer of the end of the last sequential commit. + * Bytes beyond this limit cannot be recovered. This is a free-running counter. + */ + unsigned long commit_seq = + uatomic_read(&ust_buf->commit_seq[SUBBUF_INDEX(cons_off, buf)]); + struct ltt_subbuffer_header *header = + (struct ltt_subbuffer_header *)((char *) buf->mem + + SUBBUF_INDEX(cons_off, buf) * buf->subbuf_size); + unsigned long valid_length; /* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */ - /* FIXME: not sure data_size = 0xffffffff when the buffer is not full. It might - * take the value of the header size initially */ if (((commit_seq - buf->subbuf_size) & commit_seq_mask) - - (USTD_BUFFER_TRUNC(consumed_offset, buf) >> n_subbufs_order) == 0 - && header->data_size != 0xffffffff && header->sb_size != 0xffffffff) { - /* If it was, we only check the data_size. This is the amount of valid data at - * the beginning of the subbuffer. */ + - (USTD_BUFFER_TRUNC(uatomic_read(&ust_buf->consumed), buf) >> n_subbufs_order) == 0 + && header->data_size != 0xffffffff) { + assert(header->sb_size != 0xffffffff); + /* + * If it was fully written, we only check the data_size. + * This is the amount of valid data at the beginning of + * the subbuffer. + */ valid_length = header->data_size; - DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length); - } - else { - /* If the subbuffer was not fully written, then we don't check data_size because - * it hasn't been written yet. Instead we check commit_seq and use it to choose - * a value for data_size. The viewer will need this value when parsing. + DBG("writing full subbuffer (%ld) with valid_length = %ld", + SUBBUF_INDEX(cons_off, buf), valid_length); + } else { + /* + * If the subbuffer was not fully written, then we don't + * check data_size because it hasn't been written yet. + * Instead we check commit_seq and use it to choose a + * value for data_size. The viewer will need this value + * when parsing. Generally, this will happen only for + * the last subbuffer. However, if we have threads still + * holding reserved slots in the previous subbuffers, + * which could happen for other subbuffers prior to the + * last one. Note that when data_size is set, the + * commit_seq count is still at a value that shows the + * amount of valid data to read. It's only _after_ + * writing data_size that commit_seq is updated to + * include the end-of-buffer padding. */ - - valid_length = commit_seq & (buf->subbuf_size-1); - DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length); + valid_length = commit_seq & (buf->subbuf_size - 1); + DBG("writing unfull subbuffer (%ld) with valid_length = %ld", + SUBBUF_INDEX(cons_off, buf), valid_length); header->data_size = valid_length; header->sb_size = PAGE_ALIGN(valid_length); - assert(i_subbuf == (last_subbuf % buf->n_subbufs)); } - /* TODO: check on_read_partial_subbuffer return value */ - if(callbacks->on_read_partial_subbuffer) - callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length); - - if(i_subbuf == last_subbuf % buf->n_subbufs) - break; + if (callbacks->on_read_partial_subbuffer) { + ret = callbacks->on_read_partial_subbuffer(callbacks, buf, + SUBBUF_INDEX(cons_off, buf), + valid_length); + if (ret < 0) + break; /* Error happened */ + } } + /* Increment the consumed offset */ + uatomic_set(&ust_buf->consumed, cons_off); + ltt_relay_print_buffer_errors(buf, buf->channel_cpu); }