Implement the RELAYD_ROTATE_PENDING relay daemon command
[lttng-tools.git] / src / bin / lttng-relayd / index.c
CommitLineData
1c20f0e2
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
7591bab1 4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2
JD
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
6c1c0768 20#define _LGPL_SOURCE
1c20f0e2
JD
21#include <assert.h>
22
23#include <common/common.h>
24#include <common/utils.h>
d3ecc550 25#include <common/compat/endian.h>
1c20f0e2 26
0a6518b0 27#include "lttng-relayd.h"
7591bab1 28#include "stream.h"
1c20f0e2
JD
29#include "index.h"
30
31/*
7591bab1
MD
32 * Allocate a new relay index object. Pass the stream in which it is
33 * contained as parameter. The sequence number will be used as the hash
34 * table key.
35 *
36 * Called with stream mutex held.
37 * Return allocated object or else NULL on error.
1c20f0e2 38 */
7591bab1
MD
39static struct relay_index *relay_index_create(struct relay_stream *stream,
40 uint64_t net_seq_num)
1c20f0e2 41{
7591bab1 42 struct relay_index *index;
1c20f0e2 43
7591bab1
MD
44 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
45 stream->stream_handle, net_seq_num);
1c20f0e2 46
7591bab1
MD
47 index = zmalloc(sizeof(*index));
48 if (!index) {
49 PERROR("Relay index zmalloc");
50 goto end;
51 }
52 if (!stream_get(stream)) {
53 ERR("Cannot get stream");
54 free(index);
55 index = NULL;
56 goto end;
1c20f0e2 57 }
7591bab1 58 index->stream = stream;
1c20f0e2 59
7591bab1
MD
60 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
61 pthread_mutex_init(&index->lock, NULL);
7591bab1
MD
62 urcu_ref_init(&index->ref);
63
64end:
65 return index;
1c20f0e2
JD
66}
67
68/*
7591bab1
MD
69 * Add unique relay index to the given hash table. In case of a collision, the
70 * already existing object is put in the given _index variable.
1c20f0e2 71 *
7591bab1 72 * RCU read side lock MUST be acquired.
1c20f0e2 73 */
7591bab1
MD
74static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
75 struct relay_index *index)
1c20f0e2 76{
7591bab1
MD
77 struct cds_lfht_node *node_ptr;
78 struct relay_index *_index;
1c20f0e2 79
7591bab1
MD
80 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
81 stream->stream_handle, index->index_n.key);
1c20f0e2 82
7591bab1
MD
83 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
84 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
85 stream->indexes_ht->match_fct, &index->index_n,
86 &index->index_n.node);
87 if (node_ptr != &index->index_n.node) {
88 _index = caa_container_of(node_ptr, struct relay_index,
89 index_n.node);
90 } else {
91 _index = NULL;
1c20f0e2 92 }
7591bab1
MD
93 return _index;
94}
95
96/*
97 * Should be called with RCU read-side lock held.
98 */
99static bool relay_index_get(struct relay_index *index)
100{
7591bab1
MD
101 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
102 index->stream->stream_handle, index->index_n.key,
103 (int) index->ref.refcount);
1c20f0e2 104
ce4d4083 105 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
106}
107
108/*
7591bab1
MD
109 * Get a relayd index in within the given stream, or create it if not
110 * present.
1c20f0e2 111 *
7591bab1 112 * Called with stream mutex held.
1c20f0e2
JD
113 * Return index object or else NULL on error.
114 */
7591bab1
MD
115struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
116 uint64_t net_seq_num)
1c20f0e2 117{
7591bab1 118 struct lttng_ht_node_u64 *node;
1c20f0e2 119 struct lttng_ht_iter iter;
1c20f0e2
JD
120 struct relay_index *index = NULL;
121
1c20f0e2 122 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 123 stream->stream_handle, net_seq_num);
1c20f0e2 124
7591bab1
MD
125 rcu_read_lock();
126 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
127 node = lttng_ht_iter_get_node_u64(&iter);
128 if (node) {
129 index = caa_container_of(node, struct relay_index, index_n);
130 } else {
131 struct relay_index *oldindex;
132
133 index = relay_index_create(stream, net_seq_num);
134 if (!index) {
135 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 136 stream->stream_handle, net_seq_num);
7591bab1
MD
137 goto end;
138 }
139 oldindex = relay_index_add_unique(stream, index);
140 if (oldindex) {
141 /* Added concurrently, keep old. */
142 relay_index_put(index);
143 index = oldindex;
144 if (!relay_index_get(index)) {
145 index = NULL;
146 }
147 } else {
148 stream->indexes_in_flight++;
149 index->in_hash_table = true;
150 }
1c20f0e2 151 }
1c20f0e2 152end:
7591bab1
MD
153 rcu_read_unlock();
154 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 155 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
156 return index;
157}
158
f8f3885c
MD
159int relay_index_set_file(struct relay_index *index,
160 struct lttng_index_file *index_file,
7591bab1 161 uint64_t data_offset)
1c20f0e2 162{
7591bab1 163 int ret = 0;
1c20f0e2 164
7591bab1 165 pthread_mutex_lock(&index->lock);
f8f3885c 166 if (index->index_file) {
7591bab1
MD
167 ret = -1;
168 goto end;
169 }
f8f3885c
MD
170 lttng_index_file_get(index_file);
171 index->index_file = index_file;
7591bab1
MD
172 index->index_data.offset = data_offset;
173end:
174 pthread_mutex_unlock(&index->lock);
175 return ret;
176}
1c20f0e2 177
7591bab1
MD
178int relay_index_set_data(struct relay_index *index,
179 const struct ctf_packet_index *data)
180{
181 int ret = 0;
1c20f0e2 182
7591bab1
MD
183 pthread_mutex_lock(&index->lock);
184 if (index->has_index_data) {
185 ret = -1;
186 goto end;
1c20f0e2 187 }
7591bab1
MD
188 /* Set everything except data_offset. */
189 index->index_data.packet_size = data->packet_size;
190 index->index_data.content_size = data->content_size;
191 index->index_data.timestamp_begin = data->timestamp_begin;
192 index->index_data.timestamp_end = data->timestamp_end;
193 index->index_data.events_discarded = data->events_discarded;
194 index->index_data.stream_id = data->stream_id;
195 index->has_index_data = true;
196end:
197 pthread_mutex_unlock(&index->lock);
198 return ret;
1c20f0e2
JD
199}
200
/*
 * Free the memory of a relay index. Nothing the index refers to is
 * released here.
 */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
