Tests: Add test to check shared-memory FD leaks after relayd dies
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11
12 #include <common/common.h>
13 #include <common/utils.h>
14 #include <common/compat/endian.h>
15
16 #include "lttng-relayd.h"
17 #include "stream.h"
18 #include "index.h"
19 #include "connection.h"
20
21 /*
22 * Allocate a new relay index object. Pass the stream in which it is
23 * contained as parameter. The sequence number will be used as the hash
24 * table key.
25 *
26 * Called with stream mutex held.
27 * Return allocated object or else NULL on error.
28 */
29 static struct relay_index *relay_index_create(struct relay_stream *stream,
30 uint64_t net_seq_num)
31 {
32 struct relay_index *index;
33
34 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
35 stream->stream_handle, net_seq_num);
36
37 index = (relay_index *) zmalloc(sizeof(*index));
38 if (!index) {
39 PERROR("Relay index zmalloc");
40 goto end;
41 }
42 if (!stream_get(stream)) {
43 ERR("Cannot get stream");
44 free(index);
45 index = NULL;
46 goto end;
47 }
48 index->stream = stream;
49
50 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
51 pthread_mutex_init(&index->lock, NULL);
52 urcu_ref_init(&index->ref);
53
54 end:
55 return index;
56 }
57
58 /*
59 * Add unique relay index to the given hash table. In case of a collision, the
60 * already existing object is put in the given _index variable.
61 *
62 * RCU read side lock MUST be acquired.
63 */
64 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
65 struct relay_index *index)
66 {
67 struct cds_lfht_node *node_ptr;
68 struct relay_index *_index;
69
70 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
71 stream->stream_handle, index->index_n.key);
72
73 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
74 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
75 stream->indexes_ht->match_fct, &index->index_n,
76 &index->index_n.node);
77 if (node_ptr != &index->index_n.node) {
78 _index = caa_container_of(node_ptr, struct relay_index,
79 index_n.node);
80 } else {
81 _index = NULL;
82 }
83 return _index;
84 }
85
86 /*
87 * Should be called with RCU read-side lock held.
88 */
89 static bool relay_index_get(struct relay_index *index)
90 {
91 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
92 index->stream->stream_handle, index->index_n.key,
93 (int) index->ref.refcount);
94
95 return urcu_ref_get_unless_zero(&index->ref);
96 }
97
98 /*
99 * Get a relayd index in within the given stream, or create it if not
100 * present.
101 *
102 * Called with stream mutex held.
103 * Return index object or else NULL on error.
104 */
105 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
106 uint64_t net_seq_num)
107 {
108 struct lttng_ht_node_u64 *node;
109 struct lttng_ht_iter iter;
110 struct relay_index *index = NULL;
111
112 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
113 stream->stream_handle, net_seq_num);
114
115 rcu_read_lock();
116 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
117 node = lttng_ht_iter_get_node_u64(&iter);
118 if (node) {
119 index = caa_container_of(node, struct relay_index, index_n);
120 } else {
121 struct relay_index *oldindex;
122
123 index = relay_index_create(stream, net_seq_num);
124 if (!index) {
125 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
126 stream->stream_handle, net_seq_num);
127 goto end;
128 }
129 oldindex = relay_index_add_unique(stream, index);
130 if (oldindex) {
131 /* Added concurrently, keep old. */
132 relay_index_put(index);
133 index = oldindex;
134 if (!relay_index_get(index)) {
135 index = NULL;
136 }
137 } else {
138 stream->indexes_in_flight++;
139 index->in_hash_table = true;
140 }
141 }
142 end:
143 rcu_read_unlock();
144 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
145 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
146 return index;
147 }
148
149 int relay_index_set_file(struct relay_index *index,
150 struct lttng_index_file *index_file,
151 uint64_t data_offset)
152 {
153 int ret = 0;
154
155 pthread_mutex_lock(&index->lock);
156 if (index->index_file) {
157 ret = -1;
158 goto end;
159 }
160 lttng_index_file_get(index_file);
161 index->index_file = index_file;
162 index->index_data.offset = data_offset;
163 end:
164 pthread_mutex_unlock(&index->lock);
165 return ret;
166 }
167
168 int relay_index_set_data(struct relay_index *index,
169 const struct ctf_packet_index *data)
170 {
171 int ret = 0;
172
173 pthread_mutex_lock(&index->lock);
174 if (index->has_index_data) {
175 ret = -1;
176 goto end;
177 }
178 /* Set everything except data_offset. */
179 index->index_data.packet_size = data->packet_size;
180 index->index_data.content_size = data->content_size;
181 index->index_data.timestamp_begin = data->timestamp_begin;
182 index->index_data.timestamp_end = data->timestamp_end;
183 index->index_data.events_discarded = data->events_discarded;
184 index->index_data.stream_id = data->stream_id;
185 index->has_index_data = true;
186 end:
187 pthread_mutex_unlock(&index->lock);
188 return ret;
189 }
190
/*
 * Free the memory of a relay index. Nothing the index points to is
 * released here.
 */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
