Add type-checked versions of allocation and deallocations functions
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
CommitLineData
1c20f0e2 1/*
ab5be9fa
MJ
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
1c20f0e2 7 *
1c20f0e2
JD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
1c20f0e2 11
c9e313bc
SM
12#include <common/common.hpp>
13#include <common/utils.hpp>
14#include <common/compat/endian.hpp>
1c20f0e2 15
c9e313bc
SM
16#include "lttng-relayd.hpp"
17#include "stream.hpp"
18#include "index.hpp"
19#include "connection.hpp"
1c20f0e2
JD
20
21/*
7591bab1
MD
22 * Allocate a new relay index object. Pass the stream in which it is
23 * contained as parameter. The sequence number will be used as the hash
24 * table key.
25 *
26 * Called with stream mutex held.
27 * Return allocated object or else NULL on error.
1c20f0e2 28 */
7591bab1
MD
29static struct relay_index *relay_index_create(struct relay_stream *stream,
30 uint64_t net_seq_num)
1c20f0e2 31{
7591bab1 32 struct relay_index *index;
1c20f0e2 33
7591bab1
MD
34 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
35 stream->stream_handle, net_seq_num);
1c20f0e2 36
64803277 37 index = zmalloc<relay_index>();
7591bab1
MD
38 if (!index) {
39 PERROR("Relay index zmalloc");
40 goto end;
41 }
42 if (!stream_get(stream)) {
43 ERR("Cannot get stream");
44 free(index);
45 index = NULL;
46 goto end;
1c20f0e2 47 }
7591bab1 48 index->stream = stream;
1c20f0e2 49
7591bab1
MD
50 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
51 pthread_mutex_init(&index->lock, NULL);
7591bab1
MD
52 urcu_ref_init(&index->ref);
53
54end:
55 return index;
1c20f0e2
JD
56}
57
58/*
7591bab1
MD
59 * Add unique relay index to the given hash table. In case of a collision, the
60 * already existing object is put in the given _index variable.
1c20f0e2 61 *
7591bab1 62 * RCU read side lock MUST be acquired.
1c20f0e2 63 */
7591bab1
MD
64static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
65 struct relay_index *index)
1c20f0e2 66{
7591bab1
MD
67 struct cds_lfht_node *node_ptr;
68 struct relay_index *_index;
1c20f0e2 69
48b7cdc2
FD
70 ASSERT_RCU_READ_LOCKED();
71
7591bab1
MD
72 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
73 stream->stream_handle, index->index_n.key);
1c20f0e2 74
7591bab1
MD
75 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
76 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
77 stream->indexes_ht->match_fct, &index->index_n,
78 &index->index_n.node);
79 if (node_ptr != &index->index_n.node) {
80 _index = caa_container_of(node_ptr, struct relay_index,
81 index_n.node);
82 } else {
83 _index = NULL;
1c20f0e2 84 }
7591bab1
MD
85 return _index;
86}
87
88/*
89 * Should be called with RCU read-side lock held.
90 */
91static bool relay_index_get(struct relay_index *index)
92{
48b7cdc2
FD
93 ASSERT_RCU_READ_LOCKED();
94
7591bab1
MD
95 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
96 index->stream->stream_handle, index->index_n.key,
97 (int) index->ref.refcount);
1c20f0e2 98
ce4d4083 99 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
100}
101
102/*
7591bab1
MD
103 * Get a relayd index in within the given stream, or create it if not
104 * present.
1c20f0e2 105 *
7591bab1 106 * Called with stream mutex held.
1c20f0e2
JD
107 * Return index object or else NULL on error.
108 */
7591bab1
MD
109struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
110 uint64_t net_seq_num)
1c20f0e2 111{
7591bab1 112 struct lttng_ht_node_u64 *node;
1c20f0e2 113 struct lttng_ht_iter iter;
1c20f0e2
JD
114 struct relay_index *index = NULL;
115
1c20f0e2 116 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 117 stream->stream_handle, net_seq_num);
1c20f0e2 118
7591bab1
MD
119 rcu_read_lock();
120 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
121 node = lttng_ht_iter_get_node_u64(&iter);
122 if (node) {
123 index = caa_container_of(node, struct relay_index, index_n);
124 } else {
125 struct relay_index *oldindex;
126
127 index = relay_index_create(stream, net_seq_num);
128 if (!index) {
129 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 130 stream->stream_handle, net_seq_num);
7591bab1
MD
131 goto end;
132 }
133 oldindex = relay_index_add_unique(stream, index);
134 if (oldindex) {
135 /* Added concurrently, keep old. */
136 relay_index_put(index);
137 index = oldindex;
138 if (!relay_index_get(index)) {
139 index = NULL;
140 }
141 } else {
142 stream->indexes_in_flight++;
143 index->in_hash_table = true;
144 }
1c20f0e2 145 }
1c20f0e2 146end:
7591bab1
MD
147 rcu_read_unlock();
148 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 149 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
150 return index;
151}
152
f8f3885c
MD
153int relay_index_set_file(struct relay_index *index,
154 struct lttng_index_file *index_file,
7591bab1 155 uint64_t data_offset)
1c20f0e2 156{
7591bab1 157 int ret = 0;
1c20f0e2 158
7591bab1 159 pthread_mutex_lock(&index->lock);
f8f3885c 160 if (index->index_file) {
7591bab1
MD
161 ret = -1;
162 goto end;
163 }
f8f3885c
MD
164 lttng_index_file_get(index_file);
165 index->index_file = index_file;
7591bab1
MD
166 index->index_data.offset = data_offset;
167end:
168 pthread_mutex_unlock(&index->lock);
169 return ret;
170}
1c20f0e2 171
7591bab1 172int relay_index_set_data(struct relay_index *index,
211d28db 173 const struct ctf_packet_index *data)
7591bab1
MD
174{
175 int ret = 0;
1c20f0e2 176
7591bab1
MD
177 pthread_mutex_lock(&index->lock);
178 if (index->has_index_data) {
179 ret = -1;
180 goto end;
1c20f0e2 181 }
7591bab1
MD
182 /* Set everything except data_offset. */
183 index->index_data.packet_size = data->packet_size;
184 index->index_data.content_size = data->content_size;
185 index->index_data.timestamp_begin = data->timestamp_begin;
186 index->index_data.timestamp_end = data->timestamp_end;
187 index->index_data.events_discarded = data->events_discarded;
188 index->index_data.stream_id = data->stream_id;
189 index->has_index_data = true;
190end:
191 pthread_mutex_unlock(&index->lock);
192 return ret;
1c20f0e2
JD
193}
194
7591bab1 195static void index_destroy(struct relay_index *index)
1c20f0e2 196{
7591bab1
MD
197 free(index);
198}
1c20f0e2 199
7591bab1
MD
200static void index_destroy_rcu(struct rcu_head *rcu_head)
201{
202 struct relay_index *index =
203 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 204
7591bab1 205 index_destroy(index);
1c20f0e2
JD
206}
207
7591bab1
MD
208/* Stream lock must be held by the caller. */
209static void index_release(struct urcu_ref *ref)
1c20f0e2 210{
7591bab1
MD
211 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
212 struct relay_stream *stream = index->stream;
213 int ret;
214 struct lttng_ht_iter iter;
215
f8f3885c
MD
216 if (index->index_file) {
217 lttng_index_file_put(index->index_file);
218 index->index_file = NULL;
7591bab1
MD
219 }
220 if (index->in_hash_table) {
221 /* Delete index from hash table. */
222 iter.iter.node = &index->index_n.node;
223 ret = lttng_ht_del(stream->indexes_ht, &iter);
a0377dfe 224 LTTNG_ASSERT(!ret);
7591bab1
MD
225 stream->indexes_in_flight--;
226 }
227
228 stream_put(index->stream);
229 index->stream = NULL;
230
231 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
232}
233
234/*
7591bab1
MD
235 * Called with stream mutex held.
236 *
237 * Stream lock must be held by the caller.
1c20f0e2 238 */
7591bab1 239void relay_index_put(struct relay_index *index)
1c20f0e2 240{
7591bab1
MD
241 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
242 index->stream->stream_handle, index->index_n.key,
243 (int) index->ref.refcount);
244 /*
0f1b1d25 245 * Ensure existence of index->lock for index unlock.
7591bab1
MD
246 */
247 rcu_read_lock();
248 /*
249 * Index lock ensures that concurrent test and update of stream
250 * ref is atomic.
251 */
a0377dfe 252 LTTNG_ASSERT(index->ref.refcount != 0);
7591bab1 253 urcu_ref_put(&index->ref, index_release);
7591bab1 254 rcu_read_unlock();
1c20f0e2
JD
255}
256
257/*
7591bab1
MD
258 * Try to flush index to disk. Releases self-reference to index once
259 * flush succeeds.
1c20f0e2 260 *
7591bab1
MD
261 * Stream lock must be held by the caller.
262 * Return 0 on successful flush, a negative value on error, or positive
263 * value if no flush was performed.
1c20f0e2 264 */
7591bab1 265int relay_index_try_flush(struct relay_index *index)
1c20f0e2 266{
7591bab1
MD
267 int ret = 1;
268 bool flushed = false;
1c20f0e2 269
7591bab1
MD
270 pthread_mutex_lock(&index->lock);
271 if (index->flushed) {
272 goto skip;
273 }
274 /* Check if we are ready to flush. */
f8f3885c 275 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
276 goto skip;
277 }
8bb66c3c
JG
278
279 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
280 index->stream->stream_handle, index->index_n.key);
7591bab1
MD
281 flushed = true;
282 index->flushed = true;
f8f3885c 283 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
284skip:
285 pthread_mutex_unlock(&index->lock);
1c20f0e2 286
7591bab1
MD
287 if (flushed) {
288 /* Put self-ref from index now that it has been flushed. */
289 relay_index_put(index);
290 }
291 return ret;
1c20f0e2
JD
292}
293
294/*
7591bab1
MD
295 * Close every relay index within a given stream, without flushing
296 * them.
1c20f0e2 297 */
7591bab1 298void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
299{
300 struct lttng_ht_iter iter;
301 struct relay_index *index;
302
1c20f0e2 303 rcu_read_lock();
7591bab1
MD
304 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
305 index, index_n.node) {
306 /* Put self-ref from index. */
307 relay_index_put(index);
1c20f0e2
JD
308 }
309 rcu_read_unlock();
310}
3d07a857
MD
311
312void relay_index_close_partial_fd(struct relay_stream *stream)
313{
314 struct lttng_ht_iter iter;
315 struct relay_index *index;
316
317 rcu_read_lock();
318 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
319 index, index_n.node) {
f8f3885c 320 if (!index->index_file) {
3d07a857
MD
321 continue;
322 }
323 /*
f8f3885c 324 * Partial index has its index_file: we have only
3d07a857
MD
325 * received its info from the data socket.
326 * Put self-ref from index.
327 */
328 relay_index_put(index);
329 }
330 rcu_read_unlock();
331}
332
333uint64_t relay_index_find_last(struct relay_stream *stream)
334{
335 struct lttng_ht_iter iter;
336 struct relay_index *index;
337 uint64_t net_seq_num = -1ULL;
338
339 rcu_read_lock();
340 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
341 index, index_n.node) {
342 if (net_seq_num == -1ULL ||
343 index->index_n.key > net_seq_num) {
344 net_seq_num = index->index_n.key;
345 }
346 }
347 rcu_read_unlock();
348 return net_seq_num;
349}
d3ecc550
JD
350
351/*
352 * Update the index file of an already existing relay_index.
353 * Offsets by 'removed_data_count' the offset field of an index.
354 */
355static
356int relay_index_switch_file(struct relay_index *index,
357 struct lttng_index_file *new_index_file,
358 uint64_t removed_data_count)
359{
360 int ret = 0;
361 uint64_t offset;
362
363 pthread_mutex_lock(&index->lock);
364 if (!index->index_file) {
365 ERR("No index_file");
366 ret = 0;
367 goto end;
368 }
369
370 lttng_index_file_put(index->index_file);
371 lttng_index_file_get(new_index_file);
372 index->index_file = new_index_file;
373 offset = be64toh(index->index_data.offset);
374 index->index_data.offset = htobe64(offset - removed_data_count);
375
376end:
377 pthread_mutex_unlock(&index->lock);
378 return ret;
379}
380
381/*
382 * Switch the index file of all pending indexes for a stream and update the
383 * data offset by substracting the last safe position.
384 * Stream lock must be held.
385 */
386int relay_index_switch_all_files(struct relay_stream *stream)
387{
388 struct lttng_ht_iter iter;
389 struct relay_index *index;
390 int ret = 0;
391
392 rcu_read_lock();
393 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
394 index, index_n.node) {
d3ecc550
JD
395 ret = relay_index_switch_file(index, stream->index_file,
396 stream->pos_after_last_complete_data_index);
397 if (ret) {
398 goto end;
399 }
400 }
401end:
402 rcu_read_unlock();
403 return ret;
404}
c35f9726
JG
405
406/*
407 * Set index data from the control port to a given index object.
408 */
409int relay_index_set_control_data(struct relay_index *index,
410 const struct lttcomm_relayd_index *data,
411 unsigned int minor_version)
412{
413 /* The index on disk is encoded in big endian. */
ac497a37
SM
414 ctf_packet_index index_data {};
415
416 index_data.packet_size = htobe64(data->packet_size);
417 index_data.content_size = htobe64(data->content_size);
418 index_data.timestamp_begin = htobe64(data->timestamp_begin);
419 index_data.timestamp_end = htobe64(data->timestamp_end);
420 index_data.events_discarded = htobe64(data->events_discarded);
421 index_data.stream_id = htobe64(data->stream_id);
c35f9726
JG
422
423 if (minor_version >= 8) {
424 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
425 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
0f83d1cc
MD
426 } else {
427 uint64_t unset_value = -1ULL;
428
429 index->index_data.stream_instance_id = htobe64(unset_value);
430 index->index_data.packet_seq_num = htobe64(unset_value);
c35f9726
JG
431 }
432
433 return relay_index_set_data(index, &index_data);
434}
This page took 0.072711 seconds and 4 git commands to generate.