Fix: consumerd: slow metadata push slows down application registration
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 29 Jun 2023 18:04:37 +0000 (14:04 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 27 Jul 2023 17:49:24 +0000 (13:49 -0400)
Issue observed
--------------

When rotating the channels of a session configured with a "per-pid"
buffer sharing policy, applications with a long registration
timeout (e.g. LTTNG_UST_REGISTER_TIMEOUT=-1, see LTTNG-UST(3)) sometimes
experience long start-up times.

Cause
-----

The session list lock is held during the registration of an application
and during the setup of a rotation.

While setting up a rotation in the userspace domain, the session daemon
flushes its metadata cache to the userspace consumer daemon and waits
for a confirmation that all metadata emitted before that point in time
has been serialized (whether on disk or sent through a network output).

As the consumer daemon waits for the metadata to be consumed, it
periodically checks the metadata stream's output position with a 200ms
delay (see DEFAULT_METADATA_AVAILABILITY_WAIT_TIME).

In practice, in per-uid mode, this delay is seldomly encountered since
the metadata has already been pushed by the consumption thread.
Moreover, if it was not, a single polling iteration will typically
suffice.

However, in per-pid buffering mode and with a sustained "heavy" data
production rate, this delay becomes problematic since:
  - metadata is pushed for every application,
  - the delay is hit almost systematically as the consumption thread
    is busy and has to catch up to consume the most recent metadata.

Hence, some rotation setups can easily take multiple seconds (at least
200ms per application). This makes the locking scheme employed on that
path unsuitable as it blocks some operations (like application
registrations) for an extended period of time.

Solution
--------

The polling "back-off" delay is eliminated by using a waiter that allows
the consumer daemon thread that runs the metadata push command to
wake-up whenever the criteria used to evaluate the "pushed" metadata
position are changed.

Those criteria are:
  - the metadata stream's pushed position
  - the lifetime of the metadata channel's stream
  - the status of the session's endpoint

Whenever those states are affected, the waiters are woken-up to force a
re-evaluation of the metadata cache flush position and, eventually,
cause the metadata push command to complete.

Note
----

The waiter queue is adapted from urcu-wait.h of liburcu (also LGPL
licensed).

Change-Id: Ib86c2e878abe205c73f930e6de958c0b10486a37
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/notification-thread-events.c
src/common/consumer/consumer-metadata-cache.c
src/common/consumer/consumer-metadata-cache.h
src/common/consumer/consumer-timer.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h
src/common/waiter.c
src/common/waiter.h

index 5733ecda8ac63f2d95b89a975548f91e8cd20d85..eba9947fb7409ecebee8fc081755226a1f67d680 100644 (file)
@@ -3286,12 +3286,12 @@ end:
                free(cmd);
                cmd = NULL;
        } else {
-               lttng_waiter_wake_up(&cmd->reply_waiter);
+               lttng_waiter_wake(&cmd->reply_waiter);
        }
        return ret;
 error_unlock:
        /* Wake-up and return a fatal error to the calling thread. */
-       lttng_waiter_wake_up(&cmd->reply_waiter);
+       lttng_waiter_wake(&cmd->reply_waiter);
        cmd->reply_code = LTTNG_ERR_FATAL;
 error:
        /* Indicate a fatal error to the caller. */
index 1038b84e63fd58cb3fb2dc49a76a6e1a3fc7cdc8..a5943fd33ae6f8f858af5c61e3504a70dbceb2e1 100644 (file)
@@ -188,17 +188,15 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 /*
  * Check if the cache is flushed up to the offset passed in parameter.
  *
- * Return 0 if everything has been flushed, 1 if there is data not flushed.
+ * Return true if everything has been flushed, false if there is data not flushed.
  */
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+static
+bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
                uint64_t offset, int timer)
 {
-       int ret = 0;
+       bool done_flushing = false;
        struct lttng_consumer_stream *metadata_stream;
 
-       assert(channel);
-       assert(channel->metadata_cache);
-
        /*
         * If not called from a timer handler, we have to take the
         * channel lock to be mutually exclusive with channel teardown.
@@ -216,7 +214,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
                 * Having no metadata stream means the channel is being destroyed so there
                 * is no cache to flush anymore.
                 */
-               ret = 0;
+               done_flushing = true;
                goto end_unlock_channel;
        }
 
