X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-metadata-cache.cpp;h=462e079d8c705dfb5785244fd67f3cd4b63d5f59;hb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;hp=a810c7dab9eea84bbac194d6f1afa304603ee100;hpb=c9e313bc594f40a86eed237dce222c0fc99c957f;p=lttng-tools.git diff --git a/src/common/consumer/consumer-metadata-cache.cpp b/src/common/consumer/consumer-metadata-cache.cpp index a810c7dab..462e079d8 100644 --- a/src/common/consumer/consumer-metadata-cache.cpp +++ b/src/common/consumer/consumer-metadata-cache.cpp @@ -7,20 +7,20 @@ */ #define _LGPL_SOURCE -#include -#include -#include -#include -#include -#include +#include "consumer-metadata-cache.hpp" #include -#include +#include #include #include -#include +#include -#include "consumer-metadata-cache.hpp" +#include +#include +#include +#include +#include +#include enum metadata_cache_update_version_status { METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED, @@ -32,8 +32,7 @@ extern struct lttng_consumer_global_data the_consumer_data; /* * Reset the metadata cache. */ -static -void metadata_cache_reset(struct consumer_metadata_cache *cache) +static void metadata_cache_reset(struct consumer_metadata_cache *cache) { const int ret = lttng_dynamic_buffer_set_size(&cache->contents, 0); @@ -45,8 +44,8 @@ void metadata_cache_reset(struct consumer_metadata_cache *cache) * If it did, reset the metadata cache. * The metadata cache lock MUST be held. */ -static enum metadata_cache_update_version_status metadata_cache_update_version( - struct consumer_metadata_cache *cache, uint64_t version) +static enum metadata_cache_update_version_status +metadata_cache_update_version(struct consumer_metadata_cache *cache, uint64_t version) { enum metadata_cache_update_version_status status; @@ -74,8 +73,10 @@ end: */ enum consumer_metadata_cache_write_status consumer_metadata_cache_write(struct consumer_metadata_cache *cache, - unsigned int offset, unsigned int len, uint64_t version, - const char *data) + unsigned int offset, + unsigned int len, + uint64_t version, + const char *data) { int ret = 0; enum consumer_metadata_cache_write_status status; @@ -87,15 +88,14 @@ consumer_metadata_cache_write(struct consumer_metadata_cache *cache, original_size = cache->contents.size; if (metadata_cache_update_version(cache, version) == - METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) { + METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) { metadata_cache_reset(cache); cache_is_invalidated = true; } DBG("Writing %u bytes from offset %u in metadata cache", len, offset); if (offset + len > cache->contents.size) { - ret = lttng_dynamic_buffer_set_size( - &cache->contents, offset + len); + ret = lttng_dynamic_buffer_set_size(&cache->contents, offset + len); if (ret) { ERR("Extending metadata cache"); status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR; @@ -129,33 +129,30 @@ int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel) LTTNG_ASSERT(channel); - channel->metadata_cache = (consumer_metadata_cache *) zmalloc( - sizeof(struct consumer_metadata_cache)); + channel->metadata_cache = zmalloc(); if (!channel->metadata_cache) { PERROR("zmalloc metadata cache struct"); ret = -1; goto end; } - ret = pthread_mutex_init(&channel->metadata_cache->lock, NULL); + ret = pthread_mutex_init(&channel->metadata_cache->lock, nullptr); if (ret != 0) { PERROR("mutex init"); goto end_free_cache; } lttng_dynamic_buffer_init(&channel->metadata_cache->contents); - ret = lttng_dynamic_buffer_set_capacity( - &channel->metadata_cache->contents, - DEFAULT_METADATA_CACHE_SIZE); + ret = lttng_dynamic_buffer_set_capacity(&channel->metadata_cache->contents, + DEFAULT_METADATA_CACHE_SIZE); if (ret) { PERROR("Failed to pre-allocate metadata cache storage of %d bytes on creation", - DEFAULT_METADATA_CACHE_SIZE); + DEFAULT_METADATA_CACHE_SIZE); ret = -1; goto end_free_mutex; } DBG("Allocated metadata cache: current capacity = %zu", - lttng_dynamic_buffer_get_capacity_left( - &channel->metadata_cache->contents)); + lttng_dynamic_buffer_get_capacity_left(&channel->metadata_cache->contents)); ret = 0; goto end; @@ -187,17 +184,16 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel) /* * Check if the cache is flushed up to the offset passed in parameter. * - * Return 0 if everything has been flushed, 1 if there is data not flushed. + * Return true if everything has been flushed, false if there is data not flushed. */ -int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, - uint64_t offset, int timer) +namespace { +bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel, + uint64_t offset, + int timer) { - int ret = 0; + bool done_flushing = false; struct lttng_consumer_stream *metadata_stream; - LTTNG_ASSERT(channel); - LTTNG_ASSERT(channel->metadata_cache); - /* * If not called from a timer handler, we have to take the * channel lock to be mutually exclusive with channel teardown. @@ -215,7 +211,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, * Having no metadata stream means the channel is being destroyed so there * is no cache to flush anymore. */ - ret = 0; + done_flushing = true; goto end_unlock_channel; } @@ -223,23 +219,57 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, pthread_mutex_lock(&channel->metadata_cache->lock); if (metadata_stream->ust_metadata_pushed >= offset) { - ret = 0; - } else if (channel->metadata_stream->endpoint_status != - CONSUMER_ENDPOINT_ACTIVE) { + done_flushing = true; + } else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { /* An inactive endpoint means we don't have to flush anymore. */ - ret = 0; + done_flushing = true; } else { /* Still not completely flushed. */ - ret = 1; + done_flushing = false; } pthread_mutex_unlock(&channel->metadata_cache->lock); pthread_mutex_unlock(&metadata_stream->lock); + end_unlock_channel: pthread_mutex_unlock(&channel->timer_lock); if (!timer) { pthread_mutex_unlock(&channel->lock); } - return ret; + return done_flushing; +} +} /* namespace */ + +/* + * Wait until the cache is flushed up to the offset passed in parameter or the + * metadata stream has been destroyed. + */ +void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel, + uint64_t offset, + bool invoked_by_timer) +{ + assert(channel); + assert(channel->metadata_cache); + + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + return; + } + + /* Metadata cache is not currently flushed, wait on wait queue. */ + for (;;) { + struct lttng_waiter waiter; + + lttng_waiter_init(&waiter); + lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter); + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + /* Wake up all waiters, ourself included. */ + lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + /* Ensure proper teardown of waiter. */ + lttng_waiter_wait(&waiter); + break; + } + + lttng_waiter_wait(&waiter); + } }