relayd: replace uses of block FDs by the fs_handle interface
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25 #include <common/compat/endian.h>
26
27 #include "lttng-relayd.h"
28 #include "stream.h"
29 #include "index.h"
30 #include "connection.h"
31
32 /*
33 * Allocate a new relay index object. Pass the stream in which it is
34 * contained as parameter. The sequence number will be used as the hash
35 * table key.
36 *
37 * Called with stream mutex held.
38 * Return allocated object or else NULL on error.
39 */
40 static struct relay_index *relay_index_create(struct relay_stream *stream,
41 uint64_t net_seq_num)
42 {
43 struct relay_index *index;
44
45 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
46 stream->stream_handle, net_seq_num);
47
48 index = zmalloc(sizeof(*index));
49 if (!index) {
50 PERROR("Relay index zmalloc");
51 goto end;
52 }
53 if (!stream_get(stream)) {
54 ERR("Cannot get stream");
55 free(index);
56 index = NULL;
57 goto end;
58 }
59 index->stream = stream;
60
61 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
62 pthread_mutex_init(&index->lock, NULL);
63 urcu_ref_init(&index->ref);
64
65 end:
66 return index;
67 }
68
69 /*
70 * Add unique relay index to the given hash table. In case of a collision, the
71 * already existing object is put in the given _index variable.
72 *
73 * RCU read side lock MUST be acquired.
74 */
75 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
76 struct relay_index *index)
77 {
78 struct cds_lfht_node *node_ptr;
79 struct relay_index *_index;
80
81 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
82 stream->stream_handle, index->index_n.key);
83
84 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
85 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
86 stream->indexes_ht->match_fct, &index->index_n,
87 &index->index_n.node);
88 if (node_ptr != &index->index_n.node) {
89 _index = caa_container_of(node_ptr, struct relay_index,
90 index_n.node);
91 } else {
92 _index = NULL;
93 }
94 return _index;
95 }
96
97 /*
98 * Should be called with RCU read-side lock held.
99 */
100 static bool relay_index_get(struct relay_index *index)
101 {
102 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
103 index->stream->stream_handle, index->index_n.key,
104 (int) index->ref.refcount);
105
106 return urcu_ref_get_unless_zero(&index->ref);
107 }
108
109 /*
110 * Get a relayd index in within the given stream, or create it if not
111 * present.
112 *
113 * Called with stream mutex held.
114 * Return index object or else NULL on error.
115 */
116 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
117 uint64_t net_seq_num)
118 {
119 struct lttng_ht_node_u64 *node;
120 struct lttng_ht_iter iter;
121 struct relay_index *index = NULL;
122
123 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
124 stream->stream_handle, net_seq_num);
125
126 rcu_read_lock();
127 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
128 node = lttng_ht_iter_get_node_u64(&iter);
129 if (node) {
130 index = caa_container_of(node, struct relay_index, index_n);
131 } else {
132 struct relay_index *oldindex;
133
134 index = relay_index_create(stream, net_seq_num);
135 if (!index) {
136 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
137 stream->stream_handle, net_seq_num);
138 goto end;
139 }
140 oldindex = relay_index_add_unique(stream, index);
141 if (oldindex) {
142 /* Added concurrently, keep old. */
143 relay_index_put(index);
144 index = oldindex;
145 if (!relay_index_get(index)) {
146 index = NULL;
147 }
148 } else {
149 stream->indexes_in_flight++;
150 index->in_hash_table = true;
151 }
152 }
153 end:
154 rcu_read_unlock();
155 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
156 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
157 return index;
158 }
159
160 int relay_index_set_file(struct relay_index *index,
161 struct lttng_index_file *index_file,
162 uint64_t data_offset)
163 {
164 int ret = 0;
165
166 pthread_mutex_lock(&index->lock);
167 if (index->index_file) {
168 ret = -1;
169 goto end;
170 }
171 lttng_index_file_get(index_file);
172 index->index_file = index_file;
173 index->index_data.offset = data_offset;
174 end:
175 pthread_mutex_unlock(&index->lock);
176 return ret;
177 }
178
179 int relay_index_set_data(struct relay_index *index,
180 const struct ctf_packet_index *data)
181 {
182 int ret = 0;
183
184 pthread_mutex_lock(&index->lock);
185 if (index->has_index_data) {
186 ret = -1;
187 goto end;
188 }
189 /* Set everything except data_offset. */
190 index->index_data.packet_size = data->packet_size;
191 index->index_data.content_size = data->content_size;
192 index->index_data.timestamp_begin = data->timestamp_begin;
193 index->index_data.timestamp_end = data->timestamp_end;
194 index->index_data.events_discarded = data->events_discarded;
195 index->index_data.stream_id = data->stream_id;
196 index->has_index_data = true;
197 end:
198 pthread_mutex_unlock(&index->lock);
199 return ret;
200 }
201
/* Release the memory backing a relay index object. */
static void index_destroy(struct relay_index *index)
{
	/* free(NULL) is a no-op, so no guard is needed. */
	free(index);
}
206
207 static void index_destroy_rcu(struct rcu_head *rcu_head)
208 {
209 struct relay_index *index =
210 caa_container_of(rcu_head, struct relay_index, rcu_node);
211
212 index_destroy(index);
213 }
214
215 /* Stream lock must be held by the caller. */
216 static void index_release(struct urcu_ref *ref)
217 {
218 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
219 struct relay_stream *stream = index->stream;
220 int ret;
221 struct lttng_ht_iter iter;
222
223 if (index->index_file) {
224 lttng_index_file_put(index->index_file);
225 index->index_file = NULL;
226 }
227 if (index->in_hash_table) {
228 /* Delete index from hash table. */
229 iter.iter.node = &index->index_n.node;
230 ret = lttng_ht_del(stream->indexes_ht, &iter);
231 assert(!ret);
232 stream->indexes_in_flight--;
233 }
234
235 stream_put(index->stream);
236 index->stream = NULL;
237
238 call_rcu(&index->rcu_node, index_destroy_rcu);
239 }
240
241 /*
242 * Called with stream mutex held.
243 *
244 * Stream lock must be held by the caller.
245 */
246 void relay_index_put(struct relay_index *index)
247 {
248 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
249 index->stream->stream_handle, index->index_n.key,
250 (int) index->ref.refcount);
251 /*
252 * Ensure existance of index->lock for index unlock.
253 */
254 rcu_read_lock();
255 /*
256 * Index lock ensures that concurrent test and update of stream
257 * ref is atomic.
258 */
259 assert(index->ref.refcount != 0);
260 urcu_ref_put(&index->ref, index_release);
261 rcu_read_unlock();
262 }
263
264 /*
265 * Try to flush index to disk. Releases self-reference to index once
266 * flush succeeds.
267 *
268 * Stream lock must be held by the caller.
269 * Return 0 on successful flush, a negative value on error, or positive
270 * value if no flush was performed.
271 */
272 int relay_index_try_flush(struct relay_index *index)
273 {
274 int ret = 1;
275 bool flushed = false;
276
277 pthread_mutex_lock(&index->lock);
278 if (index->flushed) {
279 goto skip;
280 }
281 /* Check if we are ready to flush. */
282 if (!index->has_index_data || !index->index_file) {
283 goto skip;
284 }
285
286 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
287 index->stream->stream_handle, index->index_n.key);
288 flushed = true;
289 index->flushed = true;
290 ret = lttng_index_file_write(index->index_file, &index->index_data);
291 skip:
292 pthread_mutex_unlock(&index->lock);
293
294 if (flushed) {
295 /* Put self-ref from index now that it has been flushed. */
296 relay_index_put(index);
297 }
298 return ret;
299 }
300
301 /*
302 * Close every relay index within a given stream, without flushing
303 * them.
304 */
305 void relay_index_close_all(struct relay_stream *stream)
306 {
307 struct lttng_ht_iter iter;
308 struct relay_index *index;
309
310 rcu_read_lock();
311 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
312 index, index_n.node) {
313 /* Put self-ref from index. */
314 relay_index_put(index);
315 }
316 rcu_read_unlock();
317 }
318
319 void relay_index_close_partial_fd(struct relay_stream *stream)
320 {
321 struct lttng_ht_iter iter;
322 struct relay_index *index;
323
324 rcu_read_lock();
325 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
326 index, index_n.node) {
327 if (!index->index_file) {
328 continue;
329 }
330 /*
331 * Partial index has its index_file: we have only
332 * received its info from the data socket.
333 * Put self-ref from index.
334 */
335 relay_index_put(index);
336 }
337 rcu_read_unlock();
338 }
339
340 uint64_t relay_index_find_last(struct relay_stream *stream)
341 {
342 struct lttng_ht_iter iter;
343 struct relay_index *index;
344 uint64_t net_seq_num = -1ULL;
345
346 rcu_read_lock();
347 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
348 index, index_n.node) {
349 if (net_seq_num == -1ULL ||
350 index->index_n.key > net_seq_num) {
351 net_seq_num = index->index_n.key;
352 }
353 }
354 rcu_read_unlock();
355 return net_seq_num;
356 }
357
358 /*
359 * Update the index file of an already existing relay_index.
360 * Offsets by 'removed_data_count' the offset field of an index.
361 */
362 static
363 int relay_index_switch_file(struct relay_index *index,
364 struct lttng_index_file *new_index_file,
365 uint64_t removed_data_count)
366 {
367 int ret = 0;
368 uint64_t offset;
369
370 pthread_mutex_lock(&index->lock);
371 if (!index->index_file) {
372 ERR("No index_file");
373 ret = 0;
374 goto end;
375 }
376
377 lttng_index_file_put(index->index_file);
378 lttng_index_file_get(new_index_file);
379 index->index_file = new_index_file;
380 offset = be64toh(index->index_data.offset);
381 index->index_data.offset = htobe64(offset - removed_data_count);
382
383 end:
384 pthread_mutex_unlock(&index->lock);
385 return ret;
386 }
387
388 /*
389 * Switch the index file of all pending indexes for a stream and update the
390 * data offset by substracting the last safe position.
391 * Stream lock must be held.
392 */
393 int relay_index_switch_all_files(struct relay_stream *stream)
394 {
395 struct lttng_ht_iter iter;
396 struct relay_index *index;
397 int ret = 0;
398
399 rcu_read_lock();
400 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
401 index, index_n.node) {
402 ret = relay_index_switch_file(index, stream->index_file,
403 stream->pos_after_last_complete_data_index);
404 if (ret) {
405 goto end;
406 }
407 }
408 end:
409 rcu_read_unlock();
410 return ret;
411 }
412
413 /*
414 * Set index data from the control port to a given index object.
415 */
416 int relay_index_set_control_data(struct relay_index *index,
417 const struct lttcomm_relayd_index *data,
418 unsigned int minor_version)
419 {
420 /* The index on disk is encoded in big endian. */
421 const struct ctf_packet_index index_data = {
422 .packet_size = htobe64(data->packet_size),
423 .content_size = htobe64(data->content_size),
424 .timestamp_begin = htobe64(data->timestamp_begin),
425 .timestamp_end = htobe64(data->timestamp_end),
426 .events_discarded = htobe64(data->events_discarded),
427 .stream_id = htobe64(data->stream_id),
428 };
429
430 if (minor_version >= 8) {
431 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
432 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
433 } else {
434 uint64_t unset_value = -1ULL;
435
436 index->index_data.stream_instance_id = htobe64(unset_value);
437 index->index_data.packet_seq_num = htobe64(unset_value);
438 }
439
440 return relay_index_set_data(index, &index_data);
441 }
This page took 0.037945 seconds and 4 git commands to generate.