Fix: consumerd: live client receives incomplete metadata
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 1af9840cd5184b0cfebad0bf680257fbbadac9a5..6d6690a32161a4cbc24e05157f31407fe9c1df74 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/utils.h>
 #include <common/index/index.h>
 #include <common/consumer/consumer.h>
+#include <common/optional.h>
 
 #include "ust-consumer.h"
 
@@ -1230,7 +1231,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
@@ -2464,7 +2465,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        assert(write_len != 0);
        if (write_len < 0) {
                ERR("Writing one metadata packet");
-               ret = -1;
+               ret = write_len;
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
@@ -2810,28 +2811,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
 {
        int ret;
+       bool cache_empty;
+       bool got_subbuffer;
+       bool coherent;
+       bool buffer_empty;
+       unsigned long consumed_pos, produced_pos;
 
-       ret = ustctl_get_next_subbuf(stream->ustream);
-       if (ret) {
-               ret = commit_one_metadata_packet(stream);
-               if (ret < 0) {
-                       goto end;
-               } else if (ret == 0) {
-                       /* Not an error, the cache is empty. */
-                       ret = -ENODATA;
-                       goto end;
+       do {
+               ret = ustctl_get_next_subbuf(stream->ustream);
+               if (ret == 0) {
+                       got_subbuffer = true;
+               } else {
+                       got_subbuffer = false;
+                       if (ret != -EAGAIN) {
+                               /* Fatal error. */
+                               goto end;
+                       }
                }
 
-               ret = ustctl_get_next_subbuf(stream->ustream);
-               if (ret) {
-                       goto end;
+               /*
+                * Determine if the cache is empty and ensure that a sub-buffer
+                * is made available if the cache is not empty.
+                */
+               if (!got_subbuffer) {
+                       ret = commit_one_metadata_packet(stream);
+                       if (ret < 0 && ret != -ENOBUFS) {
+                               goto end;
+                       } else if (ret == 0) {
+                               /* Not an error, the cache is empty. */
+                               cache_empty = true;
+                               ret = -ENODATA;
+                               goto end;
+                       } else {
+                               cache_empty = false;
+                       }
+               } else {
+                       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+                       cache_empty = stream->chan->metadata_cache->max_offset ==
+                                     stream->ust_metadata_pushed;
+                       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                }
-       }
+       } while (!got_subbuffer);
 
+       /* Populate sub-buffer infos and view. */
        ret = get_next_subbuffer_common(stream, subbuffer);
        if (ret) {
                goto end;
        }
+
+       ret = lttng_ustconsumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               /*
+                * -EAGAIN is not expected since we got a sub-buffer and haven't
+                * pushed the consumption position yet (on put_next).
+                */
+               PERROR("Failed to take a snapshot of metadata buffer positions");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+       if (ret) {
+               PERROR("Failed to get metadata consumed position");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+       if (ret) {
+               PERROR("Failed to get metadata produced position");
+               goto end;
+       }
+
+       /* Last sub-buffer of the ring buffer ? */
+       buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
+
+       /*
+        * The sessiond registry lock ensures that coherent units of metadata
+        * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
+        * acquired, the cache is empty, and it is the only available sub-buffer
+        * available, it is safe to assume that it is "coherent".
+        */
+       coherent = got_subbuffer && cache_empty && buffer_empty;
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
 end:
        return ret;
 }
@@ -2851,9 +2912,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream,
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
 }
 
-static void lttng_ustconsumer_set_stream_ops(
+static int lttng_ustconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
+       int ret = 0;
+
        stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
        if (stream->metadata_flag) {
                stream->read_subbuffer_ops.get_next_subbuffer =
@@ -2862,7 +2925,14 @@ static void lttng_ustconsumer_set_stream_ops(
                                extract_metadata_subbuffer_info;
                stream->read_subbuffer_ops.reset_metadata =
                                metadata_stream_reset_cache;
-               stream->read_subbuffer_ops.on_sleep = signal_metadata;
+               if (stream->chan->is_live) {
+                       stream->read_subbuffer_ops.on_sleep = signal_metadata;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               }
        } else {
                stream->read_subbuffer_ops.get_next_subbuffer =
                                get_next_subbuffer;
@@ -2876,6 +2946,8 @@ static void lttng_ustconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 /*
This page took 0.027813 seconds and 4 git commands to generate.