1c20f0e2 205
7591bab1
MD
206static void index_destroy_rcu(struct rcu_head *rcu_head)
207{
208 struct relay_index *index =
209 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 210
7591bab1 211 index_destroy(index);
1c20f0e2
JD
212}
213
7591bab1
MD
214/* Stream lock must be held by the caller. */
215static void index_release(struct urcu_ref *ref)
1c20f0e2 216{
7591bab1
MD
217 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
218 struct relay_stream *stream = index->stream;
219 int ret;
220 struct lttng_ht_iter iter;
221
f8f3885c
MD
222 if (index->index_file) {
223 lttng_index_file_put(index->index_file);
224 index->index_file = NULL;
7591bab1
MD
225 }
226 if (index->in_hash_table) {
227 /* Delete index from hash table. */
228 iter.iter.node = &index->index_n.node;
229 ret = lttng_ht_del(stream->indexes_ht, &iter);
230 assert(!ret);
231 stream->indexes_in_flight--;
232 }
233
234 stream_put(index->stream);
235 index->stream = NULL;
236
237 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
238}
239
240/*
7591bab1
MD
241 * Called with stream mutex held.
242 *
243 * Stream lock must be held by the caller.
1c20f0e2 244 */
7591bab1 245void relay_index_put(struct relay_index *index)
1c20f0e2 246{
7591bab1
MD
247 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
248 index->stream->stream_handle, index->index_n.key,
249 (int) index->ref.refcount);
250 /*
251 * Ensure existance of index->lock for index unlock.
252 */
253 rcu_read_lock();
254 /*
255 * Index lock ensures that concurrent test and update of stream
256 * ref is atomic.
257 */
7591bab1
MD
258 assert(index->ref.refcount != 0);
259 urcu_ref_put(&index->ref, index_release);
7591bab1 260 rcu_read_unlock();
1c20f0e2
JD
261}
262
263/*
7591bab1
MD
264 * Try to flush index to disk. Releases self-reference to index once
265 * flush succeeds.
1c20f0e2 266 *
7591bab1
MD
267 * Stream lock must be held by the caller.
268 * Return 0 on successful flush, a negative value on error, or positive
269 * value if no flush was performed.
1c20f0e2 270 */
7591bab1 271int relay_index_try_flush(struct relay_index *index)
1c20f0e2 272{
7591bab1
MD
273 int ret = 1;
274 bool flushed = false;
275 int fd;
1c20f0e2 276
7591bab1
MD
277 pthread_mutex_lock(&index->lock);
278 if (index->flushed) {
279 goto skip;
280 }
281 /* Check if we are ready to flush. */
f8f3885c 282 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
283 goto skip;
284 }
f8f3885c 285 fd = index->index_file->fd;
7591bab1
MD
286 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
287 " on fd %d", index->stream->stream_handle,
288 index->index_n.key, fd);
289 flushed = true;
290 index->flushed = true;
f8f3885c 291 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
292skip:
293 pthread_mutex_unlock(&index->lock);
1c20f0e2 294
7591bab1
MD
295 if (flushed) {
296 /* Put self-ref from index now that it has been flushed. */
297 relay_index_put(index);
298 }
299 return ret;
1c20f0e2
JD
300}
301
302/*
7591bab1
MD
303 * Close every relay index within a given stream, without flushing
304 * them.
1c20f0e2 305 */
7591bab1 306void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
307{
308 struct lttng_ht_iter iter;
309 struct relay_index *index;
310
1c20f0e2 311 rcu_read_lock();
7591bab1
MD
312 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
313 index, index_n.node) {
314 /* Put self-ref from index. */
315 relay_index_put(index);
1c20f0e2
JD
316 }
317 rcu_read_unlock();
318}
3d07a857
MD
319
320void relay_index_close_partial_fd(struct relay_stream *stream)
321{
322 struct lttng_ht_iter iter;
323 struct relay_index *index;
324
325 rcu_read_lock();
326 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
327 index, index_n.node) {
f8f3885c 328 if (!index->index_file) {
3d07a857
MD
329 continue;
330 }
331 /*
f8f3885c 332 * Partial index has its index_file: we have only
3d07a857
MD
333 * received its info from the data socket.
334 * Put self-ref from index.
335 */
336 relay_index_put(index);
337 }
338 rcu_read_unlock();
339}
340
341uint64_t relay_index_find_last(struct relay_stream *stream)
342{
343 struct lttng_ht_iter iter;
344 struct relay_index *index;
345 uint64_t net_seq_num = -1ULL;
346
347 rcu_read_lock();
348 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
349 index, index_n.node) {
350 if (net_seq_num == -1ULL ||
351 index->index_n.key > net_seq_num) {
352 net_seq_num = index->index_n.key;
353 }
354 }
355 rcu_read_unlock();
356 return net_seq_num;
357}
d3ecc550
JD
358
359/*
360 * Update the index file of an already existing relay_index.
361 * Offsets by 'removed_data_count' the offset field of an index.
362 */
363static
364int relay_index_switch_file(struct relay_index *index,
365 struct lttng_index_file *new_index_file,
366 uint64_t removed_data_count)
367{
368 int ret = 0;
369 uint64_t offset;
370
371 pthread_mutex_lock(&index->lock);
372 if (!index->index_file) {
373 ERR("No index_file");
374 ret = 0;
375 goto end;
376 }
377
378 lttng_index_file_put(index->index_file);
379 lttng_index_file_get(new_index_file);
380 index->index_file = new_index_file;
381 offset = be64toh(index->index_data.offset);
382 index->index_data.offset = htobe64(offset - removed_data_count);
383
384end:
385 pthread_mutex_unlock(&index->lock);
386 return ret;
387}
388
389/*
390 * Switch the index file of all pending indexes for a stream and update the
391 * data offset by substracting the last safe position.
392 * Stream lock must be held.
393 */
394int relay_index_switch_all_files(struct relay_stream *stream)
395{
396 struct lttng_ht_iter iter;
397 struct relay_index *index;
398 int ret = 0;
399
400 rcu_read_lock();
401 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
402 index, index_n.node) {
403 DBG("Update index to fd %d", stream->index_file->fd);
404 ret = relay_index_switch_file(index, stream->index_file,
405 stream->pos_after_last_complete_data_index);
406 if (ret) {
407 goto end;
408 }
409 }
410end:
411 rcu_read_unlock();
412 return ret;
413}
This page took 0.052343 seconds and 4 git commands to generate.