#include "consumer-metadata-cache.h"
+extern struct lttng_consumer_global_data consumer_data;
+
/*
* Extend the allocated size of the metadata cache. Called only from
* lttng_ustconsumer_write_metadata_cache.
unsigned int offset, unsigned int len, char *data)
{
int ret = 0;
+ int size_ret;
struct consumer_metadata_cache *cache;
assert(channel);
}
if (cache->max_offset == cache->total_bytes_written) {
- offset = cache->rb_pushed;
- len = cache->total_bytes_written - cache->rb_pushed;
- ret = lttng_ustconsumer_push_metadata(channel, cache->data, offset,
- len);
- if (ret < 0) {
- ERR("Pushing metadata");
- goto end;
+ char dummy = 'c';
+
+ cache->contiguous = cache->max_offset;
+ if (channel->monitor) {
+ size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
+ &dummy, 1);
+ if (size_ret < 1) {
+ ERR("Wakeup UST metadata pipe");
+ ret = -1;
+ goto end;
+ }
}
- cache->rb_pushed += len;
}
end:
DBG("Destroying metadata cache");
- if (channel->metadata_cache->max_offset >
- channel->metadata_cache->rb_pushed) {
- ERR("Destroying a cache not entirely commited");
- }
-
pthread_mutex_destroy(&channel->metadata_cache->lock);
free(channel->metadata_cache->data);
free(channel->metadata_cache);
* Return 0 if everything has been flushed, 1 if there is data not flushed.
*/
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
- uint64_t offset)
+ uint64_t offset, int timer)
{
- int ret;
- struct consumer_metadata_cache *cache;
+ int ret = 0;
+ struct lttng_consumer_stream *metadata_stream;
assert(channel);
assert(channel->metadata_cache);
- cache = channel->metadata_cache;
-
+ /*
+ * If not called from a timer handler, we have to take the
+ * channel lock to be mutually exclusive with channel teardown.
+ * Timer handler does not need to take this lock because it is
+ * already synchronized by timer stop (and, more importantly,
+ * taking this lock in a timer handler would cause a deadlock).
+ */
+ if (!timer) {
+ pthread_mutex_lock(&channel->lock);
+ }
+ pthread_mutex_lock(&channel->timer_lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
- if (cache->rb_pushed >= offset) {
+
+ metadata_stream = channel->metadata_stream;
+
+ if (!metadata_stream) {
+ /*
+ * Having no metadata stream means the channel is being destroyed so there
+ * is no cache to flush anymore.
+ */
+ ret = 0;
+ } else if (metadata_stream->ust_metadata_pushed >= offset) {
+ ret = 0;
+ } else if (channel->metadata_stream->endpoint_status !=
+ CONSUMER_ENDPOINT_ACTIVE) {
+ /* An inactive endpoint means we don't have to flush anymore. */
ret = 0;
} else {
+ /* Still not completely flushed. */
ret = 1;
}
+
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->timer_lock);
+ if (!timer) {
+ pthread_mutex_unlock(&channel->lock);
+ }
return ret;
}