Channel rotate pipe between sessiond and the consumers
[lttng-tools.git] / src / bin / lttng-relayd / index.c
CommitLineData
1c20f0e2
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
7591bab1 4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2
JD
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
6c1c0768 20#define _LGPL_SOURCE
1c20f0e2
JD
21#include <assert.h>
22
23#include <common/common.h>
24#include <common/utils.h>
25
0a6518b0 26#include "lttng-relayd.h"
7591bab1 27#include "stream.h"
1c20f0e2
JD
28#include "index.h"
29
30/*
7591bab1
MD
31 * Allocate a new relay index object. Pass the stream in which it is
32 * contained as parameter. The sequence number will be used as the hash
33 * table key.
34 *
35 * Called with stream mutex held.
36 * Return allocated object or else NULL on error.
1c20f0e2 37 */
7591bab1
MD
38static struct relay_index *relay_index_create(struct relay_stream *stream,
39 uint64_t net_seq_num)
1c20f0e2 40{
7591bab1 41 struct relay_index *index;
1c20f0e2 42
7591bab1
MD
43 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
44 stream->stream_handle, net_seq_num);
1c20f0e2 45
7591bab1
MD
46 index = zmalloc(sizeof(*index));
47 if (!index) {
48 PERROR("Relay index zmalloc");
49 goto end;
50 }
51 if (!stream_get(stream)) {
52 ERR("Cannot get stream");
53 free(index);
54 index = NULL;
55 goto end;
1c20f0e2 56 }
7591bab1 57 index->stream = stream;
1c20f0e2 58
7591bab1
MD
59 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
60 pthread_mutex_init(&index->lock, NULL);
7591bab1
MD
61 urcu_ref_init(&index->ref);
62
63end:
64 return index;
1c20f0e2
JD
65}
66
67/*
7591bab1
MD
68 * Add unique relay index to the given hash table. In case of a collision, the
69 * already existing object is put in the given _index variable.
1c20f0e2 70 *
7591bab1 71 * RCU read side lock MUST be acquired.
1c20f0e2 72 */
7591bab1
MD
73static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
74 struct relay_index *index)
1c20f0e2 75{
7591bab1
MD
76 struct cds_lfht_node *node_ptr;
77 struct relay_index *_index;
1c20f0e2 78
7591bab1
MD
79 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
80 stream->stream_handle, index->index_n.key);
1c20f0e2 81
7591bab1
MD
82 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
83 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
84 stream->indexes_ht->match_fct, &index->index_n,
85 &index->index_n.node);
86 if (node_ptr != &index->index_n.node) {
87 _index = caa_container_of(node_ptr, struct relay_index,
88 index_n.node);
89 } else {
90 _index = NULL;
1c20f0e2 91 }
7591bab1
MD
92 return _index;
93}
94
95/*
96 * Should be called with RCU read-side lock held.
97 */
98static bool relay_index_get(struct relay_index *index)
99{
7591bab1
MD
100 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
101 index->stream->stream_handle, index->index_n.key,
102 (int) index->ref.refcount);
1c20f0e2 103
ce4d4083 104 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
105}
106
107/*
7591bab1
MD
108 * Get a relayd index in within the given stream, or create it if not
109 * present.
1c20f0e2 110 *
7591bab1 111 * Called with stream mutex held.
1c20f0e2
JD
112 * Return index object or else NULL on error.
113 */
7591bab1
MD
114struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
115 uint64_t net_seq_num)
1c20f0e2 116{
7591bab1 117 struct lttng_ht_node_u64 *node;
1c20f0e2 118 struct lttng_ht_iter iter;
1c20f0e2
JD
119 struct relay_index *index = NULL;
120
1c20f0e2 121 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
7591bab1 122 stream->stream_handle, net_seq_num);
1c20f0e2 123
7591bab1
MD
124 rcu_read_lock();
125 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
126 node = lttng_ht_iter_get_node_u64(&iter);
127 if (node) {
128 index = caa_container_of(node, struct relay_index, index_n);
129 } else {
130 struct relay_index *oldindex;
131
132 index = relay_index_create(stream, net_seq_num);
133 if (!index) {
134 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
811a4037 135 stream->stream_handle, net_seq_num);
7591bab1
MD
136 goto end;
137 }
138 oldindex = relay_index_add_unique(stream, index);
139 if (oldindex) {
140 /* Added concurrently, keep old. */
141 relay_index_put(index);
142 index = oldindex;
143 if (!relay_index_get(index)) {
144 index = NULL;
145 }
146 } else {
147 stream->indexes_in_flight++;
148 index->in_hash_table = true;
149 }
1c20f0e2 150 }
1c20f0e2 151end:
7591bab1
MD
152 rcu_read_unlock();
153 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
719155c2 154 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
1c20f0e2
JD
155 return index;
156}
157
f8f3885c
MD
158int relay_index_set_file(struct relay_index *index,
159 struct lttng_index_file *index_file,
7591bab1 160 uint64_t data_offset)
1c20f0e2 161{
7591bab1 162 int ret = 0;
1c20f0e2 163
7591bab1 164 pthread_mutex_lock(&index->lock);
f8f3885c 165 if (index->index_file) {
7591bab1
MD
166 ret = -1;
167 goto end;
168 }
f8f3885c
MD
169 lttng_index_file_get(index_file);
170 index->index_file = index_file;
7591bab1
MD
171 index->index_data.offset = data_offset;
172end:
173 pthread_mutex_unlock(&index->lock);
174 return ret;
175}
1c20f0e2 176
7591bab1
MD
177int relay_index_set_data(struct relay_index *index,
178 const struct ctf_packet_index *data)
179{
180 int ret = 0;
1c20f0e2 181
7591bab1
MD
182 pthread_mutex_lock(&index->lock);
183 if (index->has_index_data) {
184 ret = -1;
185 goto end;
1c20f0e2 186 }
7591bab1
MD
187 /* Set everything except data_offset. */
188 index->index_data.packet_size = data->packet_size;
189 index->index_data.content_size = data->content_size;
190 index->index_data.timestamp_begin = data->timestamp_begin;
191 index->index_data.timestamp_end = data->timestamp_end;
192 index->index_data.events_discarded = data->events_discarded;
193 index->index_data.stream_id = data->stream_id;
194 index->has_index_data = true;
195end:
196 pthread_mutex_unlock(&index->lock);
197 return ret;
1c20f0e2
JD
198}
199
7591bab1 200static void index_destroy(struct relay_index *index)
1c20f0e2 201{
7591bab1
MD
202 free(index);
203}
1c20f0e2 204
7591bab1
MD
205static void index_destroy_rcu(struct rcu_head *rcu_head)
206{
207 struct relay_index *index =
208 caa_container_of(rcu_head, struct relay_index, rcu_node);
1c20f0e2 209
7591bab1 210 index_destroy(index);
1c20f0e2
JD
211}
212
7591bab1
MD
213/* Stream lock must be held by the caller. */
214static void index_release(struct urcu_ref *ref)
1c20f0e2 215{
7591bab1
MD
216 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
217 struct relay_stream *stream = index->stream;
218 int ret;
219 struct lttng_ht_iter iter;
220
f8f3885c
MD
221 if (index->index_file) {
222 lttng_index_file_put(index->index_file);
223 index->index_file = NULL;
7591bab1
MD
224 }
225 if (index->in_hash_table) {
226 /* Delete index from hash table. */
227 iter.iter.node = &index->index_n.node;
228 ret = lttng_ht_del(stream->indexes_ht, &iter);
229 assert(!ret);
230 stream->indexes_in_flight--;
231 }
232
233 stream_put(index->stream);
234 index->stream = NULL;
235
236 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
237}
238
239/*
7591bab1
MD
240 * Called with stream mutex held.
241 *
242 * Stream lock must be held by the caller.
1c20f0e2 243 */
7591bab1 244void relay_index_put(struct relay_index *index)
1c20f0e2 245{
7591bab1
MD
246 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
247 index->stream->stream_handle, index->index_n.key,
248 (int) index->ref.refcount);
249 /*
250 * Ensure existance of index->lock for index unlock.
251 */
252 rcu_read_lock();
253 /*
254 * Index lock ensures that concurrent test and update of stream
255 * ref is atomic.
256 */
7591bab1
MD
257 assert(index->ref.refcount != 0);
258 urcu_ref_put(&index->ref, index_release);
7591bab1 259 rcu_read_unlock();
1c20f0e2
JD
260}
261
262/*
7591bab1
MD
263 * Try to flush index to disk. Releases self-reference to index once
264 * flush succeeds.
1c20f0e2 265 *
7591bab1
MD
266 * Stream lock must be held by the caller.
267 * Return 0 on successful flush, a negative value on error, or positive
268 * value if no flush was performed.
1c20f0e2 269 */
7591bab1 270int relay_index_try_flush(struct relay_index *index)
1c20f0e2 271{
7591bab1
MD
272 int ret = 1;
273 bool flushed = false;
274 int fd;
1c20f0e2 275
7591bab1
MD
276 pthread_mutex_lock(&index->lock);
277 if (index->flushed) {
278 goto skip;
279 }
280 /* Check if we are ready to flush. */
f8f3885c 281 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
282 goto skip;
283 }
f8f3885c 284 fd = index->index_file->fd;
7591bab1
MD
285 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
286 " on fd %d", index->stream->stream_handle,
287 index->index_n.key, fd);
288 flushed = true;
289 index->flushed = true;
f8f3885c 290 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
291skip:
292 pthread_mutex_unlock(&index->lock);
1c20f0e2 293
7591bab1
MD
294 if (flushed) {
295 /* Put self-ref from index now that it has been flushed. */
296 relay_index_put(index);
297 }
298 return ret;
1c20f0e2
JD
299}
300
301/*
7591bab1
MD
302 * Close every relay index within a given stream, without flushing
303 * them.
1c20f0e2 304 */
7591bab1 305void relay_index_close_all(struct relay_stream *stream)
1c20f0e2
JD
306{
307 struct lttng_ht_iter iter;
308 struct relay_index *index;
309
1c20f0e2 310 rcu_read_lock();
7591bab1
MD
311 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
312 index, index_n.node) {
313 /* Put self-ref from index. */
314 relay_index_put(index);
1c20f0e2
JD
315 }
316 rcu_read_unlock();
317}
3d07a857
MD
318
319void relay_index_close_partial_fd(struct relay_stream *stream)
320{
321 struct lttng_ht_iter iter;
322 struct relay_index *index;
323
324 rcu_read_lock();
325 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
326 index, index_n.node) {
f8f3885c 327 if (!index->index_file) {
3d07a857
MD
328 continue;
329 }
330 /*
f8f3885c 331 * Partial index has its index_file: we have only
3d07a857
MD
332 * received its info from the data socket.
333 * Put self-ref from index.
334 */
335 relay_index_put(index);
336 }
337 rcu_read_unlock();
338}
339
340uint64_t relay_index_find_last(struct relay_stream *stream)
341{
342 struct lttng_ht_iter iter;
343 struct relay_index *index;
344 uint64_t net_seq_num = -1ULL;
345
346 rcu_read_lock();
347 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
348 index, index_n.node) {
349 if (net_seq_num == -1ULL ||
350 index->index_n.key > net_seq_num) {
351 net_seq_num = index->index_n.key;
352 }
353 }
354 rcu_read_unlock();
355 return net_seq_num;
356}
This page took 0.050098 seconds and 4 git commands to generate.