Fix: ust-consumer: metadata thread not woken-up after version change
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 8 Feb 2021 19:40:33 +0000 (14:40 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 9 Feb 2021 03:03:23 +0000 (22:03 -0500)
Issue observed
==============

The metadata regeneration test fails, very rarely, in the "streaming"
case on the CI. The interesting part of the test boils down to:
  1) start session
  2) launch an app tracing one event
  3) stop session
  4) delete metadata file
  5) start session
  6) regenerate metadata
  7) stop session
  8) destroy session
  9) read trace: babeltrace fails on an invalid metadata file.

The problem is hard to capture, but modifying the test allows us to see
that there appears to be a short window between steps 7 and 8 where the
metadata file is empty or doesn't exist.

Cause
=====

When metadata is regenerated, its version is bumped and the metadata
cache is "reset". In some cases, such as in this test, the new metadata
will have exactly the same size as it had prior as nothing happened to
change that (e.g. no new apps/probes were registered).

When this occurs, the metadata thread is not woken-up by
consumer_metadata_cache_write() as it sees that max_offset of the
metadata cache didn't change; the data was replaced but it has the same
size.

The metadata consumption thread also checks for version bumps and
resets the amount of consumed metadata. Hence, if the "cache write"
operation woke up the metadata consumption thread, the stream's
"ust metadata pushed" state would be reset and the new contents would
be consumed.

Solution
========

The metadata stream's "ust metadata pushed" position is directly reset
to zero when a metadata version change is detected by the metadata
cache. The metadata poll thread is also woken up to resume the
consumption of the newly-available data.

It is unclear why the change to the consumption position was only done
on the metadata consumption thread's code path and not directly by the
session daemon command handling.

Note that a session rotation will also result in a reset of the pushed
position and a wake-up of the metadata poll thread from the command
handling thread. I am speculating that this couldn't be done due to the
design of the locking at the time of the original
implementation (I haven't checked).

In implementing this change, the metadata reception code path is
untangled a bit to separate the logic that affects the metadata stream
from the logic that manages the metadata cache. I suspect the original
error stems from a mix-up/confusion between both concerns.

When a metadata version change happens, the metadata cache resets its
'max_offset' (in other words, it's current size) and notifies the
caller. The caller then resets the "ust pushed metadata" position to
zero and wakes-up the metadata thread to consume the new contents of the
metadata cache.

Known drawbacks
===============

None.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I142ef957140d497ac7fc4294ca65a55c12518598

src/common/consumer/consumer-metadata-cache.c
src/common/consumer/consumer-metadata-cache.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/ust-consumer/ust-consumer.c

index 987bf7c8577d8c683a6b7e4a751c8b77ff2108a5..f712fa7f65043aeff6d688b06f9b22867683c8cc 100644 (file)
 
 #include "consumer-metadata-cache.h"
 
+enum metadata_cache_update_version_status {
+       METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED,
+       METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED,
+};
+
 extern struct lttng_consumer_global_data consumer_data;
 
 /*
@@ -74,60 +79,23 @@ void metadata_cache_reset(struct consumer_metadata_cache *cache)
  * Check if the metadata cache version changed.
  * If it did, reset the metadata cache.
  * The metadata cache lock MUST be held.
- *
- * Returns 0 on success, a negative value on error.
  */
-static
-int metadata_cache_check_version(struct consumer_metadata_cache *cache,
-               uint64_t version)
+static enum metadata_cache_update_version_status metadata_cache_update_version(
+               struct consumer_metadata_cache *cache, uint64_t version)
 {
-       int ret = 0;
+       enum metadata_cache_update_version_status status;
 
        if (cache->version == version) {
+               status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED;
                goto end;
        }
 
        DBG("Metadata cache version update to %" PRIu64, version);
-       metadata_cache_reset(cache);
        cache->version = version;
+       status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED;
 
 end:
-       return ret;
-}
-
-/*
- * Write a character on the metadata poll pipe to wake the metadata thread.
- * Returns 0 on success, -1 on error.
- */
-int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
-{
-       int ret = 0;
-       const char dummy = 'c';
-
-       if (channel->monitor && channel->metadata_stream) {
-               ssize_t write_ret;
-
-               write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
-                               &dummy, 1);
-               if (write_ret < 1) {
-                       if (errno == EWOULDBLOCK) {
-                               /*
-                                * This is fine, the metadata poll thread
-                                * is having a hard time keeping-up, but
-                                * it will eventually wake-up and consume
-                                * the available data.
-                                */
-                               ret = 0;
-                       } else {
-                               PERROR("Wake-up UST metadata pipe");
-                               ret = -1;
-                               goto end;
-                       }
-               }
-       }
-
-end:
-       return ret;
+       return status;
 }
 
 /*
@@ -136,23 +104,31 @@ end:
  * contiguous metadata in cache to the ring buffer. The metadata cache
  * lock MUST be acquired to write in the cache.
  *
- * Return 0 on success, a negative value on error.
+ * See `enum consumer_metadata_cache_write_status` for the meaning of the
+ * various returned status codes.
  */
-int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
+enum consumer_metadata_cache_write_status
+consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
                unsigned int offset, unsigned int len, uint64_t version,
                const char *data)
 {
        int ret = 0;
        struct consumer_metadata_cache *cache;
+       enum consumer_metadata_cache_write_status status;
+       bool cache_is_invalidated = false;
+       uint64_t original_max_offset;
 
        assert(channel);
        assert(channel->metadata_cache);
 
        cache = channel->metadata_cache;
+       ASSERT_LOCKED(cache->lock);
+       original_max_offset = cache->max_offset;
 
-       ret = metadata_cache_check_version(cache, version);
-       if (ret < 0) {
-               goto end;
+       if (metadata_cache_update_version(cache, version) ==
+                       METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) {
+               metadata_cache_reset(cache);
+               cache_is_invalidated = true;
        }
 
        DBG("Writing %u bytes from offset %u in metadata cache", len, offset);
@@ -162,18 +138,25 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
                                len - cache->cache_alloc_size + offset);
                if (ret < 0) {
                        ERR("Extending metadata cache");
+                       status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR;
                        goto end;
                }
        }
 
        memcpy(cache->data + offset, data, len);
-       if (offset + len > cache->max_offset) {
-               cache->max_offset = offset + len;
-               ret = consumer_metadata_wakeup_pipe(channel);
+       cache->max_offset = max(cache->max_offset, offset + len);
+
+       if (cache_is_invalidated) {
+               status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED;
+       } else if (cache->max_offset > original_max_offset) {
+               status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT;
+       } else {
+               status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE;
+               assert(cache->max_offset == original_max_offset);
        }
 
 end:
-       return ret;
+       return status;
 }
 
 /*
index ecb7e15495d2a1706fa013c0d9b9741af90aa328..4c029b90e9208fdf74b1e8a6745028de49bb340a 100644 (file)
 
 #include <common/consumer/consumer.h>
 
+enum consumer_metadata_cache_write_status {
+       CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR = -1,
+       /*
+        * New metadata content was appended to the cache successfully.
+        * Previously available content remains valid.
+        */
+       CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT = 0,
+       /*
+        * The new content pushed to the cache invalidated the content that
+        * was already present. The contents of the cache should be re-read.
+        */
+       CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED,
+       /*
+        * A metadata cache write can simply overwrite an already existing
+        * section of the cache (and it should be a write-through with identical
+        * data). From the caller's standpoint, there is no change to the state
+        * of the cache.
+        */
+       CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE,
+};
+
 struct consumer_metadata_cache {
        char *data;
        uint64_t cache_alloc_size;
@@ -35,13 +56,13 @@ struct consumer_metadata_cache {
        pthread_mutex_t lock;
 };
 
-int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
+enum consumer_metadata_cache_write_status
+consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
                unsigned int offset, unsigned int len, uint64_t version,
                const char *data);
 int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
 void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
 int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
                uint64_t offset, int timer);