@@ -224,23 +222,56 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
        pthread_mutex_lock(&channel->metadata_cache->lock);
 
        if (metadata_stream->ust_metadata_pushed >= offset) {
-               ret = 0;
+               done_flushing = true;
        } else if (channel->metadata_stream->endpoint_status !=
                        CONSUMER_ENDPOINT_ACTIVE) {
                /* An inactive endpoint means we don't have to flush anymore. */
-               ret = 0;
+               done_flushing = true;
        } else {
                /* Still not completely flushed. */
-               ret = 1;
+               done_flushing = false;
        }
 
        pthread_mutex_unlock(&channel->metadata_cache->lock);
        pthread_mutex_unlock(&metadata_stream->lock);
+
 end_unlock_channel:
        pthread_mutex_unlock(&channel->timer_lock);
        if (!timer) {
                pthread_mutex_unlock(&channel->lock);
        }
 
-       return ret;
+       return done_flushing;
+}
+
+/*
+ * Wait until the cache is flushed up to the offset passed in parameter or the
+ * metadata stream has been destroyed.
+ */
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+               uint64_t offset, bool invoked_by_timer)
+{
+       assert(channel);
+       assert(channel->metadata_cache);
+
+       if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+               return;
+       }
+
+       /* Metadata cache is not currently flushed, wait on wait queue. */
+       for (;;) {
+               struct lttng_waiter waiter;
+
+               lttng_waiter_init(&waiter);
+               lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+               if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+                       /* Wake up all waiters, ourself included. */
+                       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+                       /* Ensure proper teardown of waiter. */
+                       lttng_waiter_wait(&waiter);
+                       break;
+               }
+
+               lttng_waiter_wait(&waiter);
+       }
 }
index b8f4efadc4c5ec59913cb75ea1644882fab2331b..ae588e8d514747a066ed400c507710feda2731e8 100644 (file)
@@ -56,7 +56,7 @@ consumer_metadata_cache_write(struct consumer_metadata_cache *cache,
                const char *data);
 int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
 void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-               uint64_t offset, int timer);
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+               uint64_t offset, bool invoked_by_timer);
 
 #endif /* CONSUMER_METADATA_CACHE_H */
index d0cf170dacf1b08f88dcdc77ba1b541d0ede23c6..a16ae2eb95a50eda2e6a83e12aeb11b14babff1c 100644 (file)
@@ -99,7 +99,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                 * - metadata_socket_lock
                 *   - Calling lttng_ustconsumer_recv_metadata():
                 *     - channel->metadata_cache->lock
-                *     - Calling consumer_metadata_cache_flushed():
+                *     - Calling consumer_wait_metadata_cache_flushed():
                 *       - channel->timer_lock
                 *         - channel->metadata_cache->lock
                 *
@@ -108,7 +108,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                 * they are held while consumer_timer_switch_stop() is
                 * called.
                 */
-               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+               ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
index cbd3f67a900d211fc6173de56464a4faa47c754f..a2df6c9587b5b047f6c71a286d3973f3a248f1f1 100644 (file)
@@ -465,6 +465,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -1047,6 +1049,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->is_live = is_in_live_session;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
+       lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -2177,6 +2180,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
         * pointer value.
         */
        channel->metadata_stream = NULL;
+       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
index 5128bcd257620d31d77853b541c74199a5d3c906..53c39e6eb003e9530050de53e276f7ef1672fe05 100644 (file)
@@ -29,6 +29,7 @@
 #include <common/credentials.h>
 #include <common/buffer-view.h>
 #include <common/dynamic-array.h>
