Fix: use "flush empty" ioctl for snapshots
[lttng-tools.git] / src / bin / lttng-relayd / index.c
CommitLineData
1c20f0e2
JD
/*
 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
 *                      David Goulet <dgoulet@efficios.com>
 *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
19
6c1c0768 20#define _LGPL_SOURCE
1c20f0e2
JD
21#include <assert.h>
22
23#include <common/common.h>
24#include <common/utils.h>
25
0a6518b0 26#include "lttng-relayd.h"
7591bab1 27#include "stream.h"
1c20f0e2
JD
28#include "index.h"
29
30/*
7591bab1
MD
31 * Allocate a new relay index object. Pass the stream in which it is
32 * contained as parameter. The sequence number will be used as the hash
33 * table key.
34 *
35 * Called with stream mutex held.
36 * Return allocated object or else NULL on error.
1c20f0e2 37 */
7591bab1
MD
38static struct relay_index *relay_index_create(struct relay_stream *stream,
39 uint64_t net_seq_num)
1c20f0e2 40{
7591bab1 41 struct relay_index *index;
1c20f0e2 42
7591bab1
MD
43 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
44 stream->stream_handle, net_seq_num);
1c20f0e2 45
7591bab1
MD
46 index = zmalloc(sizeof(*index));
47 if (!index) {
48 PERROR("Relay index zmalloc");
49 goto end;
50 }
51 if (!stream_get(stream)) {
52 ERR("Cannot get stream");
53 free(index);
54 index = NULL;
55 goto end;
1c20f0e2 56 }
7591bab1 57 index->stream = stream;
1c20f0e2 58
7591bab1
MD
59 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
60 pthread_mutex_init(&index->lock, NULL);
61 pthread_mutex_init(&index->reflock, NULL);
62 urcu_ref_init(&index->ref);
63
64end:
65 return index;
1c20f0e2
JD
66}
67
68/*
7591bab1
MD
69 * Add unique relay index to the given hash table. In case of a collision, the
70 * already existing object is put in the given _index variable.
1c20f0e2 71 *
7591bab1 72 * RCU read side lock MUST be acquired.
1c20f0e2 73 */
7591bab1
MD
74static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
75 struct relay_index *index)
1c20f0e2 76{
7591bab1
MD
77 struct cds_lfht_node *node_ptr;
78 struct relay_index *_index;
1c20f0e2 79
7591bab1
MD
80 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
81 stream->stream_handle, index->index_n.key);
1c20f0e2 82
7591bab1
MD
83 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
84 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
85 stream->indexes_ht->match_fct, &index->index_n,
86 &index->index_n.node);
87 if (node_ptr != &index->index_n.node) {
88 _index = caa_container_of(node_ptr, struct relay_index,
89 index_n.node);
90 } else {
91 _index = NULL;
1c20f0e2 92 }
7591bab1
MD
93 return _index;
94}
95
96/*
97 * Should be called with RCU read-side lock held.
98 */
99static bool relay_index_get(struct relay_index *index)
100{
101 bool has_ref = false;
1c20f0e2 102
7591bab1
MD
103 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
104 index->stream->stream_handle, index->index_n.key,
105 (int) index->ref.refcount);
1c20f0e2 106
7591bab1
MD
107 /* Confirm that the index refcount has not reached 0. */
108 pthread_mutex_lock(&index->reflock);
109 if (index->ref.refcount != 0) {
110 has_ref = true;
111 urcu_ref_get(&index->ref);
112 }
113 pthread_mutex_unlock(&index->reflock);
114
115 return has_ref;
1c20f0e2
JD
116}
117
118/*
7591bab1
MD
119 * Get a relayd index in within the given stream, or create it if not
120 * present.
1c20f0e2 121 *
7591bab1 122 * Called with stream mutex held.
1c20f0e2
JD
123 * Return index object or else NULL on error.
124 */
7591bab1
MD
125struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
126 uint64_t net_seq_num)
1c20f0e2 127{
7591bab1 128 struct lttng_ht_node_u64 *node;
1c20f0e2 129 struct lttng_ht_iter iter;
1c20f0e2
JD
130 struct relay_index *index = NULL;
131
1c20f0e2 132 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 133 stream->stream_handle, net_seq_num);
1c20f0e2 134
7591bab1
MD
135 rcu_read_lock();
136 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
137 node = lttng_ht_iter_get_node_u64(&iter);
138 if (node) {
139 index = caa_container_of(node, struct relay_index, index_n);
140 } else {
141 struct relay_index *oldindex;
142
143 index = relay_index_create(stream, net_seq_num);
144 if (!index) {
145 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 146 stream->stream_handle, net_seq_num);
7591bab1
MD
147 goto end;
148 }
149 oldindex = relay_index_add_unique(stream, index);
150 if (oldindex) {
151 /* Added concurrently, keep old. */
152 relay_index_put(index);
153 index = oldindex;
154 if (!relay_index_get(index)) {
155 index = NULL;
156 }
157 } else {
158 stream->indexes_in_flight++;
159 index->in_hash_table = true;
160 }
1c20f0e2 161 }
1c20f0e2 162end:
7591bab1
MD
163 rcu_read_unlock();
164 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 165 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
166 return index;
167}
168
f8f3885c
MD
169int relay_index_set_file(struct relay_index *index,
170 struct lttng_index_file *index_file,
7591bab1 171 uint64_t data_offset)
1c20f0e2 172{
7591bab1 173 int ret = 0;
1c20f0e2 174
7591bab1 175 pthread_mutex_lock(&index->lock);
f8f3885c 176 if (index->index_file) {
7591bab1
MD
177 ret = -1;
178 goto end;
179 }
f8f3885c
MD
180 lttng_index_file_get(index_file);
181 index->index_file = index_file;
7591bab1
MD
182 index->index_data.offset = data_offset;
183end:
184 pthread_mutex_unlock(&index->lock);
185 return ret;
186}
1c20f0e2 187
7591bab1
MD
188int relay_index_set_data(struct relay_index *index,
189 const struct ctf_packet_index *data)
190{
191 int ret = 0;
1c20f0e2 192
7591bab1
MD
193 pthread_mutex_lock(&index->lock);
194 if (index->has_index_data) {
195 ret = -1;
196 goto end;
1c20f0e2 197 }
7591bab1
MD
198 /* Set everything except data_offset. */
199 index->index_data.packet_size = data->packet_size;
200 index->index_data.content_size = data->content_size;
201 index->index_data.timestamp_begin = data->timestamp_begin;
202 index->index_data.timestamp_end = data->timestamp_end;
203 index->index_data.events_discarded = data->events_discarded;
204 index->index_data.stream_id = data->stream_id;
205 index->has_index_data = true;
206end:
207 pthread_mutex_unlock(&index->lock);
208 return ret;
1c20f0e2
JD
209}
210
/* Free the memory backing a relay index object. */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
1c20f0e2 215
7591bab1
MD
216static void index_destroy_rcu(struct rcu_head *rcu_head)
217{
218 struct relay_index *index =
219 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 220
7591bab1 221 index_destroy(index);
1c20f0e2
JD
222}
223
7591bab1
MD
224/* Stream lock must be held by the caller. */
225static void index_release(struct urcu_ref *ref)
1c20f0e2 226{
7591bab1
MD
227 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
228 struct relay_stream *stream = index->stream;
229 int ret;
230 struct lttng_ht_iter iter;
231
f8f3885c
MD
232 if (index->index_file) {
233 lttng_index_file_put(index->index_file);
234 index->index_file = NULL;
7591bab1
MD
235 }
236 if (index->in_hash_table) {
237 /* Delete index from hash table. */
238 iter.iter.node = &index->index_n.node;
239 ret = lttng_ht_del(stream->indexes_ht, &iter);
240 assert(!ret);
241 stream->indexes_in_flight--;
242 }
243
244 stream_put(index->stream);
245 index->stream = NULL;
246
247 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
248}
249
250/*
7591bab1
MD
251 * Called with stream mutex held.
252 *
253 * Stream lock must be held by the caller.
1c20f0e2 254 */
7591bab1 255void relay_index_put(struct relay_index *index)
1c20f0e2 256{
7591bab1
MD
257 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
258 index->stream->stream_handle, index->index_n.key,
259 (int) index->ref.refcount);
260 /*
261 * Ensure existance of index->lock for index unlock.
262 */
263 rcu_read_lock();
264 /*
265 * Index lock ensures that concurrent test and update of stream
266 * ref is atomic.
267 */
268 pthread_mutex_lock(&index->reflock);
269 assert(index->ref.refcount != 0);
270 urcu_ref_put(&index->ref, index_release);
271 pthread_mutex_unlock(&index->reflock);
272 rcu_read_unlock();
1c20f0e2
JD
273}
274
275/*
7591bab1
MD
276 * Try to flush index to disk. Releases self-reference to index once
277 * flush succeeds.
1c20f0e2 278 *
7591bab1
MD
279 * Stream lock must be held by the caller.
280 * Return 0 on successful flush, a negative value on error, or positive
281 * value if no flush was performed.
1c20f0e2 282 */
7591bab1 283int relay_index_try_flush(struct relay_index *index)
1c20f0e2 284{
7591bab1
MD
285 int ret = 1;
286 bool flushed = false;
287 int fd;
1c20f0e2 288
7591bab1
MD
289 pthread_mutex_lock(&index->lock);
290 if (index->flushed) {
291 goto skip;
292 }
293 /* Check if we are ready to flush. */
f8f3885c 294 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
295 goto skip;
296 }
f8f3885c 297 fd = index->index_file->fd;
7591bab1
MD
298 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
299 " on fd %d", index->stream->stream_handle,
300 index->index_n.key, fd);
301 flushed = true;
302 index->flushed = true;
f8f3885c 303 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
304skip:
305 pthread_mutex_unlock(&index->lock);
1c20f0e2 306
7591bab1
MD
307 if (flushed) {
308 /* Put self-ref from index now that it has been flushed. */
309 relay_index_put(index);
310 }
311 return ret;
1c20f0e2
JD
312}
313
314/*
7591bab1
MD
315 * Close every relay index within a given stream, without flushing
316 * them.
1c20f0e2 317 */
7591bab1 318void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
319{
320 struct lttng_ht_iter iter;
321 struct relay_index *index;
322
1c20f0e2 323 rcu_read_lock();
7591bab1
MD
324 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
325 index, index_n.node) {
326 /* Put self-ref from index. */
327 relay_index_put(index);
1c20f0e2
JD
328 }
329 rcu_read_unlock();
330}
3d07a857
MD
331
332void relay_index_close_partial_fd(struct relay_stream *stream)
333{
334 struct lttng_ht_iter iter;
335 struct relay_index *index;
336
337 rcu_read_lock();
338 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
339 index, index_n.node) {
f8f3885c 340 if (!index->index_file) {
3d07a857
MD
341 continue;
342 }
343 /*
f8f3885c 344 * Partial index has its index_file: we have only
3d07a857
MD
345 * received its info from the data socket.
346 * Put self-ref from index.
347 */
348 relay_index_put(index);
349 }
350 rcu_read_unlock();
351}
352
353uint64_t relay_index_find_last(struct relay_stream *stream)
354{
355 struct lttng_ht_iter iter;
356 struct relay_index *index;
357 uint64_t net_seq_num = -1ULL;
358
359 rcu_read_lock();
360 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
361 index, index_n.node) {
362 if (net_seq_num == -1ULL ||
363 index->index_n.key > net_seq_num) {
364 net_seq_num = index->index_n.key;
365 }
366 }
367 rcu_read_unlock();
368 return net_seq_num;
369}
This page took 0.049439 seconds and 4 git commands to generate.