#include <fcntl.h>
#include <signal.h>
#include <time.h>
+#include <stdbool.h>
#include <urcu/compiler.h>
#include <urcu/ref.h>
#include <urcu/tls-compat.h>
.lock = PTHREAD_MUTEX_INITIALIZER,
};
-int lttng_ust_blocking_retry_timeout =
- CONFIG_LTTNG_UST_DEFAULT_BLOCKING_RETRY_TIMEOUT_MS;
+static bool lttng_ust_allow_blocking;
-void lttng_ust_ringbuffer_set_retry_timeout(int timeout)
+void lttng_ust_ringbuffer_set_allow_blocking(void)
{
- lttng_ust_blocking_retry_timeout = timeout;
+ lttng_ust_allow_blocking = true;
+}
+
+/* Get blocking timeout, in ms */
+static int lttng_ust_ringbuffer_get_timeout(struct channel *chan)
+{
+ if (!lttng_ust_allow_blocking)
+ return 0;
+ return chan->u.s.blocking_timeout_ms;
}
/**
void *buf_addr, size_t subbuf_size,
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval,
- const int *stream_fds, int nr_stream_fds)
+ const int *stream_fds, int nr_stream_fds,
+ int64_t blocking_timeout)
{
int ret;
size_t shmsize, chansize;
struct lttng_ust_shm_handle *handle;
struct shm_object *shmobj;
unsigned int nr_streams;
+ int64_t blocking_timeout_ms;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
nr_streams = num_possible_cpus();
if (nr_stream_fds != nr_streams)
return NULL;
+ if (blocking_timeout < -1) {
+ return NULL;
+ }
+ /* usec to msec */
+ if (blocking_timeout == -1) {
+ blocking_timeout_ms = -1;
+ } else {
+ blocking_timeout_ms = blocking_timeout / 1000;
+ if (blocking_timeout_ms != (int32_t) blocking_timeout_ms) {
+ return NULL;
+ }
+ }
+
if (lib_ring_buffer_check_config(config, switch_timer_interval,
read_timer_interval))
return NULL;
/* Allocate normal memory for channel (not shared) */
shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM,
- -1);
+ -1, -1);
if (!shmobj)
goto error_append;
/* struct channel is at object 0, offset 0 (hardcoded) */
*priv_data = NULL;
}
+ chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms;
+
ret = channel_backend_init(&chan->backend, name, config,
subbuf_size, num_subbuf, handle,
stream_fds);
return -EAGAIN;
}
+/**
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same subbuffer.
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(
+ struct lttng_ust_lib_ring_buffer *buf,
+ unsigned long *consumed, unsigned long *produced,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct channel *chan;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+
+ chan = shmp(handle, buf->backend.chan);
+ if (!chan)
+ return -EPERM;
+ config = &chan->backend.config;
+ cmm_smp_rmb();
+ *consumed = uatomic_read(&buf->consumed);
+ /*
+ * No need to issue a memory barrier between consumed count read and
+ * write offset read, because consumed count can only change
+ * concurrently in overwrite mode, and we keep a sequence counter
+ * identifier derived from the write offset to check we are getting
+ * the same sub-buffer we are expecting (the sub-buffers are atomically
+ * "tagged" upon writes, tags are checked upon read).
+ */
+ *produced = v_read(config, &buf->offset);
+ return 0;
+}
+
/**
* lib_ring_buffer_move_consumer - move consumed counter forward
* @buf: ring buffer
int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- struct lttng_ust_lib_ring_buffer_ctx *ctx)
+ struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle = ctx->handle;
unsigned long reserve_commit_diff, offset_cmp;
- int timeout_left_ms = lttng_ust_blocking_retry_timeout;
+ int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan);
retry:
offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->size = config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
* -EIO for other errors, else returns 0.
* It will take care of sub-buffer switching.
*/
-int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
struct channel *chan = ctx->chan;
struct lttng_ust_shm_handle *handle = ctx->handle;
do {
ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
- ctx);
+ ctx, client_ctx);
if (caa_unlikely(ret))
return ret;
} while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,