relayd: replace uses of block FDs by the fs_handle interface
[lttng-tools.git] / src / bin / lttng-relayd / index.c
CommitLineData
1c20f0e2
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
7591bab1 4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2
JD
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
6c1c0768 20#define _LGPL_SOURCE
1c20f0e2
JD
21#include <assert.h>
22
23#include <common/common.h>
24#include <common/utils.h>
d3ecc550 25#include <common/compat/endian.h>
1c20f0e2 26
0a6518b0 27#include "lttng-relayd.h"
7591bab1 28#include "stream.h"
1c20f0e2 29#include "index.h"
c35f9726 30#include "connection.h"
1c20f0e2
JD
31
32/*
7591bab1
MD
33 * Allocate a new relay index object. Pass the stream in which it is
34 * contained as parameter. The sequence number will be used as the hash
35 * table key.
36 *
37 * Called with stream mutex held.
38 * Return allocated object or else NULL on error.
1c20f0e2 39 */
7591bab1
MD
40static struct relay_index *relay_index_create(struct relay_stream *stream,
41 uint64_t net_seq_num)
1c20f0e2 42{
7591bab1 43 struct relay_index *index;
1c20f0e2 44
7591bab1
MD
45 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
46 stream->stream_handle, net_seq_num);
1c20f0e2 47
7591bab1
MD
48 index = zmalloc(sizeof(*index));
49 if (!index) {
50 PERROR("Relay index zmalloc");
51 goto end;
52 }
53 if (!stream_get(stream)) {
54 ERR("Cannot get stream");
55 free(index);
56 index = NULL;
57 goto end;
1c20f0e2 58 }
7591bab1 59 index->stream = stream;
1c20f0e2 60
7591bab1
MD
61 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
62 pthread_mutex_init(&index->lock, NULL);
7591bab1
MD
63 urcu_ref_init(&index->ref);
64
65end:
66 return index;
1c20f0e2
JD
67}
68
69/*
7591bab1
MD
70 * Add unique relay index to the given hash table. In case of a collision, the
71 * already existing object is put in the given _index variable.
1c20f0e2 72 *
7591bab1 73 * RCU read side lock MUST be acquired.
1c20f0e2 74 */
7591bab1
MD
75static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
76 struct relay_index *index)
1c20f0e2 77{
7591bab1
MD
78 struct cds_lfht_node *node_ptr;
79 struct relay_index *_index;
1c20f0e2 80
7591bab1
MD
81 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
82 stream->stream_handle, index->index_n.key);
1c20f0e2 83
7591bab1
MD
84 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
85 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
86 stream->indexes_ht->match_fct, &index->index_n,
87 &index->index_n.node);
88 if (node_ptr != &index->index_n.node) {
89 _index = caa_container_of(node_ptr, struct relay_index,
90 index_n.node);
91 } else {
92 _index = NULL;
1c20f0e2 93 }
7591bab1
MD
94 return _index;
95}
96
97/*
98 * Should be called with RCU read-side lock held.
99 */
100static bool relay_index_get(struct relay_index *index)
101{
7591bab1
MD
102 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
103 index->stream->stream_handle, index->index_n.key,
104 (int) index->ref.refcount);
1c20f0e2 105
ce4d4083 106 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
107}
108
109/*
7591bab1
MD
110 * Get a relayd index in within the given stream, or create it if not
111 * present.
1c20f0e2 112 *
7591bab1 113 * Called with stream mutex held.
1c20f0e2
JD
114 * Return index object or else NULL on error.
115 */
7591bab1
MD
116struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
117 uint64_t net_seq_num)
1c20f0e2 118{
7591bab1 119 struct lttng_ht_node_u64 *node;
1c20f0e2 120 struct lttng_ht_iter iter;
1c20f0e2
JD
121 struct relay_index *index = NULL;
122
1c20f0e2 123 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 124 stream->stream_handle, net_seq_num);
1c20f0e2 125
7591bab1
MD
126 rcu_read_lock();
127 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
128 node = lttng_ht_iter_get_node_u64(&iter);
129 if (node) {
130 index = caa_container_of(node, struct relay_index, index_n);
131 } else {
132 struct relay_index *oldindex;
133
134 index = relay_index_create(stream, net_seq_num);
135 if (!index) {
136 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 137 stream->stream_handle, net_seq_num);
7591bab1
MD
138 goto end;
139 }
140 oldindex = relay_index_add_unique(stream, index);
141 if (oldindex) {
142 /* Added concurrently, keep old. */
143 relay_index_put(index);
144 index = oldindex;
145 if (!relay_index_get(index)) {
146 index = NULL;
147 }
148 } else {
149 stream->indexes_in_flight++;
150 index->in_hash_table = true;
151 }
1c20f0e2 152 }
1c20f0e2 153end:
7591bab1
MD
154 rcu_read_unlock();
155 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 156 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
157 return index;
158}
159
f8f3885c
MD
160int relay_index_set_file(struct relay_index *index,
161 struct lttng_index_file *index_file,
7591bab1 162 uint64_t data_offset)
1c20f0e2 163{
7591bab1 164 int ret = 0;
1c20f0e2 165
7591bab1 166 pthread_mutex_lock(&index->lock);
f8f3885c 167 if (index->index_file) {
7591bab1
MD
168 ret = -1;
169 goto end;
170 }
f8f3885c
MD
171 lttng_index_file_get(index_file);
172 index->index_file = index_file;
7591bab1
MD
173 index->index_data.offset = data_offset;
174end:
175 pthread_mutex_unlock(&index->lock);
176 return ret;
177}
1c20f0e2 178
7591bab1
MD
179int relay_index_set_data(struct relay_index *index,
180 const struct ctf_packet_index *data)
181{
182 int ret = 0;
1c20f0e2 183
7591bab1
MD
184 pthread_mutex_lock(&index->lock);
185 if (index->has_index_data) {
186 ret = -1;
187 goto end;
1c20f0e2 188 }
7591bab1
MD
189 /* Set everything except data_offset. */
190 index->index_data.packet_size = data->packet_size;
191 index->index_data.content_size = data->content_size;
192 index->index_data.timestamp_begin = data->timestamp_begin;
193 index->index_data.timestamp_end = data->timestamp_end;
194 index->index_data.events_discarded = data->events_discarded;
195 index->index_data.stream_id = data->stream_id;
196 index->has_index_data = true;
197end:
198 pthread_mutex_unlock(&index->lock);
199 return ret;
1c20f0e2
JD
200}
201
7591bab1 202static void index_destroy(struct relay_index *index)
1c20f0e2 203{
7591bab1
MD
204 free(index);
205}
1c20f0e2 206
7591bab1
MD
207static void index_destroy_rcu(struct rcu_head *rcu_head)
208{
209 struct relay_index *index =
210 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 211
7591bab1 212 index_destroy(index);
1c20f0e2
JD
213}
214
7591bab1
MD
215/* Stream lock must be held by the caller. */
216static void index_release(struct urcu_ref *ref)
1c20f0e2 217{
7591bab1
MD
218 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
219 struct relay_stream *stream = index->stream;
220 int ret;
221 struct lttng_ht_iter iter;
222
f8f3885c
MD
223 if (index->index_file) {
224 lttng_index_file_put(index->index_file);
225 index->index_file = NULL;
7591bab1
MD
226 }
227 if (index->in_hash_table) {
228 /* Delete index from hash table. */
229 iter.iter.node = &index->index_n.node;
230 ret = lttng_ht_del(stream->indexes_ht, &iter);
231 assert(!ret);
232 stream->indexes_in_flight--;
233 }
234
235 stream_put(index->stream);
236 index->stream = NULL;
237
238 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
239}
240
241/*
7591bab1
MD
242 * Called with stream mutex held.
243 *
244 * Stream lock must be held by the caller.
1c20f0e2 245 */
7591bab1 246void relay_index_put(struct relay_index *index)
1c20f0e2 247{
7591bab1
MD
248 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
249 index->stream->stream_handle, index->index_n.key,
250 (int) index->ref.refcount);
251 /*
252 * Ensure existance of index->lock for index unlock.
253 */
254 rcu_read_lock();
255 /*
256 * Index lock ensures that concurrent test and update of stream
257 * ref is atomic.
258 */
7591bab1
MD
259 assert(index->ref.refcount != 0);
260 urcu_ref_put(&index->ref, index_release);
7591bab1 261 rcu_read_unlock();
1c20f0e2
JD
262}
263
264/*
7591bab1
MD
265 * Try to flush index to disk. Releases self-reference to index once
266 * flush succeeds.
1c20f0e2 267 *
7591bab1
MD
268 * Stream lock must be held by the caller.
269 * Return 0 on successful flush, a negative value on error, or positive
270 * value if no flush was performed.
1c20f0e2 271 */
7591bab1 272int relay_index_try_flush(struct relay_index *index)
1c20f0e2 273{
7591bab1
MD
274 int ret = 1;
275 bool flushed = false;
1c20f0e2 276
7591bab1
MD
277 pthread_mutex_lock(&index->lock);
278 if (index->flushed) {
279 goto skip;
280 }
281 /* Check if we are ready to flush. */
f8f3885c 282 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
283 goto skip;
284 }
8bb66c3c
JG
285
286 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
287 index->stream->stream_handle, index->index_n.key);
7591bab1
MD
288 flushed = true;
289 index->flushed = true;
f8f3885c 290 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
291skip:
292 pthread_mutex_unlock(&index->lock);
1c20f0e2 293
7591bab1
MD
294 if (flushed) {
295 /* Put self-ref from index now that it has been flushed. */
296 relay_index_put(index);
297 }
298 return ret;
1c20f0e2
JD
299}
300
301/*
7591bab1
MD
302 * Close every relay index within a given stream, without flushing
303 * them.
1c20f0e2 304 */
7591bab1 305void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
306{
307 struct lttng_ht_iter iter;
308 struct relay_index *index;
309
1c20f0e2 310 rcu_read_lock();
7591bab1
MD
311 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
312 index, index_n.node) {
313 /* Put self-ref from index. */
314 relay_index_put(index);
1c20f0e2
JD
315 }
316 rcu_read_unlock();
317}
3d07a857
MD
318
319void relay_index_close_partial_fd(struct relay_stream *stream)
320{
321 struct lttng_ht_iter iter;
322 struct relay_index *index;
323
324 rcu_read_lock();
325 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
326 index, index_n.node) {
f8f3885c 327 if (!index->index_file) {
3d07a857
MD
328 continue;
329 }
330 /*
f8f3885c 331 * Partial index has its index_file: we have only
3d07a857
MD
332 * received its info from the data socket.
333 * Put self-ref from index.
334 */
335 relay_index_put(index);
336 }
337 rcu_read_unlock();
338}
339
340uint64_t relay_index_find_last(struct relay_stream *stream)
341{
342 struct lttng_ht_iter iter;
343 struct relay_index *index;
344 uint64_t net_seq_num = -1ULL;
345
346 rcu_read_lock();
347 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
348 index, index_n.node) {
349 if (net_seq_num == -1ULL ||
350 index->index_n.key > net_seq_num) {
351 net_seq_num = index->index_n.key;
352 }
353 }
354 rcu_read_unlock();
355 return net_seq_num;
356}
d3ecc550
JD
357
358/*
359 * Update the index file of an already existing relay_index.
360 * Offsets by 'removed_data_count' the offset field of an index.
361 */
362static
363int relay_index_switch_file(struct relay_index *index,
364 struct lttng_index_file *new_index_file,
365 uint64_t removed_data_count)
366{
367 int ret = 0;
368 uint64_t offset;
369
370 pthread_mutex_lock(&index->lock);
371 if (!index->index_file) {
372 ERR("No index_file");
373 ret = 0;
374 goto end;
375 }
376
377 lttng_index_file_put(index->index_file);
378 lttng_index_file_get(new_index_file);
379 index->index_file = new_index_file;
380 offset = be64toh(index->index_data.offset);
381 index->index_data.offset = htobe64(offset - removed_data_count);
382
383end:
384 pthread_mutex_unlock(&index->lock);
385 return ret;
386}
387
388/*
389 * Switch the index file of all pending indexes for a stream and update the
390 * data offset by substracting the last safe position.
391 * Stream lock must be held.
392 */
393int relay_index_switch_all_files(struct relay_stream *stream)
394{
395 struct lttng_ht_iter iter;
396 struct relay_index *index;
397 int ret = 0;
398
399 rcu_read_lock();
400 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
401 index, index_n.node) {
d3ecc550
JD
402 ret = relay_index_switch_file(index, stream->index_file,
403 stream->pos_after_last_complete_data_index);
404 if (ret) {
405 goto end;
406 }
407 }
408end:
409 rcu_read_unlock();
410 return ret;
411}
c35f9726
JG
412
413/*
414 * Set index data from the control port to a given index object.
415 */
416int relay_index_set_control_data(struct relay_index *index,
417 const struct lttcomm_relayd_index *data,
418 unsigned int minor_version)
419{
420 /* The index on disk is encoded in big endian. */
421 const struct ctf_packet_index index_data = {
422 .packet_size = htobe64(data->packet_size),
423 .content_size = htobe64(data->content_size),
424 .timestamp_begin = htobe64(data->timestamp_begin),
425 .timestamp_end = htobe64(data->timestamp_end),
426 .events_discarded = htobe64(data->events_discarded),
427 .stream_id = htobe64(data->stream_id),
428 };
429
430 if (minor_version >= 8) {
431 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
432 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
0f83d1cc
MD
433 } else {
434 uint64_t unset_value = -1ULL;
435
436 index->index_data.stream_instance_id = htobe64(unset_value);
437 index->index_data.packet_seq_num = htobe64(unset_value);
c35f9726
JG
438 }
439
440 return relay_index_set_data(index, &index_data);
441}
This page took 0.060557 seconds and 4 git commands to generate.