2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
8 #include "metadata-bucket.h"
10 #include <common/buffer-view.h>
11 #include <common/consumer/consumer.h>
12 #include <common/dynamic-buffer.h>
13 #include <common/macros.h>
14 #include <common/error.h>
/*
 * State of a metadata bucket.
 *
 * NOTE(review): the functions in this file access the callback as
 * `bucket->flush.fn` and also read `bucket->flush.data`. The enclosing
 * `flush` member, its `data` field and the closing brace of this
 * structure are not visible in this listing; confirm the exact layout
 * against the complete source.
 */
16 struct metadata_bucket
{
/*
 * Bytes accumulated so far. metadata_bucket_fill() appends sub-buffer
 * views to it and passes a view of it to the flush callback.
 */
17 struct lttng_dynamic_buffer content
;
/* Flush callback, set by metadata_bucket_create(). */
19 metadata_bucket_flush_cb fn
;
/*
 * Incremented by metadata_bucket_fill() and set back to 0 by
 * metadata_bucket_reset().
 */
22 unsigned int buffer_count
;
/*
 * Create a metadata bucket.
 *
 * Allocates the bucket with zmalloc(), stores `flush` and `data` in
 * `bucket->flush.fn` and `bucket->flush.data`, and initializes the
 * `content` dynamic buffer with lttng_dynamic_buffer_init().
 *
 * flush: callback stored in the bucket; metadata_bucket_fill() invokes it.
 * data:  opaque pointer stored next to the callback and passed back to it.
 *
 * NOTE(review): the condition guarding the PERROR() call and the return
 * statement are not visible in this listing. Presumably the error is
 * logged when the allocation fails and the bucket pointer is returned;
 * confirm against the complete source.
 */
25 struct metadata_bucket
*metadata_bucket_create(
26 metadata_bucket_flush_cb flush
, void *data
)
28 struct metadata_bucket
*bucket
;
30 bucket
= zmalloc(sizeof(typeof(*bucket
)));
32 PERROR("Failed to allocate buffer bucket");
/* Remember the flush callback and its context. */
36 bucket
->flush
.fn
= flush
;
37 bucket
->flush
.data
= data
;
38 lttng_dynamic_buffer_init(&bucket
->content
);
/*
 * Destroy a metadata bucket.
 *
 * Emits a warning when the bucket still holds data (content size greater
 * than zero), then resets the `content` dynamic buffer with
 * lttng_dynamic_buffer_reset().
 *
 * NOTE(review): no NULL check on `bucket` and no release of the bucket
 * itself are visible in this listing; confirm against the complete
 * source before relying on either.
 */
43 void metadata_bucket_destroy(struct metadata_bucket
*bucket
)
/* Data left in the bucket at destruction time is only reported. */
49 if (bucket
->content
.size
> 0) {
50 WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
51 bucket
->content
.size
, bucket
->buffer_count
);
54 lttng_dynamic_buffer_reset(&bucket
->content
);
/*
 * Return a bucket to its empty state.
 *
 * The `content` dynamic buffer is reset and then initialized again, and
 * the sub-buffer counter is set back to zero. metadata_bucket_fill()
 * calls this after invoking the flush callback.
 *
 * `bucket` is dereferenced without a NULL check.
 */
58 void metadata_bucket_reset(struct metadata_bucket
*bucket
)
60 lttng_dynamic_buffer_reset(&bucket
->content
);
/* Re-initialize so the buffer can be appended to again. */
61 lttng_dynamic_buffer_init(&bucket
->content
);
62 bucket
->buffer_count
= 0;
/*
 * Add a metadata sub-buffer to the bucket and flush the bucket through
 * its callback.
 *
 * Visible steps, in order:
 *   - `should_flush` is read from the `coherent` flag of the sub-buffer;
 *   - when `should_flush` is false or the bucket already counts at least
 *     one sub-buffer, the sub-buffer view is appended to
 *     `bucket->content`;
 *   - `bucket->buffer_count` is incremented;
 *   - the flushed view is the whole of `bucket->content` when it is not
 *     empty, otherwise the view of the incoming sub-buffer;
 *   - the flushed sub-buffer reports the view size minus the padding of
 *     the incoming sub-buffer as `subbuf_size`, the full view size as
 *     `padded_subbuf_size`, and copies `version` and `coherent` from the
 *     incoming sub-buffer;
 *   - the flush callback is invoked with that sub-buffer and the stored
 *     `data` pointer, then metadata_bucket_reset() is called.
 *
 * bucket: bucket to fill; dereferenced without a NULL check.
 * buffer: sub-buffer to add; only read.
 *
 * The `status` local takes METADATA_BUCKET_STATUS_OK or
 * METADATA_BUCKET_STATUS_ERROR.
 *
 * NOTE(review): the declarations of `ret` and `flush_size`, the
 * conditions that choose between the OK and ERROR assignments (including
 * any early exit for a sub-buffer that should not be flushed) and the
 * final return are not visible in this listing; confirm against the
 * complete source.
 */
65 enum metadata_bucket_status
metadata_bucket_fill(struct metadata_bucket
*bucket
,
66 const struct stream_subbuffer
*buffer
)
69 struct lttng_buffer_view flushed_view
;
70 struct stream_subbuffer flushed_subbuffer
;
71 enum metadata_bucket_status status
;
/* Taken from the `coherent` flag of the incoming sub-buffer. */
72 const bool should_flush
=
73 LTTNG_OPTIONAL_GET(buffer
->info
.metadata
.coherent
);
/* Padding of this sub-buffer: padded size minus sub-buffer size. */
74 const size_t padding_this_buffer
=
75 buffer
->info
.metadata
.padded_subbuf_size
-
76 buffer
->info
.metadata
.subbuf_size
;
79 DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
80 buffer
->buffer
.buffer
.size
,
81 buffer
->info
.metadata
.subbuf_size
,
82 buffer
->info
.metadata
.padded_subbuf_size
,
83 buffer
->info
.metadata
.coherent
.value
? "true" : "false");
85 * If no metadata was accumulated and this buffer should be
86 * flushed, don't copy it unecessarily; just flush it directly.
88 if (!should_flush
|| bucket
->buffer_count
!= 0) {
90 * Append the _padded_ subbuffer since they are combined
91 * into a single "virtual" subbuffer that will be
94 * This means that some padding will be sent over the
95 * network, but should not represent a large amount
96 * of data as incoherent subbuffers are typically
99 * The padding of the last subbuffer (coherent) added to
100 * the bucket is not sent, which is what really matters
101 * from an efficiency point of view.
103 ret
= lttng_dynamic_buffer_append_view(
104 &bucket
->content
, &buffer
->buffer
.buffer
);
106 status
= METADATA_BUCKET_STATUS_ERROR
;
111 bucket
->buffer_count
++;
113 status
= METADATA_BUCKET_STATUS_OK
;
117 flushed_view
= bucket
->content
.size
!= 0 ?
118 lttng_buffer_view_from_dynamic_buffer(&bucket
->content
, 0, -1) :
119 lttng_buffer_view_from_view(&buffer
->buffer
.buffer
, 0, -1);
122 * The flush is done with the size of all padded sub-buffers, except
123 * for the last one which we can safely "trim". The padding of the last
124 * packet will be reconstructed by the relay daemon.
126 flush_size
= flushed_view
.size
- padding_this_buffer
;
128 flushed_subbuffer
= (typeof(flushed_subbuffer
)) {
129 .buffer
.buffer
= flushed_view
,
130 .info
.metadata
.subbuf_size
= flush_size
,
131 .info
.metadata
.padded_subbuf_size
= flushed_view
.size
,
132 .info
.metadata
.version
= buffer
->info
.metadata
.version
,
133 .info
.metadata
.coherent
= buffer
->info
.metadata
.coherent
,
136 DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
137 flushed_view
.size
, bucket
->buffer_count
,
138 bucket
->buffer_count
> 1 ? "s" : "");
139 ret
= bucket
->flush
.fn(&flushed_subbuffer
, bucket
->flush
.data
);
141 status
= METADATA_BUCKET_STATUS_OK
;
143 status
= METADATA_BUCKET_STATUS_ERROR
;
146 metadata_bucket_reset(bucket
);