X-Git-Url: http://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=5f1f93b0e1024c6c1eee422d80930b71ccaaf1a8;hp=be00191749ba5e81e03fef925f1c95ee3bb2ee96;hb=b1316da1ffbd276fc8271e7a9438e683ad352781;hpb=934ba8fdfbe546a25a9df61b6292bdb3d28e4ab1 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index be0019174..5f1f93b0e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1283,6 +1283,17 @@ error_unlock: return ret; } +static +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + + DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); + stream->ust_metadata_pushed = 0; +} + /* * Receive the metadata updates from the sessiond. Supports receiving * overlapping metadata, but is needs to always belong to a contiguous @@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; + enum consumer_metadata_cache_write_status cache_write_status; DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); @@ -1320,10 +1332,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, health_code_update(); pthread_mutex_lock(&channel->metadata_cache->lock); - ret = consumer_metadata_cache_write(channel, offset, len, version, - metadata_str); + cache_write_status = consumer_metadata_cache_write( + channel, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); - if (ret < 0) { + switch (cache_write_status) { + case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: + /* + * The write entirely overlapped with existing contents of the + * same metadata version (same content); there is nothing to do. + */ + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED: + /* + * The metadata cache was invalidated (previously pushed + * content has been overwritten). Reset the stream's consumed + * metadata position to ensure the metadata poll thread consumes + * the whole cache. + */ + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + /* Fall-through. */ + case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: + /* + * In both cases, the metadata poll thread has new data to + * consume. + */ + ret = consumer_metadata_wakeup_pipe(channel); + if (ret) { + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto end_free; + } + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR: /* Unable to handle metadata. Notify session daemon. */ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; /* @@ -1332,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ goto end_free; + default: + abort(); } if (!wait) { @@ -2464,15 +2508,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) -{ - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); - stream->ust_metadata_pushed = 0; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -3051,6 +3086,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); assert(stream->ustream); + ASSERT_LOCKED(stream->lock); DBG("UST consumer checking data pending");