Fix: consumerd: live client receives incomplete metadata
[lttng-tools.git] / src / common / consumer / metadata-bucket.c
CommitLineData
f5ba75b4
JG
1/*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8#include "metadata-bucket.h"
9
10#include <common/buffer-view.h>
11#include <common/consumer/consumer.h>
12#include <common/dynamic-buffer.h>
13#include <common/macros.h>
14#include <common/error.h>
15
16struct metadata_bucket {
17 struct lttng_dynamic_buffer content;
18 struct {
19 metadata_bucket_flush_cb fn;
20 void *data;
21 } flush;
22 unsigned int buffer_count;
23};
24
25struct metadata_bucket *metadata_bucket_create(
26 metadata_bucket_flush_cb flush, void *data)
27{
28 struct metadata_bucket *bucket;
29
30 bucket = zmalloc(sizeof(typeof(*bucket)));
31 if (!bucket) {
32 PERROR("Failed to allocate buffer bucket");
33 goto end;
34 }
35
36 bucket->flush.fn = flush;
37 bucket->flush.data = data;
38 lttng_dynamic_buffer_init(&bucket->content);
39end:
40 return bucket;
41}
42
43void metadata_bucket_destroy(struct metadata_bucket *bucket)
44{
45 if (!bucket) {
46 return;
47 }
48
49 if (bucket->content.size > 0) {
50 WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
51 bucket->content.size, bucket->buffer_count);
52 }
53
54 lttng_dynamic_buffer_reset(&bucket->content);
55 free(bucket);
56}
57
58void metadata_bucket_reset(struct metadata_bucket *bucket)
59{
60 lttng_dynamic_buffer_reset(&bucket->content);
61 lttng_dynamic_buffer_init(&bucket->content);
62 bucket->buffer_count = 0;
63}
64
65enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
66 const struct stream_subbuffer *buffer)
67{
68 ssize_t ret;
69 struct lttng_buffer_view flushed_view;
70 struct stream_subbuffer flushed_subbuffer;
71 enum metadata_bucket_status status;
72 const bool should_flush =
73 LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent);
74 const size_t padding_this_buffer =
75 buffer->info.metadata.padded_subbuf_size -
76 buffer->info.metadata.subbuf_size;
77 size_t flush_size;
78
79 DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
80 buffer->buffer.buffer.size,
81 buffer->info.metadata.subbuf_size,
82 buffer->info.metadata.padded_subbuf_size,
83 buffer->info.metadata.coherent.value ? "true" : "false");
84 /*
85 * If no metadata was accumulated and this buffer should be
86 * flushed, don't copy it unecessarily; just flush it directly.
87 */
88 if (!should_flush || bucket->buffer_count != 0) {
89 /*
90 * Append the _padded_ subbuffer since they are combined
91 * into a single "virtual" subbuffer that will be
92 * flushed at once.
93 *
94 * This means that some padding will be sent over the
95 * network, but should not represent a large amount
96 * of data as incoherent subbuffers are typically
97 * pretty full.
98 *
99 * The padding of the last subbuffer (coherent) added to
100 * the bucket is not sent, which is what really matters
101 * from an efficiency point of view.
102 */
103 ret = lttng_dynamic_buffer_append_view(
104 &bucket->content, &buffer->buffer.buffer);
105 if (ret) {
106 status = METADATA_BUCKET_STATUS_ERROR;
107 goto end;
108 }
109 }
110
111 bucket->buffer_count++;
112 if (!should_flush) {
113 status = METADATA_BUCKET_STATUS_OK;
114 goto end;
115 }
116
117 flushed_view = bucket->content.size != 0 ?
118 lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) :
119 lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1);
120
121 /*
122 * The flush is done with the size of all padded sub-buffers, except
123 * for the last one which we can safely "trim". The padding of the last
124 * packet will be reconstructed by the relay daemon.
125 */
126 flush_size = flushed_view.size - padding_this_buffer;
127
128 flushed_subbuffer = (typeof(flushed_subbuffer)) {
129 .buffer.buffer = flushed_view,
130 .info.metadata.subbuf_size = flush_size,
131 .info.metadata.padded_subbuf_size = flushed_view.size,
132 .info.metadata.version = buffer->info.metadata.version,
133 .info.metadata.coherent = buffer->info.metadata.coherent,
134 };
135
136 DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
137 flushed_view.size, bucket->buffer_count,
138 bucket->buffer_count > 1 ? "s" : "");
139 ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data);
140 if (ret >= 0) {
141 status = METADATA_BUCKET_STATUS_OK;
142 } else {
143 status = METADATA_BUCKET_STATUS_ERROR;
144 }
145
146 metadata_bucket_reset(bucket);
147
148end:
149 return status;
150}
This page took 0.028433 seconds and 4 git commands to generate.