195
196 static void index_destroy_rcu(struct rcu_head *rcu_head)
197 {
198 struct relay_index *index =
199 caa_container_of(rcu_head, struct relay_index, rcu_node);
200
201 index_destroy(index);
202 }
203
204 /* Stream lock must be held by the caller. */
205 static void index_release(struct urcu_ref *ref)
206 {
207 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
208 struct relay_stream *stream = index->stream;
209 int ret;
210 struct lttng_ht_iter iter;
211
212 if (index->index_file) {
213 lttng_index_file_put(index->index_file);
214 index->index_file = NULL;
215 }
216 if (index->in_hash_table) {
217 /* Delete index from hash table. */
218 iter.iter.node = &index->index_n.node;
219 ret = lttng_ht_del(stream->indexes_ht, &iter);
220 LTTNG_ASSERT(!ret);
221 stream->indexes_in_flight--;
222 }
223
224 stream_put(index->stream);
225 index->stream = NULL;
226
227 call_rcu(&index->rcu_node, index_destroy_rcu);
228 }
229
230 /*
231 * Called with stream mutex held.
232 *
233 * Stream lock must be held by the caller.
234 */
235 void relay_index_put(struct relay_index *index)
236 {
237 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
238 index->stream->stream_handle, index->index_n.key,
239 (int) index->ref.refcount);
240 /*
241 * Ensure existence of index->lock for index unlock.
242 */
243 rcu_read_lock();
244 /*
245 * Index lock ensures that concurrent test and update of stream
246 * ref is atomic.
247 */
248 LTTNG_ASSERT(index->ref.refcount != 0);
249 urcu_ref_put(&index->ref, index_release);
250 rcu_read_unlock();
251 }
252
253 /*
254 * Try to flush index to disk. Releases self-reference to index once
255 * flush succeeds.
256 *
257 * Stream lock must be held by the caller.
258 * Return 0 on successful flush, a negative value on error, or positive
259 * value if no flush was performed.
260 */
261 int relay_index_try_flush(struct relay_index *index)
262 {
263 int ret = 1;
264 bool flushed = false;
265
266 pthread_mutex_lock(&index->lock);
267 if (index->flushed) {
268 goto skip;
269 }
270 /* Check if we are ready to flush. */
271 if (!index->has_index_data || !index->index_file) {
272 goto skip;
273 }
274
275 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
276 index->stream->stream_handle, index->index_n.key);
277 flushed = true;
278 index->flushed = true;
279 ret = lttng_index_file_write(index->index_file, &index->index_data);
280 skip:
281 pthread_mutex_unlock(&index->lock);
282
283 if (flushed) {
284 /* Put self-ref from index now that it has been flushed. */
285 relay_index_put(index);
286 }
287 return ret;
288 }
289
290 /*
291 * Close every relay index within a given stream, without flushing
292 * them.
293 */
294 void relay_index_close_all(struct relay_stream *stream)
295 {
296 struct lttng_ht_iter iter;
297 struct relay_index *index;
298
299 rcu_read_lock();
300 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
301 index, index_n.node) {
302 /* Put self-ref from index. */
303 relay_index_put(index);
304 }
305 rcu_read_unlock();
306 }
307
308 void relay_index_close_partial_fd(struct relay_stream *stream)
309 {
310 struct lttng_ht_iter iter;
311 struct relay_index *index;
312
313 rcu_read_lock();
314 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
315 index, index_n.node) {
316 if (!index->index_file) {
317 continue;
318 }
319 /*
320 * Partial index has its index_file: we have only
321 * received its info from the data socket.
322 * Put self-ref from index.
323 */
324 relay_index_put(index);
325 }
326 rcu_read_unlock();
327 }
328
329 uint64_t relay_index_find_last(struct relay_stream *stream)
330 {
331 struct lttng_ht_iter iter;
332 struct relay_index *index;
333 uint64_t net_seq_num = -1ULL;
334
335 rcu_read_lock();
336 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
337 index, index_n.node) {
338 if (net_seq_num == -1ULL ||
339 index->index_n.key > net_seq_num) {
340 net_seq_num = index->index_n.key;
341 }
342 }
343 rcu_read_unlock();
344 return net_seq_num;
345 }
346
347 /*
348 * Update the index file of an already existing relay_index.
349 * Offsets by 'removed_data_count' the offset field of an index.
350 */
351 static
352 int relay_index_switch_file(struct relay_index *index,
353 struct lttng_index_file *new_index_file,
354 uint64_t removed_data_count)
355 {
356 int ret = 0;
357 uint64_t offset;
358
359 pthread_mutex_lock(&index->lock);
360 if (!index->index_file) {
361 ERR("No index_file");
362 ret = 0;
363 goto end;
364 }
365
366 lttng_index_file_put(index->index_file);
367 lttng_index_file_get(new_index_file);
368 index->index_file = new_index_file;
369 offset = be64toh(index->index_data.offset);
370 index->index_data.offset = htobe64(offset - removed_data_count);
371
372 end:
373 pthread_mutex_unlock(&index->lock);
374 return ret;
375 }
376
377 /*
378 * Switch the index file of all pending indexes for a stream and update the
379 * data offset by substracting the last safe position.
380 * Stream lock must be held.
381 */
382 int relay_index_switch_all_files(struct relay_stream *stream)
383 {
384 struct lttng_ht_iter iter;
385 struct relay_index *index;
386 int ret = 0;
387
388 rcu_read_lock();
389 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
390 index, index_n.node) {
391 ret = relay_index_switch_file(index, stream->index_file,
392 stream->pos_after_last_complete_data_index);
393 if (ret) {
394 goto end;
395 }
396 }
397 end:
398 rcu_read_unlock();
399 return ret;
400 }
401
402 /*
403 * Set index data from the control port to a given index object.
404 */
405 int relay_index_set_control_data(struct relay_index *index,
406 const struct lttcomm_relayd_index *data,
407 unsigned int minor_version)
408 {
409 /* The index on disk is encoded in big endian. */
410 ctf_packet_index index_data {};
411
412 index_data.packet_size = htobe64(data->packet_size);
413 index_data.content_size = htobe64(data->content_size);
414 index_data.timestamp_begin = htobe64(data->timestamp_begin);
415 index_data.timestamp_end = htobe64(data->timestamp_end);
416 index_data.events_discarded = htobe64(data->events_discarded);
417 index_data.stream_id = htobe64(data->stream_id);
418
419 if (minor_version >= 8) {
420 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
421 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
422 } else {
423 uint64_t unset_value = -1ULL;
424
425 index->index_data.stream_instance_id = htobe64(unset_value);
426 index->index_data.packet_seq_num = htobe64(unset_value);
427 }
428
429 return relay_index_set_data(index, &index_data);
430 }
This page took 0.036821 seconds and 4 git commands to generate.