+++ /dev/null
-/*
- * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#include "metadata-bucket.h"
-
-#include <common/buffer-view.h>
-#include <common/consumer/consumer.h>
-#include <common/dynamic-buffer.h>
-#include <common/macros.h>
-#include <common/error.h>
-
-struct metadata_bucket {
- struct lttng_dynamic_buffer content;
- struct {
- metadata_bucket_flush_cb fn;
- void *data;
- } flush;
- unsigned int buffer_count;
-};
-
-struct metadata_bucket *metadata_bucket_create(
- metadata_bucket_flush_cb flush, void *data)
-{
- struct metadata_bucket *bucket;
-
- bucket = zmalloc(sizeof(typeof(*bucket)));
- if (!bucket) {
- PERROR("Failed to allocate buffer bucket");
- goto end;
- }
-
- bucket->flush.fn = flush;
- bucket->flush.data = data;
- lttng_dynamic_buffer_init(&bucket->content);
-end:
- return bucket;
-}
-
-void metadata_bucket_destroy(struct metadata_bucket *bucket)
-{
- if (!bucket) {
- return;
- }
-
- if (bucket->content.size > 0) {
- WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
- bucket->content.size, bucket->buffer_count);
- }
-
- lttng_dynamic_buffer_reset(&bucket->content);
- free(bucket);
-}
-
-void metadata_bucket_reset(struct metadata_bucket *bucket)
-{
- lttng_dynamic_buffer_reset(&bucket->content);
- lttng_dynamic_buffer_init(&bucket->content);
- bucket->buffer_count = 0;
-}
-
-enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
- const struct stream_subbuffer *buffer)
-{
- ssize_t ret;
- struct lttng_buffer_view flushed_view;
- struct stream_subbuffer flushed_subbuffer;
- enum metadata_bucket_status status;
- const bool should_flush =
- LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent);
- const size_t padding_this_buffer =
- buffer->info.metadata.padded_subbuf_size -
- buffer->info.metadata.subbuf_size;
- size_t flush_size;
-
- DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
- buffer->buffer.buffer.size,
- buffer->info.metadata.subbuf_size,
- buffer->info.metadata.padded_subbuf_size,
- buffer->info.metadata.coherent.value ? "true" : "false");
- /*
- * If no metadata was accumulated and this buffer should be
- * flushed, don't copy it unecessarily; just flush it directly.
- */
- if (!should_flush || bucket->buffer_count != 0) {
- /*
- * Append the _padded_ subbuffer since they are combined
- * into a single "virtual" subbuffer that will be
- * flushed at once.
- *
- * This means that some padding will be sent over the
- * network, but should not represent a large amount
- * of data as incoherent subbuffers are typically
- * pretty full.
- *
- * The padding of the last subbuffer (coherent) added to
- * the bucket is not sent, which is what really matters
- * from an efficiency point of view.
- */
- ret = lttng_dynamic_buffer_append_view(
- &bucket->content, &buffer->buffer.buffer);
- if (ret) {
- status = METADATA_BUCKET_STATUS_ERROR;
- goto end;
- }
- }
-
- bucket->buffer_count++;
- if (!should_flush) {
- status = METADATA_BUCKET_STATUS_OK;
- goto end;
- }
-
- flushed_view = bucket->content.size != 0 ?
- lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) :
- lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1);
-
- /*
- * The flush is done with the size of all padded sub-buffers, except
- * for the last one which we can safely "trim". The padding of the last
- * packet will be reconstructed by the relay daemon.
- */
- flush_size = flushed_view.size - padding_this_buffer;
-
- flushed_subbuffer = (typeof(flushed_subbuffer)) {
- .buffer.buffer = flushed_view,
- .info.metadata.subbuf_size = flush_size,
- .info.metadata.padded_subbuf_size = flushed_view.size,
- .info.metadata.version = buffer->info.metadata.version,
- .info.metadata.coherent = buffer->info.metadata.coherent,
- };
-
- DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
- flushed_view.size, bucket->buffer_count,
- bucket->buffer_count > 1 ? "s" : "");
- ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data);
- if (ret >= 0) {
- status = METADATA_BUCKET_STATUS_OK;
- } else {
- status = METADATA_BUCKET_STATUS_ERROR;
- }
-
- metadata_bucket_reset(bucket);
-
-end:
- return status;
-}