+#include <common/waiter.h>
 
 struct lttng_consumer_local_data;
 
@@ -185,6 +186,11 @@ struct lttng_consumer_channel {
        /* Metadata cache is metadata channel */
        struct consumer_metadata_cache *metadata_cache;
 
+       /*
+        * Wait queue awaiting updates to metadata stream's flushed position.
+        */
+       struct lttng_wait_queue metadata_pushed_wait_queue;
+
        /* For UST metadata periodical flush */
        int switch_timer_enabled;
        timer_t switch_timer;
index e27e15ca573ade45605606ebbff0a01448ae67a2..b43ae58ffd84b4be5c836130341d6c8b200b672c 100644 (file)
@@ -950,6 +950,8 @@ error:
         */
        consumer_stream_destroy(metadata->metadata_stream, NULL);
        metadata->metadata_stream = NULL;
+       lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
 send_streams_error:
 error_no_stream:
 end:
@@ -985,7 +987,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
        if (ret < 0) {
                goto error;
        }
@@ -1032,6 +1034,7 @@ error_stream:
         */
        consumer_stream_destroy(metadata_stream, NULL);
        metadata_channel->metadata_stream = NULL;
+       lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
 
 error:
        rcu_read_unlock();
@@ -1275,7 +1278,7 @@ void metadata_stream_reset_cache_consumed_position(
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, uint64_t version,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+               struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
@@ -1364,13 +1367,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        if (!wait) {
                goto end_free;
        }
-       while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-               DBG("Waiting for metadata to be flushed");
-
-               health_code_update();
 
-               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-       }
+       consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
        free(metadata_str);
@@ -1821,7 +1819,7 @@ end_get_channel_nosignal:
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
-                               version, found_channel, 0, 1);
+                               version, found_channel, false, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -2613,6 +2611,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
+       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
 
        assert(stream->chan->metadata_cache->contents.size >=
                        stream->ust_metadata_pushed);
@@ -2662,7 +2661,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata(
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
@@ -3312,7 +3311,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * pushed out due to concurrent interaction with the session daemon.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+               struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
@@ -3420,7 +3419,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, version, channel, timer, wait);
+                       key, offset, len, version, channel, invoked_by_timer, wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
index e481f0850114a68165bc1baff41559ef09bbd93b..d8d7b6b1a7af05ebeaf76f36a039dff65ba57e92 100644 (file)
@@ -49,9 +49,11 @@ void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata);
 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, uint64_t version,
-               struct lttng_consumer_channel *channel, int timer, int wait);
+               struct lttng_consumer_channel *channel, bool invoked_by_timer,
+               int wait);
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer, int wait);
+               struct lttng_consumer_channel *channel, bool invoked_by_timer,
+               int wait);
 enum sync_metadata_status lttng_ustconsumer_sync_metadata(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *metadata);
index db2de6557f5df5a5465d66c8e2df591202953983..e72f53bd46ec31ab7582bda10bcd04ceeaf7f86f 100644 (file)
@@ -43,15 +43,18 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
 {
        unsigned int i;
 
-       DBG("Beginning of waiter wait period");
-       /* Load and test condition before read state */
+       DBG("Beginning of waiter \"wait\" period");
+
+       /* Load and test condition before read state. */
        cmm_smp_rmb();
        for (i = 0; i < WAIT_ATTEMPTS; i++) {
                if (uatomic_read(&waiter->state) != WAITER_WAITING) {
                        goto skip_futex_wait;
                }
+
                caa_cpu_relax();
        }
+
        while (uatomic_read(&waiter->state) == WAITER_WAITING) {
                if (!futex_noasync(&waiter->state, FUTEX_WAIT, WAITER_WAITING, NULL, NULL, 0)) {
                        /*
@@ -64,6 +67,7 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
                         */
                        continue;
                }
+
                switch (errno) {
                case EAGAIN:
                        /* Value already changed. */
@@ -90,13 +94,16 @@ skip_futex_wait:
                if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) {
                        break;
                }
+
                caa_cpu_relax();
        }
+
        while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) {
                poll(NULL, 0, 10);
        }