-int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
 
 #endif /* CONSUMER_METADATA_CACHE_H */
index 23853e5c29378c80144cfb1bda98cd2fe83305e7..cecdc3f9152e705b0b1f426856e167959c4d0283 100644 (file)
@@ -877,6 +877,43 @@ error:
        return outfd;
 }
 
+/*
+ * Write a character on the metadata poll pipe to wake the metadata thread.
+ * Returns 0 on success, -1 on error.
+ */
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
+{
+       int ret = 0;
+
+       DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
+                       channel->name);
+       if (channel->monitor && channel->metadata_stream) {
+               const char dummy = 'c';
+               const ssize_t write_ret = lttng_write(
+                               channel->metadata_stream->ust_metadata_poll_pipe[1],
+                               &dummy, 1);
+
+               if (write_ret < 1) {
+                       if (errno == EWOULDBLOCK) {
+                               /*
+                                * This is fine, the metadata poll thread
+                                * is having a hard time keeping-up, but
+                                * it will eventually wake-up and consume
+                                * the available data.
+                                */
+                               ret = 0;
+                       } else {
+                               PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Trigger a dump of the metadata content. Following/during the succesful
  * completion of this call, the metadata poll thread will start receiving
index b45f88b756b2032dbf90daa74e91f31ed9ffc940..7b381b2c75db6c7300cc6581206e0e12b1f5c0fa 100644 (file)
@@ -1052,5 +1052,6 @@ enum lttcomm_return_code lttng_consumer_init_command(
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 enum lttcomm_return_code lttng_consumer_open_channel_packets(
                struct lttng_consumer_channel *channel);
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
index be00191749ba5e81e03fef925f1c95ee3bb2ee96..5f1f93b0e1024c6c1eee422d80930b71ccaaf1a8 100644 (file)
@@ -1283,6 +1283,17 @@ error_unlock:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
+       stream->ust_metadata_pushed = 0;
+}
+
 /*
  * Receive the metadata updates from the sessiond. Supports receiving
  * overlapping metadata, but is needs to always belong to a contiguous
@@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
+       enum consumer_metadata_cache_write_status cache_write_status;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
@@ -1320,10 +1332,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, version,
-                       metadata_str);
+       cache_write_status = consumer_metadata_cache_write(
+                       channel, offset, len, version, metadata_str);
        pthread_mutex_unlock(&channel->metadata_cache->lock);
-       if (ret < 0) {
+       switch (cache_write_status) {
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+               /*
+                * The write entirely overlapped with existing contents of the
+                * same metadata version (same content); there is nothing to do.
+                */
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+               /*
+                * The metadata cache was invalidated (previously pushed
+                * content has been overwritten). Reset the stream's consumed
+                * metadata position to ensure the metadata poll thread consumes
+                * the whole cache.
+                */
+               pthread_mutex_lock(&channel->metadata_stream->lock);
+               metadata_stream_reset_cache_consumed_position(
+                               channel->metadata_stream);
+               pthread_mutex_unlock(&channel->metadata_stream->lock);
+               /* Fall-through. */
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+               /*
+                * In both cases, the metadata poll thread has new data to
+                * consume.
+                */
+               ret = consumer_metadata_wakeup_pipe(channel);
+               if (ret) {
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_free;
+               }
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                /*
@@ -1332,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                goto end_free;
+       default:
+               abort();
        }
 
        if (!wait) {
@@ -2464,15 +2508,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
-{
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
-       stream->ust_metadata_pushed = 0;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -3051,6 +3086,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
        assert(stream->ustream);
+       ASSERT_LOCKED(stream->lock);
 
        DBG("UST consumer checking data pending");
 
This page took 0.031467 seconds and 4 git commands to generate.