92d4581d124a95921694129cba29213c08a7bd90
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25 #include <common/compat/endian.h>
26
27 #include "lttng-relayd.h"
28 #include "stream.h"
29 #include "index.h"
30
31 /*
32 * Allocate a new relay index object. Pass the stream in which it is
33 * contained as parameter. The sequence number will be used as the hash
34 * table key.
35 *
36 * Called with stream mutex held.
37 * Return allocated object or else NULL on error.
38 */
39 static struct relay_index *relay_index_create(struct relay_stream *stream,
40 uint64_t net_seq_num)
41 {
42 struct relay_index *index;
43
44 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
45 stream->stream_handle, net_seq_num);
46
47 index = zmalloc(sizeof(*index));
48 if (!index) {
49 PERROR("Relay index zmalloc");
50 goto end;
51 }
52 if (!stream_get(stream)) {
53 ERR("Cannot get stream");
54 free(index);
55 index = NULL;
56 goto end;
57 }
58 index->stream = stream;
59
60 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
61 pthread_mutex_init(&index->lock, NULL);
62 urcu_ref_init(&index->ref);
63
64 end:
65 return index;
66 }
67
68 /*
69 * Add unique relay index to the given hash table. In case of a collision, the
70 * already existing object is put in the given _index variable.
71 *
72 * RCU read side lock MUST be acquired.
73 */
74 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
75 struct relay_index *index)
76 {
77 struct cds_lfht_node *node_ptr;
78 struct relay_index *_index;
79
80 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
81 stream->stream_handle, index->index_n.key);
82
83 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
84 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
85 stream->indexes_ht->match_fct, &index->index_n,
86 &index->index_n.node);
87 if (node_ptr != &index->index_n.node) {
88 _index = caa_container_of(node_ptr, struct relay_index,
89 index_n.node);
90 } else {
91 _index = NULL;
92 }
93 return _index;
94 }
95
96 /*
97 * Should be called with RCU read-side lock held.
98 */
99 static bool relay_index_get(struct relay_index *index)
100 {
101 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
102 index->stream->stream_handle, index->index_n.key,
103 (int) index->ref.refcount);
104
105 return urcu_ref_get_unless_zero(&index->ref);
106 }
107
108 /*
109 * Get a relayd index in within the given stream, or create it if not
110 * present.
111 *
112 * Called with stream mutex held.
113 * Return index object or else NULL on error.
114 */
115 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
116 uint64_t net_seq_num)
117 {
118 struct lttng_ht_node_u64 *node;
119 struct lttng_ht_iter iter;
120 struct relay_index *index = NULL;
121
122 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
123 stream->stream_handle, net_seq_num);
124
125 rcu_read_lock();
126 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
127 node = lttng_ht_iter_get_node_u64(&iter);
128 if (node) {
129 index = caa_container_of(node, struct relay_index, index_n);
130 } else {
131 struct relay_index *oldindex;
132
133 index = relay_index_create(stream, net_seq_num);
134 if (!index) {
135 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
136 stream->stream_handle, net_seq_num);
137 goto end;
138 }
139 oldindex = relay_index_add_unique(stream, index);
140 if (oldindex) {
141 /* Added concurrently, keep old. */
142 relay_index_put(index);
143 index = oldindex;
144 if (!relay_index_get(index)) {
145 index = NULL;
146 }
147 } else {
148 stream->indexes_in_flight++;
149 index->in_hash_table = true;
150 }
151 }
152 end:
153 rcu_read_unlock();
154 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
155 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
156 return index;
157 }
158
159 int relay_index_set_file(struct relay_index *index,
160 struct lttng_index_file *index_file,
161 uint64_t data_offset)
162 {
163 int ret = 0;
164
165 pthread_mutex_lock(&index->lock);
166 if (index->index_file) {
167 ret = -1;
168 goto end;
169 }
170 lttng_index_file_get(index_file);
171 index->index_file = index_file;
172 index->index_data.offset = data_offset;
173 end:
174 pthread_mutex_unlock(&index->lock);
175 return ret;
176 }
177
178 int relay_index_set_data(struct relay_index *index,
179 const struct ctf_packet_index *data)
180 {
181 int ret = 0;
182
183 pthread_mutex_lock(&index->lock);
184 if (index->has_index_data) {
185 ret = -1;
186 goto end;
187 }
188 /* Set everything except data_offset. */
189 index->index_data.packet_size = data->packet_size;
190 index->index_data.content_size = data->content_size;
191 index->index_data.timestamp_begin = data->timestamp_begin;
192 index->index_data.timestamp_end = data->timestamp_end;
193 index->index_data.events_discarded = data->events_discarded;
194 index->index_data.stream_id = data->stream_id;
195 index->has_index_data = true;
196 end:
197 pthread_mutex_unlock(&index->lock);
198 return ret;
199 }
200
201 static void index_destroy(struct relay_index *index)
202 {
203 free(index);
204 }
205
206 static void index_destroy_rcu(struct rcu_head *rcu_head)
207 {
208 struct relay_index *index =
209 caa_container_of(rcu_head, struct relay_index, rcu_node);
210
211 index_destroy(index);
212 }
213
214 /* Stream lock must be held by the caller. */
215 static void index_release(struct urcu_ref *ref)
216 {
217 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
218 struct relay_stream *stream = index->stream;
219 int ret;
220 struct lttng_ht_iter iter;
221
222 if (index->index_file) {
223 lttng_index_file_put(index->index_file);
224 index->index_file = NULL;
225 }
226 if (index->in_hash_table) {
227 /* Delete index from hash table. */
228 iter.iter.node = &index->index_n.node;
229 ret = lttng_ht_del(stream->indexes_ht, &iter);
230 assert(!ret);
231 stream->indexes_in_flight--;
232 }
233
234 stream_put(index->stream);
235 index->stream = NULL;
236
237 call_rcu(&index->rcu_node, index_destroy_rcu);
238 }
239
240 /*
241 * Called with stream mutex held.
242 *
243 * Stream lock must be held by the caller.
244 */
245 void relay_index_put(struct relay_index *index)
246 {
247 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
248 index->stream->stream_handle, index->index_n.key,
249 (int) index->ref.refcount);
250 /*
251 * Ensure existance of index->lock for index unlock.
252 */
253 rcu_read_lock();
254 /*
255 * Index lock ensures that concurrent test and update of stream
256 * ref is atomic.
257 */
258 assert(index->ref.refcount != 0);
259 urcu_ref_put(&index->ref, index_release);
260 rcu_read_unlock();
261 }
262
263 /*
264 * Try to flush index to disk. Releases self-reference to index once
265 * flush succeeds.
266 *
267 * Stream lock must be held by the caller.
268 * Return 0 on successful flush, a negative value on error, or positive
269 * value if no flush was performed.
270 */
271 int relay_index_try_flush(struct relay_index *index)
272 {
273 int ret = 1;
274 bool flushed = false;
275 int fd;
276
277 pthread_mutex_lock(&index->lock);
278 if (index->flushed) {
279 goto skip;
280 }
281 /* Check if we are ready to flush. */
282 if (!index->has_index_data || !index->index_file) {
283 goto skip;
284 }
285 fd = index->index_file->fd;
286 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
287 " on fd %d", index->stream->stream_handle,
288 index->index_n.key, fd);
289 flushed = true;
290 index->flushed = true;
291 ret = lttng_index_file_write(index->index_file, &index->index_data);
292 skip:
293 pthread_mutex_unlock(&index->lock);
294
295 if (flushed) {
296 /* Put self-ref from index now that it has been flushed. */
297 relay_index_put(index);
298 }
299 return ret;
300 }
301
302 /*
303 * Close every relay index within a given stream, without flushing
304 * them.
305 */
306 void relay_index_close_all(struct relay_stream *stream)
307 {
308 struct lttng_ht_iter iter;
309 struct relay_index *index;
310
311 rcu_read_lock();
312 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
313 index, index_n.node) {
314 /* Put self-ref from index. */
315 relay_index_put(index);
316 }
317 rcu_read_unlock();
318 }
319
320 void relay_index_close_partial_fd(struct relay_stream *stream)
321 {
322 struct lttng_ht_iter iter;
323 struct relay_index *index;
324
325 rcu_read_lock();
326 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
327 index, index_n.node) {
328 if (!index->index_file) {
329 continue;
330 }
331 /*
332 * Partial index has its index_file: we have only
333 * received its info from the data socket.
334 * Put self-ref from index.
335 */
336 relay_index_put(index);
337 }
338 rcu_read_unlock();
339 }
340
341 uint64_t relay_index_find_last(struct relay_stream *stream)
342 {
343 struct lttng_ht_iter iter;
344 struct relay_index *index;
345 uint64_t net_seq_num = -1ULL;
346
347 rcu_read_lock();
348 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
349 index, index_n.node) {
350 if (net_seq_num == -1ULL ||
351 index->index_n.key > net_seq_num) {
352 net_seq_num = index->index_n.key;
353 }
354 }
355 rcu_read_unlock();
356 return net_seq_num;
357 }
358
359 /*
360 * Update the index file of an already existing relay_index.
361 * Offsets by 'removed_data_count' the offset field of an index.
362 */
363 static
364 int relay_index_switch_file(struct relay_index *index,
365 struct lttng_index_file *new_index_file,
366 uint64_t removed_data_count)
367 {
368 int ret = 0;
369 uint64_t offset;
370
371 pthread_mutex_lock(&index->lock);
372 if (!index->index_file) {
373 ERR("No index_file");
374 ret = 0;
375 goto end;
376 }
377
378 lttng_index_file_put(index->index_file);
379 lttng_index_file_get(new_index_file);
380 index->index_file = new_index_file;
381 offset = be64toh(index->index_data.offset);
382 index->index_data.offset = htobe64(offset - removed_data_count);
383
384 end:
385 pthread_mutex_unlock(&index->lock);
386 return ret;
387 }
388
389 /*
390 * Switch the index file of all pending indexes for a stream and update the
391 * data offset by substracting the last safe position.
392 * Stream lock must be held.
393 */
394 int relay_index_switch_all_files(struct relay_stream *stream)
395 {
396 struct lttng_ht_iter iter;
397 struct relay_index *index;
398 int ret = 0;
399
400 rcu_read_lock();
401 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
402 index, index_n.node) {
403 DBG("Update index to fd %d", stream->index_file->fd);
404 ret = relay_index_switch_file(index, stream->index_file,
405 stream->pos_after_last_complete_data_index);
406 if (ret) {
407 goto end;
408 }
409 }
410 end:
411 rcu_read_unlock();
412 return ret;
413 }
This page took 0.035923 seconds and 3 git commands to generate.