common: compile libconsumer, libust-consumer, libkernel-consumer as C++
[lttng-tools.git] / src / common / consumer / metadata-bucket.cpp
diff --git a/src/common/consumer/metadata-bucket.cpp b/src/common/consumer/metadata-bucket.cpp
new file mode 100644 (file)
index 0000000..160185d
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "metadata-bucket.h"
+
+#include <common/buffer-view.h>
+#include <common/consumer/consumer.h>
+#include <common/dynamic-buffer.h>
+#include <common/macros.h>
+#include <common/error.h>
+
+struct metadata_bucket {
+       struct lttng_dynamic_buffer content;
+       struct {
+               metadata_bucket_flush_cb fn;
+               void *data;
+       } flush;
+       unsigned int buffer_count;
+};
+
+struct metadata_bucket *metadata_bucket_create(
+               metadata_bucket_flush_cb flush, void *data)
+{
+       struct metadata_bucket *bucket;
+
+       bucket = (metadata_bucket *) zmalloc(sizeof(typeof(*bucket)));
+       if (!bucket) {
+               PERROR("Failed to allocate buffer bucket");
+               goto end;
+       }
+
+       bucket->flush.fn = flush;
+       bucket->flush.data = data;
+       lttng_dynamic_buffer_init(&bucket->content);
+end:
+       return bucket;
+}
+
+void metadata_bucket_destroy(struct metadata_bucket *bucket)
+{
+       if (!bucket) {
+               return;
+       }
+
+       if (bucket->content.size > 0) {
+               WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
+                               bucket->content.size, bucket->buffer_count);
+       }
+
+       lttng_dynamic_buffer_reset(&bucket->content);
+       free(bucket);
+}
+
+void metadata_bucket_reset(struct metadata_bucket *bucket)
+{
+       lttng_dynamic_buffer_reset(&bucket->content);
+       lttng_dynamic_buffer_init(&bucket->content);
+       bucket->buffer_count = 0;
+}
+
+enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
+               const struct stream_subbuffer *buffer)
+{
+       ssize_t ret;
+       struct lttng_buffer_view flushed_view;
+       struct stream_subbuffer flushed_subbuffer;
+       enum metadata_bucket_status status;
+       const bool should_flush =
+                       LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent);
+       const size_t padding_this_buffer =
+                       buffer->info.metadata.padded_subbuf_size -
+                       buffer->info.metadata.subbuf_size;
+       size_t flush_size;
+
+       DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
+                       buffer->buffer.buffer.size,
+                       buffer->info.metadata.subbuf_size,
+                       buffer->info.metadata.padded_subbuf_size,
+                       buffer->info.metadata.coherent.value ? "true" : "false");
+       /*
+        * If no metadata was accumulated and this buffer should be
+        * flushed, don't copy it unecessarily; just flush it directly.
+        */
+       if (!should_flush || bucket->buffer_count != 0) {
+               /*
+                * Append the _padded_ subbuffer since they are combined
+                * into a single "virtual" subbuffer that will be
+                * flushed at once.
+                *
+                * This means that some padding will be sent over the
+                * network, but should not represent a large amount
+                * of data as incoherent subbuffers are typically
+                * pretty full.
+                *
+                * The padding of the last subbuffer (coherent) added to
+                * the bucket is not sent, which is what really matters
+                * from an efficiency point of view.
+                */
+               ret = lttng_dynamic_buffer_append_view(
+                       &bucket->content, &buffer->buffer.buffer);
+               if (ret) {
+                       status = METADATA_BUCKET_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
+       bucket->buffer_count++;
+       if (!should_flush) {
+               status = METADATA_BUCKET_STATUS_OK;
+               goto end;
+       }
+
+       flushed_view = bucket->content.size != 0 ?
+               lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) :
+               lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1);
+
+       /*
+        * The flush is done with the size of all padded sub-buffers, except
+        * for the last one which we can safely "trim". The padding of the last
+        * packet will be reconstructed by the relay daemon.
+        */
+       flush_size = flushed_view.size - padding_this_buffer;
+
+       flushed_subbuffer = (typeof(flushed_subbuffer)) {
+               .buffer = {
+                       .buffer = flushed_view,
+               },
+               .info = {
+                       .metadata = {
+                               .subbuf_size = flush_size,
+                               .padded_subbuf_size = flushed_view.size,
+                               .version = buffer->info.metadata.version,
+                               .coherent = buffer->info.metadata.coherent,
+                       },
+               },
+       };
+
+       DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
+                       flushed_view.size, bucket->buffer_count,
+                       bucket->buffer_count > 1 ? "s" : "");
+       ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data);
+       if (ret >= 0) {
+               status = METADATA_BUCKET_STATUS_OK;
+       } else {
+               status = METADATA_BUCKET_STATUS_ERROR;
+       }
+
+       metadata_bucket_reset(bucket);
+
+end:
+       return status;
+}
This page took 0.024051 seconds and 4 git commands to generate.