Fix: consumer-stream: live viewers observe timestamps going backwards
[lttng-tools.git] / src / common / consumer / consumer-stream.cpp
index fbaf4aef502e69ea3d8937863df6aa68e0ccbdee..f9807cb0173f4bf6f4a7c0e9f838afb4631c16ec 100644 (file)
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/metadata-bucket.h>
+#include <common/consumer/metadata-bucket.h>
 #include <common/index/index.h>
 #include <common/kernel-consumer/kernel-consumer.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/macros.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 #include <common/utils.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/kernel-ctl/kernel-ctl.h>
 
 #include "consumer-stream.h"
 
@@ -51,6 +55,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream
        pthread_mutex_unlock(&stream->chan->lock);
 }
 
+static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+       ASSERT_LOCKED(stream->chan->lock);
+}
+
 static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
 {
        consumer_stream_data_lock_all(stream);
@@ -63,6 +73,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st
        consumer_stream_data_unlock_all(stream);
 }
 
+static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->metadata_rdv_lock);
+       consumer_stream_data_assert_locked_all(stream);
+}
+
 /* Only used for data streams. */
 static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuf)
@@ -403,6 +419,7 @@ static int consumer_stream_sync_metadata_index(
                const struct stream_subbuffer *subbuffer,
                struct lttng_consumer_local_data *ctx)
 {
+       bool missed_metadata_flush;
        int ret;
 
        /* Block until all the metadata is sent. */
@@ -415,18 +432,34 @@ static int consumer_stream_sync_metadata_index(
 
        pthread_mutex_lock(&stream->metadata_timer_lock);
        stream->waiting_on_metadata = false;
-       if (stream->missed_metadata_flush) {
+       missed_metadata_flush = stream->missed_metadata_flush;
+       if (missed_metadata_flush) {
                stream->missed_metadata_flush = false;
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
-               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
-       } else {
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
        }
+       pthread_mutex_unlock(&stream->metadata_timer_lock);
        if (ret < 0) {
                goto end;
        }
 
        ret = consumer_stream_send_index(stream, subbuffer, ctx);
+       /*
+        * Send the live inactivity beacon to handle the situation where
+        * the live timer is prevented from sampling this stream
+        * because the stream lock was being held while this stream is
+        * waiting on metadata. This ensures live viewer progress in the
+        * unlikely scenario where a live timer would be prevented from
+        * locking a stream lock repeatedly due to a steady flow of
+        * incoming metadata, for a stream which is mostly inactive.
+        *
+        * It is important to send the inactivity beacon packet to
+        * relayd _after_ sending the index associated with the data
+        * that was just sent, otherwise this can cause live viewers to
+        * observe timestamps going backwards between an inactivity
+        * beacon and a following trace packet.
+        */
+       if (missed_metadata_flush) {
+               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+       }
 end:
        return ret;
 }
@@ -711,6 +744,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                                consumer_stream_metadata_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_metadata_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_metadata_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
@@ -739,6 +774,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_data_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
        }
This page took 0.026322 seconds and 4 git commands to generate.