X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=e99ba8a5942a77084b740f75ef80fc0781b89ed3;hb=74d48abe30fc241acf091e4a1215bc215887f2d2;hp=4f8cc37189397c499886302b56ce4107ac8640bc;hpb=e185cf4b895d5d9a040d74be1dfa4d004eff04b7;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 4f8cc371..e99ba8a5 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -1107,7 +1107,7 @@ retry: */ if (((commit_count - chan->backend.subbuf_size) & chan->commit_count_mask) - - (buf_trunc(consumed_cur, chan) + - (buf_trunc(consumed, chan) >> chan->backend.num_subbuf_order) != 0) goto nodata; @@ -1116,7 +1116,7 @@ retry: * Check that we are not about to read the same subbuffer in * which the writer head is. */ - if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan) + if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan) == 0) goto nodata; @@ -1449,10 +1449,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - uint64_t *tsc) + uint64_t *tsc, + struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - unsigned long off; + unsigned long off, reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); offsets->old = offsets->begin; @@ -1477,36 +1478,69 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * timestamps) are visible to the reader. This is required for * quiescence guarantees for the fusion merge. */ - if (mode == SWITCH_FLUSH || off > 0) { - if (caa_unlikely(off == 0)) { - /* - * A final flush that encounters an empty - * sub-buffer cannot switch buffer if a - * reader is located within this sub-buffer. - * Anyway, the purpose of final flushing of a - * sub-buffer at offset 0 is to handle the case - * of entirely empty stream. - */ - if (caa_unlikely(subbuf_trunc(offsets->begin, chan) - - subbuf_trunc((unsigned long) - uatomic_read(&buf->consumed), chan) - >= chan->backend.buf_size)) - return -1; - /* - * The client does not save any header information. - * Don't switch empty subbuffer on finalize, because it - * is invalid to deliver a completely empty subbuffer. - */ - if (!config->cb.subbuffer_header_size()) + if (mode != SWITCH_FLUSH && !off) + return -1; /* we do not have to switch : buffer is empty */ + + if (caa_unlikely(off == 0)) { + unsigned long sb_index, commit_count; + + /* + * We are performing a SWITCH_FLUSH. At this stage, there are no + * concurrent writes into the buffer. + * + * The client does not save any header information. Don't + * switch empty subbuffer on finalize, because it is invalid to + * deliver a completely empty subbuffer. + */ + if (!config->cb.subbuffer_header_size()) + return -1; + + /* Test new buffer integrity */ + sb_index = subbuf_index(offsets->begin, chan); + commit_count = v_read(config, + &shmp_index(handle, buf->commit_cold, + sb_index)->cc_sb); + reserve_commit_diff = + (buf_trunc(offsets->begin, chan) + >> chan->backend.num_subbuf_order) + - (commit_count & chan->commit_count_mask); + if (caa_likely(reserve_commit_diff == 0)) { + /* Next subbuffer not being written to. */ + if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && + subbuf_trunc(offsets->begin, chan) + - subbuf_trunc((unsigned long) + uatomic_read(&buf->consumed), chan) + >= chan->backend.buf_size)) { + /* + * We do not overwrite non consumed buffers + * and we are full : don't switch. + */ return -1; + } else { + /* + * Next subbuffer not being written to, and we + * are either in overwrite mode or the buffer is + * not full. It's safe to write in this new + * subbuffer. + */ + } + } else { /* - * Need to write the subbuffer start header on finalize. + * Next subbuffer reserve offset does not match the + * commit offset. Don't perform switch in + * producer-consumer and overwrite mode. Caused by + * either a writer OOPS or too many nested writes over a + * reserve/commit pair. */ - offsets->switch_old_start = 1; + return -1; } - offsets->begin = subbuf_align(offsets->begin, chan); - } else - return -1; /* we do not have to switch : buffer is empty */ + + /* + * Need to write the subbuffer start header on finalize. + */ + offsets->switch_old_start = 1; + } + offsets->begin = subbuf_align(offsets->begin, chan); /* Note: old points to the next subbuf at offset 0 */ offsets->end = offsets->begin; return 0; @@ -1536,7 +1570,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi */ do { if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets, - &tsc)) + &tsc, handle)) return; /* Switch not needed */ } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old);