Clean-up: relayd session: change space to tabs
[lttng-tools.git] / src / bin / lttng-relayd / index.c
CommitLineData
1c20f0e2 1/*
ab5be9fa
MJ
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
1c20f0e2 7 *
1c20f0e2
JD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
1c20f0e2
JD
11#include <assert.h>
12
13#include <common/common.h>
14#include <common/utils.h>
d3ecc550 15#include <common/compat/endian.h>
1c20f0e2 16
0a6518b0 17#include "lttng-relayd.h"
7591bab1 18#include "stream.h"
1c20f0e2 19#include "index.h"
c35f9726 20#include "connection.h"
1c20f0e2
JD
21
22/*
7591bab1
MD
23 * Allocate a new relay index object. Pass the stream in which it is
24 * contained as parameter. The sequence number will be used as the hash
25 * table key.
26 *
27 * Called with stream mutex held.
28 * Return allocated object or else NULL on error.
1c20f0e2 29 */
7591bab1
MD
30static struct relay_index *relay_index_create(struct relay_stream *stream,
31 uint64_t net_seq_num)
1c20f0e2 32{
7591bab1 33 struct relay_index *index;
1c20f0e2 34
7591bab1
MD
35 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
36 stream->stream_handle, net_seq_num);
1c20f0e2 37
7591bab1
MD
38 index = zmalloc(sizeof(*index));
39 if (!index) {
40 PERROR("Relay index zmalloc");
41 goto end;
42 }
43 if (!stream_get(stream)) {
44 ERR("Cannot get stream");
45 free(index);
46 index = NULL;
47 goto end;
1c20f0e2 48 }
7591bab1 49 index->stream = stream;
1c20f0e2 50
7591bab1
MD
51 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
52 pthread_mutex_init(&index->lock, NULL);
7591bab1
MD
53 urcu_ref_init(&index->ref);
54
55end:
56 return index;
1c20f0e2
JD
57}
58
59/*
7591bab1
MD
60 * Add unique relay index to the given hash table. In case of a collision, the
61 * already existing object is put in the given _index variable.
1c20f0e2 62 *
7591bab1 63 * RCU read side lock MUST be acquired.
1c20f0e2 64 */
7591bab1
MD
65static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
66 struct relay_index *index)
1c20f0e2 67{
7591bab1
MD
68 struct cds_lfht_node *node_ptr;
69 struct relay_index *_index;
1c20f0e2 70
7591bab1
MD
71 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
72 stream->stream_handle, index->index_n.key);
1c20f0e2 73
7591bab1
MD
74 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
75 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
76 stream->indexes_ht->match_fct, &index->index_n,
77 &index->index_n.node);
78 if (node_ptr != &index->index_n.node) {
79 _index = caa_container_of(node_ptr, struct relay_index,
80 index_n.node);
81 } else {
82 _index = NULL;
1c20f0e2 83 }
7591bab1
MD
84 return _index;
85}
86
87/*
88 * Should be called with RCU read-side lock held.
89 */
90static bool relay_index_get(struct relay_index *index)
91{
7591bab1
MD
92 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
93 index->stream->stream_handle, index->index_n.key,
94 (int) index->ref.refcount);
1c20f0e2 95
ce4d4083 96 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
97}
98
99/*
7591bab1
MD
100 * Get a relayd index in within the given stream, or create it if not
101 * present.
1c20f0e2 102 *
7591bab1 103 * Called with stream mutex held.
1c20f0e2
JD
104 * Return index object or else NULL on error.
105 */
7591bab1
MD
106struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
107 uint64_t net_seq_num)
1c20f0e2 108{
7591bab1 109 struct lttng_ht_node_u64 *node;
1c20f0e2 110 struct lttng_ht_iter iter;
1c20f0e2
JD
111 struct relay_index *index = NULL;
112
1c20f0e2 113 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 114 stream->stream_handle, net_seq_num);
1c20f0e2 115
7591bab1
MD
116 rcu_read_lock();
117 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
118 node = lttng_ht_iter_get_node_u64(&iter);
119 if (node) {
120 index = caa_container_of(node, struct relay_index, index_n);
121 } else {
122 struct relay_index *oldindex;
123
124 index = relay_index_create(stream, net_seq_num);
125 if (!index) {
126 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 127 stream->stream_handle, net_seq_num);
7591bab1
MD
128 goto end;
129 }
130 oldindex = relay_index_add_unique(stream, index);
131 if (oldindex) {
132 /* Added concurrently, keep old. */
133 relay_index_put(index);
134 index = oldindex;
135 if (!relay_index_get(index)) {
136 index = NULL;
137 }
138 } else {
139 stream->indexes_in_flight++;
140 index->in_hash_table = true;
141 }
1c20f0e2 142 }
1c20f0e2 143end:
7591bab1
MD
144 rcu_read_unlock();
145 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 146 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
147 return index;
148}
149
f8f3885c
MD
150int relay_index_set_file(struct relay_index *index,
151 struct lttng_index_file *index_file,
7591bab1 152 uint64_t data_offset)
1c20f0e2 153{
7591bab1 154 int ret = 0;
1c20f0e2 155
7591bab1 156 pthread_mutex_lock(&index->lock);
f8f3885c 157 if (index->index_file) {
7591bab1
MD
158 ret = -1;
159 goto end;
160 }
f8f3885c
MD
161 lttng_index_file_get(index_file);
162 index->index_file = index_file;
7591bab1
MD
163 index->index_data.offset = data_offset;
164end:
165 pthread_mutex_unlock(&index->lock);
166 return ret;
167}
1c20f0e2 168
7591bab1
MD
169int relay_index_set_data(struct relay_index *index,
170 const struct ctf_packet_index *data)
171{
172 int ret = 0;
1c20f0e2 173
7591bab1
MD
174 pthread_mutex_lock(&index->lock);
175 if (index->has_index_data) {
176 ret = -1;
177 goto end;
1c20f0e2 178 }
7591bab1
MD
179 /* Set everything except data_offset. */
180 index->index_data.packet_size = data->packet_size;
181 index->index_data.content_size = data->content_size;
182 index->index_data.timestamp_begin = data->timestamp_begin;
183 index->index_data.timestamp_end = data->timestamp_end;
184 index->index_data.events_discarded = data->events_discarded;
185 index->index_data.stream_id = data->stream_id;
186 index->has_index_data = true;
187end:
188 pthread_mutex_unlock(&index->lock);
189 return ret;
1c20f0e2
JD
190}
191
7591bab1 192static void index_destroy(struct relay_index *index)
1c20f0e2 193{
7591bab1
MD
194 free(index);
195}
1c20f0e2 196
7591bab1
MD
197static void index_destroy_rcu(struct rcu_head *rcu_head)
198{
199 struct relay_index *index =
200 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 201
7591bab1 202 index_destroy(index);
1c20f0e2
JD
203}
204
7591bab1
MD
205/* Stream lock must be held by the caller. */
206static void index_release(struct urcu_ref *ref)
1c20f0e2 207{
7591bab1
MD
208 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
209 struct relay_stream *stream = index->stream;
210 int ret;
211 struct lttng_ht_iter iter;
212
f8f3885c
MD
213 if (index->index_file) {
214 lttng_index_file_put(index->index_file);
215 index->index_file = NULL;
7591bab1
MD
216 }
217 if (index->in_hash_table) {
218 /* Delete index from hash table. */
219 iter.iter.node = &index->index_n.node;
220 ret = lttng_ht_del(stream->indexes_ht, &iter);
221 assert(!ret);
222 stream->indexes_in_flight--;
223 }
224
225 stream_put(index->stream);
226 index->stream = NULL;
227
228 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
229}
230
231/*
7591bab1
MD
232 * Called with stream mutex held.
233 *
234 * Stream lock must be held by the caller.
1c20f0e2 235 */
7591bab1 236void relay_index_put(struct relay_index *index)
1c20f0e2 237{
7591bab1
MD
238 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
239 index->stream->stream_handle, index->index_n.key,
240 (int) index->ref.refcount);
241 /*
242 * Ensure existance of index->lock for index unlock.
243 */
244 rcu_read_lock();
245 /*
246 * Index lock ensures that concurrent test and update of stream
247 * ref is atomic.
248 */
7591bab1
MD
249 assert(index->ref.refcount != 0);
250 urcu_ref_put(&index->ref, index_release);
7591bab1 251 rcu_read_unlock();
1c20f0e2
JD
252}
253
254/*
7591bab1
MD
255 * Try to flush index to disk. Releases self-reference to index once
256 * flush succeeds.
1c20f0e2 257 *
7591bab1
MD
258 * Stream lock must be held by the caller.
259 * Return 0 on successful flush, a negative value on error, or positive
260 * value if no flush was performed.
1c20f0e2 261 */
7591bab1 262int relay_index_try_flush(struct relay_index *index)
1c20f0e2 263{
7591bab1
MD
264 int ret = 1;
265 bool flushed = false;
1c20f0e2 266
7591bab1
MD
267 pthread_mutex_lock(&index->lock);
268 if (index->flushed) {
269 goto skip;
270 }
271 /* Check if we are ready to flush. */
f8f3885c 272 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
273 goto skip;
274 }
8bb66c3c
JG
275
276 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
277 index->stream->stream_handle, index->index_n.key);
7591bab1
MD
278 flushed = true;
279 index->flushed = true;
f8f3885c 280 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
281skip:
282 pthread_mutex_unlock(&index->lock);
1c20f0e2 283
7591bab1
MD
284 if (flushed) {
285 /* Put self-ref from index now that it has been flushed. */
286 relay_index_put(index);
287 }
288 return ret;
1c20f0e2
JD
289}
290
291/*
7591bab1
MD
292 * Close every relay index within a given stream, without flushing
293 * them.
1c20f0e2 294 */
7591bab1 295void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
296{
297 struct lttng_ht_iter iter;
298 struct relay_index *index;
299
1c20f0e2 300 rcu_read_lock();
7591bab1
MD
301 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
302 index, index_n.node) {
303 /* Put self-ref from index. */
304 relay_index_put(index);
1c20f0e2
JD
305 }
306 rcu_read_unlock();
307}
3d07a857
MD
308
309void relay_index_close_partial_fd(struct relay_stream *stream)
310{
311 struct lttng_ht_iter iter;
312 struct relay_index *index;
313
314 rcu_read_lock();
315 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
316 index, index_n.node) {
f8f3885c 317 if (!index->index_file) {
3d07a857
MD
318 continue;
319 }
320 /*
f8f3885c 321 * Partial index has its index_file: we have only
3d07a857
MD
322 * received its info from the data socket.
323 * Put self-ref from index.
324 */
325 relay_index_put(index);
326 }
327 rcu_read_unlock();
328}
329
330uint64_t relay_index_find_last(struct relay_stream *stream)
331{
332 struct lttng_ht_iter iter;
333 struct relay_index *index;
334 uint64_t net_seq_num = -1ULL;
335
336 rcu_read_lock();
337 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
338 index, index_n.node) {
339 if (net_seq_num == -1ULL ||
340 index->index_n.key > net_seq_num) {
341 net_seq_num = index->index_n.key;
342 }
343 }
344 rcu_read_unlock();
345 return net_seq_num;
346}
d3ecc550
JD
347
348/*
349 * Update the index file of an already existing relay_index.
350 * Offsets by 'removed_data_count' the offset field of an index.
351 */
352static
353int relay_index_switch_file(struct relay_index *index,
354 struct lttng_index_file *new_index_file,
355 uint64_t removed_data_count)
356{
357 int ret = 0;
358 uint64_t offset;
359
360 pthread_mutex_lock(&index->lock);
361 if (!index->index_file) {
362 ERR("No index_file");
363 ret = 0;
364 goto end;
365 }
366
367 lttng_index_file_put(index->index_file);
368 lttng_index_file_get(new_index_file);
369 index->index_file = new_index_file;
370 offset = be64toh(index->index_data.offset);
371 index->index_data.offset = htobe64(offset - removed_data_count);
372
373end:
374 pthread_mutex_unlock(&index->lock);
375 return ret;
376}
377
378/*
379 * Switch the index file of all pending indexes for a stream and update the
380 * data offset by substracting the last safe position.
381 * Stream lock must be held.
382 */
383int relay_index_switch_all_files(struct relay_stream *stream)
384{
385 struct lttng_ht_iter iter;
386 struct relay_index *index;
387 int ret = 0;
388
389 rcu_read_lock();
390 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
391 index, index_n.node) {
d3ecc550
JD
392 ret = relay_index_switch_file(index, stream->index_file,
393 stream->pos_after_last_complete_data_index);
394 if (ret) {
395 goto end;
396 }
397 }
398end:
399 rcu_read_unlock();
400 return ret;
401}
c35f9726
JG
402
403/*
404 * Set index data from the control port to a given index object.
405 */
406int relay_index_set_control_data(struct relay_index *index,
407 const struct lttcomm_relayd_index *data,
408 unsigned int minor_version)
409{
410 /* The index on disk is encoded in big endian. */
411 const struct ctf_packet_index index_data = {
412 .packet_size = htobe64(data->packet_size),
413 .content_size = htobe64(data->content_size),
414 .timestamp_begin = htobe64(data->timestamp_begin),
415 .timestamp_end = htobe64(data->timestamp_end),
416 .events_discarded = htobe64(data->events_discarded),
417 .stream_id = htobe64(data->stream_id),
418 };
419
420 if (minor_version >= 8) {
421 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
422 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
0f83d1cc
MD
423 } else {
424 uint64_t unset_value = -1ULL;
425
426 index->index_data.stream_instance_id = htobe64(unset_value);
427 index->index_data.packet_seq_num = htobe64(unset_value);
c35f9726
JG
428 }
429
430 return relay_index_set_data(index, &index_data);
431}
This page took 0.06374 seconds and 4 git commands to generate.