e3b8bde8230717a4f5e225d124f307aba135a4f1
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11
12 #include "connection.hpp"
13 #include "index.hpp"
14 #include "lttng-relayd.hpp"
15 #include "stream.hpp"
16
17 #include <common/common.hpp>
18 #include <common/compat/endian.hpp>
19 #include <common/urcu.hpp>
20 #include <common/utils.hpp>
21
22 /*
23 * Allocate a new relay index object. Pass the stream in which it is
24 * contained as parameter. The sequence number will be used as the hash
25 * table key.
26 *
27 * Called with stream mutex held.
28 * Return allocated object or else NULL on error.
29 */
30 static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num)
31 {
32 struct relay_index *index;
33
34 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
35 stream->stream_handle,
36 net_seq_num);
37
38 index = zmalloc<relay_index>();
39 if (!index) {
40 PERROR("Relay index zmalloc");
41 goto end;
42 }
43 if (!stream_get(stream)) {
44 ERR("Cannot get stream");
45 free(index);
46 index = nullptr;
47 goto end;
48 }
49 index->stream = stream;
50
51 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
52 pthread_mutex_init(&index->lock, nullptr);
53 urcu_ref_init(&index->ref);
54
55 end:
56 return index;
57 }
58
59 /*
60 * Add unique relay index to the given hash table. In case of a collision, the
61 * already existing object is put in the given _index variable.
62 *
63 * RCU read side lock MUST be acquired.
64 */
65 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
66 struct relay_index *index)
67 {
68 struct cds_lfht_node *node_ptr;
69 struct relay_index *_index;
70
71 ASSERT_RCU_READ_LOCKED();
72
73 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
74 stream->stream_handle,
75 index->index_n.key);
76
77 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
78 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
79 stream->indexes_ht->match_fct,
80 &index->index_n,
81 &index->index_n.node);
82 if (node_ptr != &index->index_n.node) {
83 _index = caa_container_of(node_ptr, struct relay_index, index_n.node);
84 } else {
85 _index = nullptr;
86 }
87 return _index;
88 }
89
90 /*
91 * Should be called with RCU read-side lock held.
92 */
93 static bool relay_index_get(struct relay_index *index)
94 {
95 ASSERT_RCU_READ_LOCKED();
96
97 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
98 index->stream->stream_handle,
99 index->index_n.key,
100 (int) index->ref.refcount);
101
102 return urcu_ref_get_unless_zero(&index->ref);
103 }
104
105 /*
106 * Get a relayd index in within the given stream, or create it if not
107 * present.
108 *
109 * Called with stream mutex held.
110 * Return index object or else NULL on error.
111 */
112 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
113 uint64_t net_seq_num)
114 {
115 struct lttng_ht_node_u64 *node;
116 struct lttng_ht_iter iter;
117 struct relay_index *index = nullptr;
118
119 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
120 stream->stream_handle,
121 net_seq_num);
122
123 lttng::urcu::read_lock_guard read_lock;
124 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
125 node = lttng_ht_iter_get_node_u64(&iter);
126 if (node) {
127 index = lttng::utils::container_of(node, &relay_index::index_n);
128 } else {
129 struct relay_index *oldindex;
130
131 index = relay_index_create(stream, net_seq_num);
132 if (!index) {
133 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
134 stream->stream_handle,
135 net_seq_num);
136 goto end;
137 }
138 oldindex = relay_index_add_unique(stream, index);
139 if (oldindex) {
140 /* Added concurrently, keep old. */
141 relay_index_put(index);
142 index = oldindex;
143 if (!relay_index_get(index)) {
144 index = nullptr;
145 }
146 } else {
147 stream->indexes_in_flight++;
148 index->in_hash_table = true;
149 }
150 }
151 end:
152 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
153 (index == NULL) ? "NOT " : "",
154 stream->stream_handle,
155 net_seq_num);
156 return index;
157 }
158
159 int relay_index_set_file(struct relay_index *index,
160 struct lttng_index_file *index_file,
161 uint64_t data_offset)
162 {
163 int ret = 0;
164
165 pthread_mutex_lock(&index->lock);
166 if (index->index_file) {
167 ret = -1;
168 goto end;
169 }
170 lttng_index_file_get(index_file);
171 index->index_file = index_file;
172 index->index_data.offset = data_offset;
173 end:
174 pthread_mutex_unlock(&index->lock);
175 return ret;
176 }
177
178 int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data)
179 {
180 int ret = 0;
181
182 pthread_mutex_lock(&index->lock);
183 if (index->has_index_data) {
184 ret = -1;
185 goto end;
186 }
187 /* Set everything except data_offset. */
188 index->index_data.packet_size = data->packet_size;
189 index->index_data.content_size = data->content_size;
190 index->index_data.timestamp_begin = data->timestamp_begin;
191 index->index_data.timestamp_end = data->timestamp_end;
192 index->index_data.events_discarded = data->events_discarded;
193 index->index_data.stream_id = data->stream_id;
194 index->has_index_data = true;
195 end:
196 pthread_mutex_unlock(&index->lock);
197 return ret;
198 }
199
200 static void index_destroy(struct relay_index *index)
201 {
202 free(index);
203 }
204
205 static void index_destroy_rcu(struct rcu_head *rcu_head)
206 {
207 struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node);
208
209 index_destroy(index);
210 }
211
212 /* Stream lock must be held by the caller. */
213 static void index_release(struct urcu_ref *ref)
214 {
215 struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref);
216 struct relay_stream *stream = index->stream;
217 int ret;
218 struct lttng_ht_iter iter;
219
220 if (index->index_file) {
221 lttng_index_file_put(index->index_file);
222 index->index_file = nullptr;
223 }
224 if (index->in_hash_table) {
225 /* Delete index from hash table. */
226 iter.iter.node = &index->index_n.node;
227 ret = lttng_ht_del(stream->indexes_ht, &iter);
228 LTTNG_ASSERT(!ret);
229 stream->indexes_in_flight--;
230 }
231
232 stream_put(index->stream);
233 index->stream = nullptr;
234
235 call_rcu(&index->rcu_node, index_destroy_rcu);
236 }
237
238 /*
239 * Called with stream mutex held.
240 *
241 * Stream lock must be held by the caller.
242 */
243 void relay_index_put(struct relay_index *index)
244 {
245 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
246 index->stream->stream_handle,
247 index->index_n.key,
248 (int) index->ref.refcount);
249 /*
250 * Ensure existence of index->lock for index unlock.
251 */
252 lttng::urcu::read_lock_guard read_lock;
253 /*
254 * Index lock ensures that concurrent test and update of stream
255 * ref is atomic.
256 */
257 LTTNG_ASSERT(index->ref.refcount != 0);
258 urcu_ref_put(&index->ref, index_release);
259 }
260
261 /*
262 * Try to flush index to disk. Releases self-reference to index once
263 * flush succeeds.
264 *
265 * Stream lock must be held by the caller.
266 * Return 0 on successful flush, a negative value on error, or positive
267 * value if no flush was performed.
268 */
269 int relay_index_try_flush(struct relay_index *index)
270 {
271 int ret = 1;
272 bool flushed = false;
273
274 pthread_mutex_lock(&index->lock);
275 if (index->flushed) {
276 goto skip;
277 }
278 /* Check if we are ready to flush. */
279 if (!index->has_index_data || !index->index_file) {
280 goto skip;
281 }
282
283 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
284 index->stream->stream_handle,
285 index->index_n.key);
286 flushed = true;
287 index->flushed = true;
288 ret = lttng_index_file_write(index->index_file, &index->index_data);
289 skip:
290 pthread_mutex_unlock(&index->lock);
291
292 if (flushed) {
293 /* Put self-ref from index now that it has been flushed. */
294 relay_index_put(index);
295 }
296 return ret;
297 }
298
299 /*
300 * Close every relay index within a given stream, without flushing
301 * them.
302 */
303 void relay_index_close_all(struct relay_stream *stream)
304 {
305 struct lttng_ht_iter iter;
306 struct relay_index *index;
307
308 lttng::urcu::read_lock_guard read_lock;
309
310 cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
311 /* Put self-ref from index. */
312 relay_index_put(index);
313 }
314 }
315
316 void relay_index_close_partial_fd(struct relay_stream *stream)
317 {
318 struct lttng_ht_iter iter;
319 struct relay_index *index;
320
321 lttng::urcu::read_lock_guard read_lock;
322
323 cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
324 if (!index->index_file) {
325 continue;
326 }
327 /*
328 * Partial index has its index_file: we have only
329 * received its info from the data socket.
330 * Put self-ref from index.
331 */
332 relay_index_put(index);
333 }
334 }
335
336 uint64_t relay_index_find_last(struct relay_stream *stream)
337 {
338 struct lttng_ht_iter iter;
339 struct relay_index *index;
340 uint64_t net_seq_num = -1ULL;
341
342 lttng::urcu::read_lock_guard read_lock;
343
344 cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
345 if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) {
346 net_seq_num = index->index_n.key;
347 }
348 }
349
350 return net_seq_num;
351 }
352
353 /*
354 * Update the index file of an already existing relay_index.
355 * Offsets by 'removed_data_count' the offset field of an index.
356 */
357 static int relay_index_switch_file(struct relay_index *index,
358 struct lttng_index_file *new_index_file,
359 uint64_t removed_data_count)
360 {
361 int ret = 0;
362 uint64_t offset;
363
364 pthread_mutex_lock(&index->lock);
365 if (!index->index_file) {
366 ERR("No index_file");
367 ret = 0;
368 goto end;
369 }
370
371 lttng_index_file_put(index->index_file);
372 lttng_index_file_get(new_index_file);
373 index->index_file = new_index_file;
374 offset = be64toh(index->index_data.offset);
375 index->index_data.offset = htobe64(offset - removed_data_count);
376
377 end:
378 pthread_mutex_unlock(&index->lock);
379 return ret;
380 }
381
382 /*
383 * Switch the index file of all pending indexes for a stream and update the
384 * data offset by substracting the last safe position.
385 * Stream lock must be held.
386 */
387 int relay_index_switch_all_files(struct relay_stream *stream)
388 {
389 struct lttng_ht_iter iter;
390 struct relay_index *index;
391 int ret = 0;
392
393 lttng::urcu::read_lock_guard read_lock;
394
395 cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
396 ret = relay_index_switch_file(
397 index, stream->index_file, stream->pos_after_last_complete_data_index);
398 if (ret) {
399 return ret;
400 }
401 }
402
403 return ret;
404 }
405
406 /*
407 * Set index data from the control port to a given index object.
408 */
409 int relay_index_set_control_data(struct relay_index *index,
410 const struct lttcomm_relayd_index *data,
411 unsigned int minor_version)
412 {
413 /* The index on disk is encoded in big endian. */
414 ctf_packet_index index_data{};
415
416 index_data.packet_size = htobe64(data->packet_size);
417 index_data.content_size = htobe64(data->content_size);
418 index_data.timestamp_begin = htobe64(data->timestamp_begin);
419 index_data.timestamp_end = htobe64(data->timestamp_end);
420 index_data.events_discarded = htobe64(data->events_discarded);
421 index_data.stream_id = htobe64(data->stream_id);
422
423 if (minor_version >= 8) {
424 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
425 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
426 } else {
427 uint64_t unset_value = -1ULL;
428
429 index->index_data.stream_instance_id = htobe64(unset_value);
430 index->index_data.packet_seq_num = htobe64(unset_value);
431 }
432
433 return relay_index_set_data(index, &index_data);
434 }
This page took 0.037539 seconds and 4 git commands to generate.