struct switch_offsets {
unsigned long begin, end, old;
size_t pre_header_padding, size;
- unsigned int switch_new_start:1, switch_old_start:1, switch_old_end:1;
+ unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
+ switch_old_end:1;
};
#ifdef CONFIG_NO_HZ
config->cb.subbuffer_header_size());
}
+/*
+ * lib_ring_buffer_switch_new_end: finish switching current subbuffer
+ *
+ * Calls subbuffer_set_data_size() to set the data size of the current
+ * sub-buffer. We do not need to perform check_deliver nor commit here,
+ * since this task will be done by the "commit" of the event for which
+ * we are currently doing the space reservation.
+ */
+static
+void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
+ struct channel *chan,
+ struct switch_offsets *offsets,
+ u64 tsc)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ unsigned long endidx, data_size;
+
+ endidx = subbuf_index(offsets->end - 1, chan);
+ data_size = subbuf_offset(offsets->end - 1, chan) + 1;
+ subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+}
+
/*
* Returns :
* 0 if ok
struct lib_ring_buffer_ctx *ctx)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned long reserve_commit_diff;
+ unsigned long reserve_commit_diff, offset_cmp;
- offsets->begin = v_read(config, &buf->offset);
+retry:
+ offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->old = offsets->begin;
offsets->switch_new_start = 0;
+ offsets->switch_new_end = 0;
offsets->switch_old_end = 0;
offsets->pre_header_padding = 0;
}
}
if (unlikely(offsets->switch_new_start)) {
- unsigned long sb_index;
+ unsigned long sb_index, commit_count;
/*
* We are typically not filling the previous buffer completely.
+ config->cb.subbuffer_header_size();
/* Test new buffer integrity */
sb_index = subbuf_index(offsets->begin, chan);
+ /*
+ * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+ * lib_ring_buffer_check_deliver() has the matching
+ * memory barriers required around commit_cold cc_sb
+ * updates to ensure reserve and commit counter updates
+ * are not seen reordered when updated by another CPU.
+ */
+ smp_rmb();
+ commit_count = v_read(config,
+ &buf->commit_cold[sb_index].cc_sb);
+ /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+ smp_rmb();
+ if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+ /*
+ * The reserve counter have been concurrently updated
+ * while we read the commit counter. This means the
+ * commit counter we read might not match buf->offset
+ * due to concurrent update. We therefore need to retry.
+ */
+ goto retry;
+ }
reserve_commit_diff =
(buf_trunc(offsets->begin, chan)
>> chan->backend.num_subbuf_order)
- - ((unsigned long) v_read(config,
- &buf->commit_cold[sb_index].cc_sb)
- & chan->commit_count_mask);
+ - (commit_count & chan->commit_count_mask);
if (likely(reserve_commit_diff == 0)) {
/* Next subbuffer not being written to. */
if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
} else {
/*
* Next subbuffer reserve offset does not match the
- * commit offset. Drop record in producer-consumer and
- * overwrite mode. Caused by either a writer OOPS or too
- * many nested writes over a reserve/commit pair.
+ * commit offset, and this did not involve update to the
+ * reserve counter. Drop record in producer-consumer and
+ * overwrite mode. Caused by either a writer OOPS or
+ * too many nested writes over a reserve/commit pair.
*/
v_inc(config, &buf->records_lost_wrap);
return -EIO;
*/
}
offsets->end = offsets->begin + offsets->size;
+
+ if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
+ /*
+ * The offset_end will fall at the very beginning of the next
+ * subbuffer.
+ */
+ offsets->switch_new_end = 1; /* For offsets->begin */
+ }
return 0;
}
if (unlikely(offsets.switch_new_start))
lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
+ if (unlikely(offsets.switch_new_end))
+ lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
+
ctx->slot_size = offsets.size;
ctx->pre_offset = offsets.begin;
ctx->buf_offset = offsets.begin + offsets.pre_header_padding;