#include <fcntl.h>
#include <signal.h>
#include <time.h>
+#include <stdbool.h>
#include <urcu/compiler.h>
#include <urcu/ref.h>
#include <urcu/tls-compat.h>
#include "backend.h"
#include "frontend.h"
#include "shm.h"
-#include "tlsfixup.h"
+#include "rb-init.h"
#include "../liblttng-ust/compat.h" /* For ENODATA */
/* Print DBG() messages about events lost only every 1048576 hits */
#define CLOCKID CLOCK_MONOTONIC
#define LTTNG_UST_RING_BUFFER_GET_RETRY 10
#define LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS 10
+#define RETRY_DELAY_MS 100 /* 100 ms. */
/*
* Non-static to ensure the compiler does not optimize away the xor.
.lock = PTHREAD_MUTEX_INITIALIZER,
};
+int lttng_ust_blocking_retry_timeout =
+ CONFIG_LTTNG_UST_DEFAULT_BLOCKING_RETRY_TIMEOUT_MS;
+
+void lttng_ust_ringbuffer_set_retry_timeout(int timeout)
+{
+ lttng_ust_blocking_retry_timeout = timeout;
+}
+
/**
* lib_ring_buffer_reset - Reset ring buffer to initial values.
* @buf: Ring buffer.
return -EAGAIN;
}
+/**
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same subbuffer.
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(
+ struct lttng_ust_lib_ring_buffer *buf,
+ unsigned long *consumed, unsigned long *produced,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct channel *chan;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+
+ chan = shmp(handle, buf->backend.chan);
+ if (!chan)
+ return -EPERM;
+ config = &chan->backend.config;
+ cmm_smp_rmb();
+ *consumed = uatomic_read(&buf->consumed);
+ /*
+ * No need to issue a memory barrier between consumed count read and
+ * write offset read, because consumed count can only change
+ * concurrently in overwrite mode, and we keep a sequence counter
+ * identifier derived from the write offset to check we are getting
+ * the same sub-buffer we are expecting (the sub-buffers are atomically
+ * "tagged" upon writes, tags are checked upon read).
+ */
+ *produced = v_read(config, &buf->offset);
+ return 0;
+}
+
/**
* lib_ring_buffer_move_consumer - move consumed counter forward
* @buf: ring buffer
lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
}
+static
+bool handle_blocking_retry(int *timeout_left_ms)
+{
+ int timeout = *timeout_left_ms, delay;
+
+ if (caa_likely(!timeout))
+ return false; /* Do not retry, discard event. */
+ if (timeout < 0) /* Wait forever. */
+ delay = RETRY_DELAY_MS;
+ else
+ delay = min_t(int, timeout, RETRY_DELAY_MS);
+ (void) poll(NULL, 0, delay);
+ if (timeout > 0)
+ *timeout_left_ms -= delay;
+ return true; /* Retry. */
+}
+
/*
* Returns :
* 0 if ok
int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- struct lttng_ust_lib_ring_buffer_ctx *ctx)
+ struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle = ctx->handle;
unsigned long reserve_commit_diff, offset_cmp;
+ int timeout_left_ms = lttng_ust_blocking_retry_timeout;
retry:
offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->size = config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
>= chan->backend.buf_size)) {
unsigned long nr_lost;
+ if (handle_blocking_retry(&timeout_left_ms))
+ goto retry;
+
/*
* We do not overwrite non consumed buffers
* and we are full : record is lost.
config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
* -EIO for other errors, else returns 0.
* It will take care of sub-buffer switching.
*/
-int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
struct channel *chan = ctx->chan;
struct lttng_ust_shm_handle *handle = ctx->handle;
do {
ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
- ctx);
+ ctx, client_ctx);
if (caa_unlikely(ret))
return ret;
} while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,