#include <urcu/compiler.h>
#include <urcu/tls-compat.h>
#include <signal.h>
+#include <stdint.h>
#include <pthread.h>
#include <lttng/ringbuffer-config.h>
#endif
extern
-int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx);
+int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx);
extern
void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf,
consumed_new) != consumed_old));
}
+/*
+ * Move consumed position to the beginning of subbuffer in which the
+ * write offset is. Should only be used on ring buffers that are not
+ * actively being written into, because clear_reader does not take into
+ * account the commit counters when moving the consumed position, which
+ * can make concurrent trace producers or consumers observe consumed
+ * position further than the write offset, which breaks ring buffer
+ * algorithm guarantees.
+ */
+static inline
+void lib_ring_buffer_clear_reader(struct lttng_ust_lib_ring_buffer *buf,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct channel *chan;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+ unsigned long offset, consumed_old, consumed_new;
+
+ /* Bail out silently if the shm-backed channel cannot be resolved. */
+ chan = shmp(handle, buf->backend.chan);
+ if (!chan)
+ return;
+ config = &chan->backend.config;
+
+ /*
+ * Retry loop: re-read offset/consumed and cmpxchg until no
+ * concurrent update of the consumed position races with us.
+ */
+ do {
+ offset = v_read(config, &buf->offset);
+ consumed_old = uatomic_read(&buf->consumed);
+ CHAN_WARN_ON(chan, (long) (subbuf_trunc(offset, chan)
+ - subbuf_trunc(consumed_old, chan))
+ < 0);
+ consumed_new = subbuf_trunc(offset, chan);
+ } while (caa_unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
+ consumed_new) != consumed_old));
+}
+
static inline
int lib_ring_buffer_pending_data(const struct lttng_ust_lib_ring_buffer_config *config,
		struct lttng_ust_lib_ring_buffer *buf,
		struct lttng_ust_shm_handle *handle)
{
	unsigned long offset, idx, commit_count;
+ struct commit_counters_hot *cc_hot;
	CHAN_WARN_ON(chan, config->alloc != RING_BUFFER_ALLOC_PER_CPU);
	CHAN_WARN_ON(chan, config->sync != RING_BUFFER_SYNC_PER_CPU);
	do {
		offset = v_read(config, &buf->offset);
		idx = subbuf_index(offset, chan);
- commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->cc);
+ /*
+ * shmp_index() can fail to map the hot commit counter; in
+ * that case return 0 (treat as no pending data) rather than
+ * dereferencing a NULL pointer.
+ */
+ cc_hot = shmp_index(handle, buf->commit_hot, idx);
+ if (caa_unlikely(!cc_hot))
+ return 0;
+ commit_count = v_read(config, &cc_hot->cc);
	} while (offset != v_read(config, &buf->offset));
	return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
void lib_ring_buffer_write_commit_counter(const struct lttng_ust_lib_ring_buffer_config *config,
		struct lttng_ust_lib_ring_buffer *buf,
		struct channel *chan,
- unsigned long idx,
		unsigned long buf_offset,
		unsigned long commit_count,
- struct lttng_ust_shm_handle *handle)
+ struct lttng_ust_shm_handle *handle,
+ struct commit_counters_hot *cc_hot)
{
	unsigned long commit_seq_old;
	if (caa_unlikely(subbuf_offset(buf_offset - commit_count, chan)))
		return;
- commit_seq_old = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->seq);
- while ((long) (commit_seq_old - commit_count) < 0)
- commit_seq_old = v_cmpxchg(config, &shmp_index(handle, buf->commit_hot, idx)->seq,
- commit_seq_old, commit_count);
+ /*
+ * NOTE(review): the cmpxchg retry loop is replaced by a plain v_set
+ * guarded by a monotonicity check (only advance seq forward). This
+ * assumes no concurrent updater can move seq backward between the
+ * v_read and the v_set here — TODO confirm the call-site
+ * serialization (per-sub-buffer commit ordering) that makes the
+ * non-atomic update safe.
+ */
+ commit_seq_old = v_read(config, &cc_hot->seq);
+ if (caa_likely((long) (commit_seq_old - commit_count) < 0))
+ v_set(config, &cc_hot->seq, commit_count);
}
extern int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,