Fix: post-clear trace chunk has a late beginning packet
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index 5dc380e5e32aa9a23c6d75c878bb6d2a3e10086d..398d71ae05f92f6d9c0437b4fb203532870eb5d6 100644 (file)
@@ -21,6 +21,7 @@
 #include <common/utils.h>
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "consumer-stream.h"
 
@@ -68,8 +69,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        uint64_t sequence_number;
-       const uint64_t discarded_events =
-                       LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+       const uint64_t discarded_events = subbuf->info.data.events_discarded;
 
        if (!subbuf->info.data.sequence_number.is_set) {
                /* Command not supported by the tracer. */
@@ -153,7 +153,7 @@ static ssize_t consumer_stream_consume_mmap(
                        subbuffer->info.data.subbuf_size;
 
        return lttng_consumer_on_read_subbuffer_mmap(
-                       ctx, stream, &subbuffer->buffer.buffer, padding_size);
+                       stream, &subbuffer->buffer.buffer, padding_size);
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -196,6 +196,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
+       enum sync_metadata_status status;
 
        assert(metadata);
        assert(metadata->metadata_flag);
@@ -243,7 +244,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                        /*
                         * Empty the metadata cache and flush the current stream.
                         */
-                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       status = lttng_kconsumer_sync_metadata(metadata);
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
@@ -251,18 +252,23 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                         * Ask the sessiond if we have new metadata waiting and update the
                         * consumer metadata cache.
                         */
-                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       status = lttng_ustconsumer_sync_metadata(ctx, metadata);
                        break;
                default:
-                       assert(0);
-                       ret = -1;
-                       break;
+                       abort();
                }
-               /*
-                * Error or no new metadata, we exit here.
-                */
-               if (ret <= 0 || ret == ENODATA) {
+
+               switch (status) {
+               case SYNC_METADATA_STATUS_NEW_DATA:
+                       break;
+               case SYNC_METADATA_STATUS_NO_DATA:
+                       ret = 0;
                        goto end_unlock_mutex;
+               case SYNC_METADATA_STATUS_ERROR:
+                       ret = -1;
+                       goto end_unlock_mutex;
+               default:
+                       abort();
                }
 
                /*
@@ -284,7 +290,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                 */
                pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
                pthread_mutex_unlock(&metadata->metadata_rdv_lock);
-       } while (ret == EAGAIN);
+       } while (status == SYNC_METADATA_STATUS_NEW_DATA);
 
        /* Success */
        return 0;
@@ -392,8 +398,8 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        }
 
        DBG("New metadata version detected");
-       stream->metadata_version = subbuffer->info.metadata.version;
-       stream->reset_metadata_flag = 1;
+       consumer_stream_metadata_set_version(stream,
+                       subbuffer->info.metadata.version);
 
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
@@ -446,6 +452,8 @@ struct lttng_consumer_stream *consumer_stream_create(
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
+       /* Buffer is created with an open packet. */
+       stream->opened_packet_in_current_trace_chunk = true;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -726,6 +734,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
+       metadata_bucket_destroy(stream->metadata_bucket);
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
@@ -991,3 +1000,71 @@ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
        assert(stream);
        return cds_lfht_is_node_deleted(&stream->node.node);
 }
+
+static ssize_t metadata_bucket_flush(
+               const struct stream_subbuffer *buffer, void *data)
+{
+       ssize_t ret;
+       struct lttng_consumer_stream *stream = data;
+
+       ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+       if (ret < 0) {
+               goto end;
+       }
+end:
+       return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       ssize_t ret;
+       enum metadata_bucket_status status;
+
+       status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+       switch (status) {
+       case METADATA_BUCKET_STATUS_OK:
+               /* Return consumed size. */
+               ret = subbuffer->buffer.buffer.size;
+               break;
+       default:
+               ret = -1;
+       }
+
+       return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       assert(stream->metadata_flag);
+       assert(!stream->metadata_bucket);
+       assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+       stream->metadata_bucket = metadata_bucket_create(
+                       metadata_bucket_flush, stream);
+       if (!stream->metadata_bucket) {
+               ret = -1;
+               goto end;
+       }
+
+       stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+       return ret;
+}
+
+void consumer_stream_metadata_set_version(
+               struct lttng_consumer_stream *stream, uint64_t new_version)
+{
+       assert(new_version > stream->metadata_version);
+       stream->metadata_version = new_version;
+       stream->reset_metadata_flag = 1;
+
+       if (stream->metadata_bucket) {
+               metadata_bucket_reset(stream->metadata_bucket);
+       }
+}
This page took 0.025887 seconds and 4 git commands to generate.