X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-metadata-cache.cpp;fp=src%2Fcommon%2Fconsumer%2Fconsumer-metadata-cache.cpp;h=462e079d8c705dfb5785244fd67f3cd4b63d5f59;hb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;hp=9af9ab0fc67ecb1d16f5f5a6a77c6780550f8d06;hpb=a968a9e069d76bfb1ac416dc834994c6a82b085e;p=lttng-tools.git

diff --git a/src/common/consumer/consumer-metadata-cache.cpp b/src/common/consumer/consumer-metadata-cache.cpp
index 9af9ab0fc..462e079d8 100644
--- a/src/common/consumer/consumer-metadata-cache.cpp
+++ b/src/common/consumer/consumer-metadata-cache.cpp
@@ -184,18 +184,16 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 /*
  * Check if the cache is flushed up to the offset passed in parameter.
  *
- * Return 0 if everything has been flushed, 1 if there is data not flushed.
+ * Return true if everything has been flushed, false if there is data not flushed.
  */
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-				    uint64_t offset,
-				    int timer)
+namespace {
+bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
+					uint64_t offset,
+					int timer)
 {
-	int ret = 0;
+	bool done_flushing = false;
 	struct lttng_consumer_stream *metadata_stream;
 
-	LTTNG_ASSERT(channel);
-	LTTNG_ASSERT(channel->metadata_cache);
-
 	/*
 	 * If not called from a timer handler, we have to take the
 	 * channel lock to be mutually exclusive with channel teardown.
@@ -213,7 +211,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
 		 * Having no metadata stream means the channel is being destroyed so there
 		 * is no cache to flush anymore.
 		 */
-		ret = 0;
+		done_flushing = true;
 		goto end_unlock_channel;
 	}
 
@@ -221,22 +219,57 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
 	pthread_mutex_lock(&channel->metadata_cache->lock);
 
 	if (metadata_stream->ust_metadata_pushed >= offset) {
-		ret = 0;
+		done_flushing = true;
 	} else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
 		/* An inactive endpoint means we don't have to flush anymore. */
-		ret = 0;
+		done_flushing = true;
 	} else {
 		/* Still not completely flushed. */
-		ret = 1;
+		done_flushing = false;
 	}
 
 	pthread_mutex_unlock(&channel->metadata_cache->lock);
 	pthread_mutex_unlock(&metadata_stream->lock);
+
 end_unlock_channel:
 	pthread_mutex_unlock(&channel->timer_lock);
 	if (!timer) {
 		pthread_mutex_unlock(&channel->lock);
 	}
 
-	return ret;
+	return done_flushing;
+}
+} /* namespace */
+
+/*
+ * Wait until the cache is flushed up to the offset passed in parameter or the
+ * metadata stream has been destroyed.
+ */
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+					  uint64_t offset,
+					  bool invoked_by_timer)
+{
+	assert(channel);
+	assert(channel->metadata_cache);
+
+	if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+		return;
+	}
+
+	/* Metadata cache is not currently flushed, wait on wait queue. */
+	for (;;) {
+		struct lttng_waiter waiter;
+
+		lttng_waiter_init(&waiter);
+		lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+		if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+			/* Wake up all waiters, ourself included. */
+			lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+			/* Ensure proper teardown of waiter. */
+			lttng_waiter_wait(&waiter);
+			break;
+		}
+
+		lttng_waiter_wait(&waiter);
+	}
 }
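
For readers unfamiliar with the pattern used by the new consumer_wait_metadata_cache_flushed(): it registers a waiter on the channel's metadata_pushed_wait_queue, re-checks the flushed condition, and only then blocks; if the condition became true in the meantime, it wakes the whole queue (itself included) so its own waiter is still torn down through the regular wake path. The sketch below is a minimal, self-contained analogue of that ordering built on std::mutex and std::condition_variable. It is not LTTng's lttng_waiter/lttng_wait_queue implementation, and every name in it (waiter, wait_queue, wait_until_pushed, pushed_position) is hypothetical.

/*
 * Illustrative analogue only -- not LTTng's lttng_waiter/lttng_wait_queue API.
 * A minimal sketch of the "register waiter, re-check, then block" pattern
 * using std::mutex/std::condition_variable; all names here are hypothetical.
 */
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

namespace {
/* One-shot waiter: wait() blocks until wake() has been called. */
class waiter {
public:
	void wait()
	{
		std::unique_lock<std::mutex> lock(_mutex);
		_cv.wait(lock, [this] { return _woken; });
	}

	void wake()
	{
		std::lock_guard<std::mutex> lock(_mutex);
		_woken = true;
		_cv.notify_one();
	}

private:
	std::mutex _mutex;
	std::condition_variable _cv;
	bool _woken = false;
};

/* Wait queue: waiters register themselves, the producer wakes them all. */
class wait_queue {
public:
	void add(std::shared_ptr<waiter> w)
	{
		std::lock_guard<std::mutex> lock(_mutex);
		_waiters.emplace_back(std::move(w));
	}

	void wake_all()
	{
		std::lock_guard<std::mutex> lock(_mutex);
		for (const auto& w : _waiters) {
			w->wake();
		}
		_waiters.clear();
	}

private:
	std::mutex _mutex;
	std::vector<std::shared_ptr<waiter>> _waiters;
};

std::atomic<std::uint64_t> pushed_position{ 0 };
wait_queue pushed_wait_queue;

/* Mirrors the control flow of consumer_wait_metadata_cache_flushed(). */
void wait_until_pushed(std::uint64_t offset)
{
	if (pushed_position.load() >= offset) {
		return;
	}

	for (;;) {
		auto w = std::make_shared<waiter>();

		/* Register first, then re-check, to avoid a lost wake-up. */
		pushed_wait_queue.add(w);
		if (pushed_position.load() >= offset) {
			/* Wake all waiters, ourself included, then tear down. */
			pushed_wait_queue.wake_all();
			w->wait();
			break;
		}

		w->wait();
	}
}
} /* anonymous namespace */

int main()
{
	std::thread consumer([] { wait_until_pushed(10); });
	/* Producer side: advance the position, then wake every waiter. */
	std::thread producer([] {
		pushed_position.store(42);
		pushed_wait_queue.wake_all();
	});

	producer.join();
	consumer.join();
	return 0;
}

The ordering is what matters: because the waiter is queued before the condition is re-checked, a producer that updates the pushed position and then calls wake_all() cannot slip in between the check and the blocking call, so no wake-up is lost.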