+
        assert(uatomic_read(&waiter->state) & WAITER_TEARDOWN);
-       DBG("End of waiter wait period");
+       DBG("End of waiter \"wait\" period");
 }
 
 /*
@@ -105,7 +112,7 @@ skip_futex_wait:
  * it to free this memory when it sees the WAITER_TEARDOWN flag.
  */
 LTTNG_HIDDEN
-void lttng_waiter_wake_up(struct lttng_waiter *waiter)
+void lttng_waiter_wake(struct lttng_waiter *waiter)
 {
        cmm_smp_mb();
        assert(uatomic_read(&waiter->state) == WAITER_WAITING);
@@ -117,6 +124,44 @@ void lttng_waiter_wake_up(struct lttng_waiter *waiter)
                        abort();
                }
        }
+
        /* Allow teardown of struct urcu_wait memory. */
        uatomic_or(&waiter->state, WAITER_TEARDOWN);
 }
+
+
+LTTNG_HIDDEN
+void lttng_wait_queue_init(struct lttng_wait_queue *queue)
+{
+       cds_wfs_init(&queue->stack);
+}
+
+LTTNG_HIDDEN
+void lttng_wait_queue_add(struct lttng_wait_queue *queue,
+               struct lttng_waiter *waiter)
+{
+       (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node);
+}
+
+LTTNG_HIDDEN
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue)
+{
+       struct cds_wfs_head *waiters;
+       struct cds_wfs_node *iter, *iter_n;
+
+       /* Move all waiters from the queue to our local stack. */
+       waiters = __cds_wfs_pop_all(&queue->stack);
+
+       /* Wake all waiters in our stack head. */
+       cds_wfs_for_each_blocking_safe(waiters, iter, iter_n) {
+               struct lttng_waiter *waiter =
+                       container_of(iter, struct lttng_waiter, wait_queue_node);
+
+               /* Don't wake already running threads. */
+               if (waiter->state & WAITER_RUNNING) {
+                       continue;
+               }
+
+               lttng_waiter_wake(waiter);
+       }
+}
index f4b73c74427c13ce7f949d810a8d9839e4bb92b4..ed84ac3c2894f6353a661e86164452cf84ef9e11 100644 (file)
@@ -22,6 +22,10 @@ struct lttng_waiter {
        int32_t state;
 };
 
+struct lttng_wait_queue {
+       struct cds_wfs_stack stack;
+};
+
 LTTNG_HIDDEN
 void lttng_waiter_init(struct lttng_waiter *waiter);
 
@@ -29,11 +33,29 @@ LTTNG_HIDDEN
 void lttng_waiter_wait(struct lttng_waiter *waiter);
 
 /*
- * lttng_waiter_wake_up must only be called by a single waker.
+ * lttng_waiter_wake must only be called by a single waker.
  * It is invalid for multiple "wake" operations to be invoked
  * on a single waiter without re-initializing it before.
  */
 LTTNG_HIDDEN
-void lttng_waiter_wake_up(struct lttng_waiter *waiter);
+void lttng_waiter_wake(struct lttng_waiter *waiter);
+
+LTTNG_HIDDEN
+void lttng_wait_queue_init(struct lttng_wait_queue *queue);
+
+/*
+ * Atomically add a waiter to a wait queue.
+ * A full memory barrier is issued before being added to the wait queue.
+ */
+LTTNG_HIDDEN
+void lttng_wait_queue_add(struct lttng_wait_queue *queue,
+               struct lttng_waiter *waiter);
+
+/*
+ * Wake every waiter present in the wait queue and remove them from
+ * the queue.
+ */
+LTTNG_HIDDEN
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue);
 
 #endif /* LTTNG_WAITER_H */
This page took 0.034732 seconds and 4 git commands to generate.