* complete.
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, struct lttng_consumer_channel *channel,
- int timer, int wait)
+ uint64_t len, uint64_t version,
+ struct lttng_consumer_channel *channel, int timer, int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
health_code_update();
pthread_mutex_lock(&channel->metadata_cache->lock);
- ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+ ret = consumer_metadata_cache_write(channel, offset, len, version,
+ metadata_str);
if (ret < 0) {
/* Unable to handle metadata. Notify session daemon. */
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
uint64_t len = msg.u.push_metadata.len;
uint64_t key = msg.u.push_metadata.key;
uint64_t offset = msg.u.push_metadata.target_offset;
+ uint64_t version = msg.u.push_metadata.version;
struct lttng_consumer_channel *channel;
DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel, 0, 1);
+ len, version, channel, 0, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
}
}
}
- /* Try to rmdir all directories under shm_path root. */
- if (chan->root_shm_path[0]) {
- (void) run_as_recursive_rmdir(chan->root_shm_path,
- chan->uid, chan->gid);
- }
}
void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
consumer_metadata_cache_destroy(chan);
ustctl_destroy_channel(chan->uchan);
+ /* Try to rmdir all directories under shm_path root. */
+ if (chan->root_shm_path[0]) {
+ (void) run_as_recursive_rmdir(chan->root_shm_path,
+ chan->uid, chan->gid);
+ }
free(chan->stream_fds);
}
return ret;
}
+static
+void metadata_stream_reset_cache(struct lttng_consumer_stream *stream,
+ struct consumer_metadata_cache *cache)
+{
+ DBG("Metadata stream update to version %" PRIu64,
+ cache->version);
+ stream->ust_metadata_pushed = 0;
+ stream->metadata_version = cache->version;
+ stream->reset_metadata_flag = 1;
+}
+
+/*
+ * Check if the version of the metadata stream and metadata cache match.
+ * If the cache got updated, reset the metadata stream.
+ * The stream lock and metadata cache lock MUST be held.
+ * Return 0 on success, a negative value on error.
+ */
+static
+int metadata_stream_check_version(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ struct consumer_metadata_cache *cache = stream->chan->metadata_cache;
+
+ if (cache->version == stream->metadata_version) {
+ goto end;
+ }
+ metadata_stream_reset_cache(stream, cache);
+
+end:
+ return ret;
+}
+
/*
* Write up to one packet from the metadata cache to the channel.
*
int ret;
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ ret = metadata_stream_check_version(stream);
+ if (ret < 0) {
+ goto end;
+ }
if (stream->chan->metadata_cache->max_offset
== stream->ust_metadata_pushed) {
ret = 0;
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- uint64_t len, key, offset;
+ uint64_t len, key, offset, version;
int ret;
assert(channel);
len = msg.u.push_metadata.len;
key = msg.u.push_metadata.key;
offset = msg.u.push_metadata.target_offset;
+ version = msg.u.push_metadata.version;
assert(key == channel->key);
if (len == 0) {
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel, timer, wait);
+ key, offset, len, version, channel, timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive