lttng-tools: src/common/consumer/consumer-metadata-cache.cpp
/*
 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#define _LGPL_SOURCE
#include "consumer-metadata-cache.hpp"

#include <common/common.hpp>
#include <common/consumer/consumer.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <common/utils.hpp>

#include <inttypes.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

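/*
 * Status returned by metadata_cache_update_version(): indicates whether the
 * incoming metadata version differs from the one currently cached.
 */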
enum metadata_cache_update_version_status {
        METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED,
        METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED,
};

extern struct lttng_consumer_global_data the_consumer_data;

/*
 * Reset the metadata cache.
 */
static void metadata_cache_reset(struct consumer_metadata_cache *cache)
{
        const int ret = lttng_dynamic_buffer_set_size(&cache->contents, 0);

        LTTNG_ASSERT(ret == 0);
}

/*
 * Check if the metadata cache version changed. If it did, record the new
 * version; resetting the cache contents is left to the caller.
 * The metadata cache lock MUST be held.
 */
static enum metadata_cache_update_version_status
metadata_cache_update_version(struct consumer_metadata_cache *cache, uint64_t version)
{
        enum metadata_cache_update_version_status status;

        if (cache->version == version) {
                status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED;
                goto end;
        }

        DBG("Metadata cache version updated to %" PRIu64, version);
        cache->version = version;
        status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED;

end:
        return status;
}

/*
 * Write metadata to the cache, extending the cache as necessary. Overlapping
 * updates are supported, but they must be contiguous with the data already
 * cached. The metadata cache lock MUST be acquired to write to the cache.
 *
 * See `enum consumer_metadata_cache_write_status` for the meaning of the
 * various returned status codes.
 */
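/*
 * Illustrative example: with 64 bytes already cached, a write of 64 bytes at
 * offset 32 overlaps the tail and grows the cache to 96 bytes
 * (CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT); a write of 16
 * bytes at offset 16 only rewrites existing bytes
 * (CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE); a write carrying a new
 * version resets the cache first and reports
 * CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED.
 */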
enum consumer_metadata_cache_write_status
consumer_metadata_cache_write(struct consumer_metadata_cache *cache,
                              unsigned int offset,
                              unsigned int len,
                              uint64_t version,
                              const char *data)
{
        int ret = 0;
        enum consumer_metadata_cache_write_status status;
        bool cache_is_invalidated = false;
        uint64_t original_size;

        LTTNG_ASSERT(cache);
        ASSERT_LOCKED(cache->lock);
        original_size = cache->contents.size;

        if (metadata_cache_update_version(cache, version) ==
            METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) {
                metadata_cache_reset(cache);
                cache_is_invalidated = true;
        }
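        /*
         * If the cache was reset above, the write below repopulates its
         * contents from scratch.
         */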

        DBG("Writing %u bytes from offset %u in metadata cache", len, offset);
        if (offset + len > cache->contents.size) {
                ret = lttng_dynamic_buffer_set_size(&cache->contents, offset + len);
                if (ret) {
                        ERR("Failed to extend metadata cache");
                        status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR;
                        goto end;
                }
        }

        memcpy(cache->contents.data + offset, data, len);

        if (cache_is_invalidated) {
                status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED;
        } else if (cache->contents.size > original_size) {
                status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT;
        } else {
                status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE;
                LTTNG_ASSERT(cache->contents.size == original_size);
        }

end:
        return status;
}

/*
 * Create the metadata cache, pre-allocating DEFAULT_METADATA_CACHE_SIZE bytes
 * of storage.
 *
 * Return 0 on success, a negative value on error.
 */
int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel)
{
        int ret;

        LTTNG_ASSERT(channel);

        channel->metadata_cache = zmalloc<consumer_metadata_cache>();
        if (!channel->metadata_cache) {
                PERROR("zmalloc metadata cache struct");
                ret = -1;
                goto end;
        }
        ret = pthread_mutex_init(&channel->metadata_cache->lock, nullptr);
        if (ret != 0) {
                PERROR("mutex init");
                /* pthread_mutex_init() returns a positive error number. */
                ret = -1;
                goto end_free_cache;
        }

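        /*
         * Pre-allocate the cache's backing storage so that early metadata
         * writes do not each trigger a reallocation.
         */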
        lttng_dynamic_buffer_init(&channel->metadata_cache->contents);
        ret = lttng_dynamic_buffer_set_capacity(&channel->metadata_cache->contents,
                                                DEFAULT_METADATA_CACHE_SIZE);
        if (ret) {
                PERROR("Failed to pre-allocate metadata cache storage of %d bytes on creation",
                       DEFAULT_METADATA_CACHE_SIZE);
                ret = -1;
                goto end_free_mutex;
        }

        DBG("Allocated metadata cache: current capacity = %zu",
            lttng_dynamic_buffer_get_capacity_left(&channel->metadata_cache->contents));

        ret = 0;
        goto end;

end_free_mutex:
        pthread_mutex_destroy(&channel->metadata_cache->lock);
end_free_cache:
        free(channel->metadata_cache);
end:
        return ret;
}
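
/*
 * Typical lifecycle, as an illustrative sketch (len/version/data stand in for
 * values received from the session daemon; this snippet is not code from the
 * tree): create the cache with the channel, write under the cache lock,
 * destroy with the channel.
 *
 *	if (consumer_metadata_cache_allocate(channel) == 0) {
 *		pthread_mutex_lock(&channel->metadata_cache->lock);
 *		(void) consumer_metadata_cache_write(
 *			channel->metadata_cache, 0, len, version, data);
 *		pthread_mutex_unlock(&channel->metadata_cache->lock);
 *		consumer_metadata_cache_destroy(channel);
 *	}
 */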

/*
 * Destroy and free the metadata cache.
 */
void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
{
        if (!channel || !channel->metadata_cache) {
                return;
        }

        DBG("Destroying metadata cache");

        pthread_mutex_destroy(&channel->metadata_cache->lock);
        lttng_dynamic_buffer_reset(&channel->metadata_cache->contents);
        free(channel->metadata_cache);
}

/*
 * Check if the cache has been flushed up to the offset passed as a parameter.
 *
 * Return true if everything has been flushed, false if some data remains
 * unflushed.
 */
namespace {
bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
                                        uint64_t offset,
                                        bool invoked_by_timer)
{
        bool done_flushing = false;
        struct lttng_consumer_stream *metadata_stream;

        /*
         * If not called from a timer handler, we have to take the
         * channel lock to be mutually exclusive with channel teardown.
         * The timer handler does not need to take this lock because it is
         * already synchronized by timer stop (and, more importantly,
         * taking this lock in a timer handler would cause a deadlock).
         */
        if (!invoked_by_timer) {
                pthread_mutex_lock(&channel->lock);
        }
        pthread_mutex_lock(&channel->timer_lock);
        metadata_stream = channel->metadata_stream;
        if (!metadata_stream) {
                /*
                 * Having no metadata stream means the channel is being
                 * destroyed, so there is no cache to flush anymore.
                 */
                done_flushing = true;
                goto end_unlock_channel;
        }

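        /*
         * Take both the stream and the cache locks so that the pushed
         * position and the endpoint status read below form a coherent
         * snapshot.
         */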
        pthread_mutex_lock(&metadata_stream->lock);
        pthread_mutex_lock(&channel->metadata_cache->lock);

        if (metadata_stream->ust_metadata_pushed >= offset) {
                done_flushing = true;
        } else if (metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
                /* An inactive endpoint means we don't have to flush anymore. */
                done_flushing = true;
        } else {
                /* Still not completely flushed. */
                done_flushing = false;
        }

        pthread_mutex_unlock(&channel->metadata_cache->lock);
        pthread_mutex_unlock(&metadata_stream->lock);

end_unlock_channel:
        pthread_mutex_unlock(&channel->timer_lock);
        if (!invoked_by_timer) {
                pthread_mutex_unlock(&channel->lock);
        }

        return done_flushing;
}
} /* namespace */

/*
 * Wait until the cache has been flushed up to the offset passed as a
 * parameter or until the metadata stream has been destroyed.
 */
void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
                                          uint64_t offset,
                                          bool invoked_by_timer)
{
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(channel->metadata_cache);

        if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
                return;
        }

        /* The metadata cache is not flushed yet; wait on the wait queue. */
        for (;;) {
                lttng::synchro::waiter waiter;

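                /*
                 * Enqueue before re-checking: a flush completing between the
                 * check above and the enqueue would otherwise be missed,
                 * leaving this thread blocked forever.
                 */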
                channel->metadata_pushed_wait_queue.add(waiter);
                if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
                        /* Wake up all waiters, ourselves included. */
                        channel->metadata_pushed_wait_queue.wake_all();
                        /* Ensure proper teardown of the waiter. */
                        waiter.wait();
                        break;
                }

                waiter.wait();
        }
}