*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
{
int ret = 0;
char *tmp_data_ptr;
- unsigned int new_size;
+ unsigned int new_size, old_size;
assert(channel);
assert(channel->metadata_cache);
- new_size = max_t(unsigned int,
- channel->metadata_cache->cache_alloc_size + size,
- channel->metadata_cache->cache_alloc_size << 1);
+ old_size = channel->metadata_cache->cache_alloc_size;
+ new_size = max_t(unsigned int, old_size + size, old_size << 1);
DBG("Extending metadata cache to %u", new_size);
tmp_data_ptr = realloc(channel->metadata_cache->data, new_size);
if (!tmp_data_ptr) {
ret = -1;
goto end;
}
+ /* Zero newly allocated memory */
+ memset(tmp_data_ptr + old_size, 0, new_size - old_size);
channel->metadata_cache->data = tmp_data_ptr;
channel->metadata_cache->cache_alloc_size = new_size;
/*
* Write metadata to the cache, extend the cache if necessary. We support
- * non-contiguous updates but not overlapping ones. If there is contiguous
- * metadata in the cache, we send it to the ring buffer. The metadata cache
+ * overlapping updates, but they need to be contiguous. Send the
+ * contiguous metadata in cache to the ring buffer. The metadata cache
* lock MUST be acquired to write in the cache.
*
* Return 0 on success, a negative value on error.
unsigned int offset, unsigned int len, char *data)
{
int ret = 0;
+ int size_ret;
struct consumer_metadata_cache *cache;
assert(channel);
}
memcpy(cache->data + offset, data, len);
- cache->total_bytes_written += len;
if (offset + len > cache->max_offset) {
- cache->max_offset = offset + len;
- }
+ char dummy = 'c';
- if (cache->max_offset == cache->total_bytes_written) {
- offset = cache->rb_pushed;
- len = cache->total_bytes_written - cache->rb_pushed;
- ret = lttng_ustconsumer_push_metadata(channel, cache->data, offset,
- len);
- if (ret < 0) {
- ERR("Pushing metadata");
- goto end;
+ cache->max_offset = offset + len;
+ if (channel->monitor) {
+ size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
+ &dummy, 1);
+ if (size_ret < 1) {
+ ERR("Wakeup UST metadata pipe");
+ ret = -1;
+ goto end;
+ }
}
- cache->rb_pushed += len;
}
end:
DBG("Destroying metadata cache");
- if (channel->metadata_cache->max_offset >
- channel->metadata_cache->rb_pushed) {
- ERR("Destroying a cache not entirely commited");
- }
-
pthread_mutex_destroy(&channel->metadata_cache->lock);
free(channel->metadata_cache->data);
free(channel->metadata_cache);
* Return 0 if everything has been flushed, 1 if there is data not flushed.
*/
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
- uint64_t offset)
+ uint64_t offset, int timer)
{
- int ret;
- struct consumer_metadata_cache *cache;
+ int ret = 0;
+ struct lttng_consumer_stream *metadata_stream;
assert(channel);
assert(channel->metadata_cache);
- cache = channel->metadata_cache;
-
- pthread_mutex_lock(&consumer_data.lock);
+ /*
+ * If not called from a timer handler, we have to take the
+ * channel lock to be mutually exclusive with channel teardown.
+ * Timer handler does not need to take this lock because it is
+ * already synchronized by timer stop (and, more importantly,
+ * taking this lock in a timer handler would cause a deadlock).
+ */
+ if (!timer) {
+ pthread_mutex_lock(&channel->lock);
+ }
+ pthread_mutex_lock(&channel->timer_lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
- if (cache->rb_pushed >= offset) {
- ret = 0;
- } else if (!channel->metadata_stream) {
+ metadata_stream = channel->metadata_stream;
+
+ if (!metadata_stream) {
/*
* Having no metadata stream means the channel is being destroyed so there
* is no cache to flush anymore.
*/
ret = 0;
+ } else if (metadata_stream->ust_metadata_pushed >= offset) {
+ ret = 0;
} else if (channel->metadata_stream->endpoint_status !=
CONSUMER_ENDPOINT_ACTIVE) {
/* An inactive endpoint means we don't have to flush anymore. */
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&channel->timer_lock);
+ if (!timer) {
+ pthread_mutex_unlock(&channel->lock);
+ }
return ret;
}