Calculate context length outside of retry loop
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index be20d69b9864d014c9271807254aef282c8b59c8..76d6ec746c40ade48905bcfbf0cd3c81274d812e 100644 (file)
@@ -60,6 +60,7 @@
 #include <fcntl.h>
 #include <signal.h>
 #include <time.h>
+#include <stdbool.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 #include <urcu/tls-compat.h>
@@ -72,7 +73,7 @@
 #include "backend.h"
 #include "frontend.h"
 #include "shm.h"
-#include "tlsfixup.h"
+#include "rb-init.h"
 #include "../liblttng-ust/compat.h"    /* For ENODATA */
 
 /* Print DBG() messages about events lost only every 1048576 hits */
@@ -84,6 +85,7 @@
 #define CLOCKID                CLOCK_MONOTONIC
 #define LTTNG_UST_RING_BUFFER_GET_RETRY                10
 #define LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS   10
+#define RETRY_DELAY_MS                         100     /* 100 ms. */
 
 /*
  * Non-static to ensure the compiler does not optimize away the xor.
@@ -149,6 +151,14 @@ static struct timer_signal_data timer_signal = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
 };
 
+int lttng_ust_blocking_retry_timeout =
+               CONFIG_LTTNG_UST_DEFAULT_BLOCKING_RETRY_TIMEOUT_MS;
+
+void lttng_ust_ringbuffer_set_retry_timeout(int timeout)
+{
+       lttng_ust_blocking_retry_timeout = timeout;
+}
+
 /**
  * lib_ring_buffer_reset - Reset ring buffer to initial values.
  * @buf: Ring buffer.
@@ -1295,6 +1305,43 @@ nodata:
                return -EAGAIN;
 }
 
+/**
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same subbuffer.
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(
+                            struct lttng_ust_lib_ring_buffer *buf,
+                            unsigned long *consumed, unsigned long *produced,
+                            struct lttng_ust_shm_handle *handle)
+{
+       struct channel *chan;
+       const struct lttng_ust_lib_ring_buffer_config *config;
+
+       chan = shmp(handle, buf->backend.chan);
+       if (!chan)
+               return -EPERM;
+       config = &chan->backend.config;
+       cmm_smp_rmb();
+       *consumed = uatomic_read(&buf->consumed);
+       /*
+        * No need to issue a memory barrier between consumed count read and
+        * write offset read, because consumed count can only change
+        * concurrently in overwrite mode, and we keep a sequence counter
+        * identifier derived from the write offset to check we are getting
+        * the same sub-buffer we are expecting (the sub-buffers are atomically
+        * "tagged" upon writes, tags are checked upon read).
+        */
+       *produced = v_read(config, &buf->offset);
+       return 0;
+}
+
 /**
  * lib_ring_buffer_move_consumer - move consumed counter forward
  * @buf: ring buffer
@@ -1985,6 +2032,23 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi
        lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
 }
 
+static
+bool handle_blocking_retry(int *timeout_left_ms)
+{
+       int timeout = *timeout_left_ms, delay;
+
+       if (caa_likely(!timeout))
+               return false;   /* Do not retry, discard event. */
+       if (timeout < 0)        /* Wait forever. */
+               delay = RETRY_DELAY_MS;
+       else
+               delay = min_t(int, timeout, RETRY_DELAY_MS);
+       (void) poll(NULL, 0, delay);
+       if (timeout > 0)
+               *timeout_left_ms -= delay;
+       return true;    /* Retry. */
+}
+
 /*
  * Returns :
  * 0 if ok
@@ -1996,11 +2060,13 @@ static
 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
                                     struct channel *chan,
                                     struct switch_offsets *offsets,
-                                    struct lttng_ust_lib_ring_buffer_ctx *ctx)
+                                    struct lttng_ust_lib_ring_buffer_ctx *ctx,
+                                    void *client_ctx)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        struct lttng_ust_shm_handle *handle = ctx->handle;
        unsigned long reserve_commit_diff, offset_cmp;
+       int timeout_left_ms = lttng_ust_blocking_retry_timeout;
 
 retry:
        offsets->begin = offset_cmp = v_read(config, &buf->offset);
@@ -2023,7 +2089,7 @@ retry:
                offsets->size = config->cb.record_header_size(config, chan,
                                                offsets->begin,
                                                &offsets->pre_header_padding,
-                                               ctx);
+                                               ctx, client_ctx);
                offsets->size +=
                        lib_ring_buffer_align(offsets->begin + offsets->size,
                                              ctx->largest_align)
@@ -2083,6 +2149,9 @@ retry:
                                >= chan->backend.buf_size)) {
                                unsigned long nr_lost;
 
+                               if (handle_blocking_retry(&timeout_left_ms))
+                                       goto retry;
+
                                /*
                                 * We do not overwrite non consumed buffers
                                 * and we are full : record is lost.
@@ -2126,7 +2195,7 @@ retry:
                        config->cb.record_header_size(config, chan,
                                                offsets->begin,
                                                &offsets->pre_header_padding,
-                                               ctx);
+                                               ctx, client_ctx);
                offsets->size +=
                        lib_ring_buffer_align(offsets->begin + offsets->size,
                                              ctx->largest_align)
@@ -2180,7 +2249,8 @@ retry:
  * -EIO for other errors, else returns 0.
  * It will take care of sub-buffer switching.
  */
-int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
+               void *client_ctx)
 {
        struct channel *chan = ctx->chan;
        struct lttng_ust_shm_handle *handle = ctx->handle;
@@ -2201,7 +2271,7 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
        do {
                ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
-                                                      ctx);
+                                                      ctx, client_ctx);
                if (caa_unlikely(ret))
                        return ret;
        } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
This page took 0.024992 seconds and 4 git commands to generate.