Fix: ust-consumer: metadata thread not woken-up after version change
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index be00191749ba5e81e03fef925f1c95ee3bb2ee96..5f1f93b0e1024c6c1eee422d80930b71ccaaf1a8 100644 (file)
@@ -1283,6 +1283,17 @@ error_unlock:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
+       stream->ust_metadata_pushed = 0;
+}
+
 /*
  * Receive the metadata updates from the sessiond. Supports receiving
  * overlapping metadata, but is needs to always belong to a contiguous
@@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
+       enum consumer_metadata_cache_write_status cache_write_status;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
@@ -1320,10 +1332,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, version,
-                       metadata_str);
+       cache_write_status = consumer_metadata_cache_write(
+                       channel, offset, len, version, metadata_str);
        pthread_mutex_unlock(&channel->metadata_cache->lock);
-       if (ret < 0) {
+       switch (cache_write_status) {
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+               /*
+                * The write entirely overlapped with existing contents of the
+                * same metadata version (same content); there is nothing to do.
+                */
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+               /*
+                * The metadata cache was invalidated (previously pushed
+                * content has been overwritten). Reset the stream's consumed
+                * metadata position to ensure the metadata poll thread consumes
+                * the whole cache.
+                */
+               pthread_mutex_lock(&channel->metadata_stream->lock);
+               metadata_stream_reset_cache_consumed_position(
+                               channel->metadata_stream);
+               pthread_mutex_unlock(&channel->metadata_stream->lock);
+               /* Fall-through. */
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+               /*
+                * In both cases, the metadata poll thread has new data to
+                * consume.
+                */
+               ret = consumer_metadata_wakeup_pipe(channel);
+               if (ret) {
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_free;
+               }
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                /*
@@ -1332,6 +1374,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * waiting for the metadata cache to be flushed.
                 */
                goto end_free;
+       default:
+               abort();
        }
 
        if (!wait) {
@@ -2464,15 +2508,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
-{
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
-       stream->ust_metadata_pushed = 0;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -3051,6 +3086,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
        assert(stream->ustream);
+       ASSERT_LOCKED(stream->lock);
 
        DBG("UST consumer checking data pending");
 
This page took 0.024055 seconds and 4 git commands to generate.