Truncate exclusion names to have a terminal '\0'
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25
26 #include "lttng-relayd.h"
27 #include "stream.h"
28 #include "index.h"
29
30 /*
31 * Allocate a new relay index object. Pass the stream in which it is
32 * contained as parameter. The sequence number will be used as the hash
33 * table key.
34 *
35 * Called with stream mutex held.
36 * Return allocated object or else NULL on error.
37 */
38 static struct relay_index *relay_index_create(struct relay_stream *stream,
39 uint64_t net_seq_num)
40 {
41 struct relay_index *index;
42
43 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
44 stream->stream_handle, net_seq_num);
45
46 index = zmalloc(sizeof(*index));
47 if (!index) {
48 PERROR("Relay index zmalloc");
49 goto end;
50 }
51 if (!stream_get(stream)) {
52 ERR("Cannot get stream");
53 free(index);
54 index = NULL;
55 goto end;
56 }
57 index->stream = stream;
58
59 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
60 pthread_mutex_init(&index->lock, NULL);
61 pthread_mutex_init(&index->reflock, NULL);
62 urcu_ref_init(&index->ref);
63
64 end:
65 return index;
66 }
67
68 /*
69 * Add unique relay index to the given hash table. In case of a collision, the
70 * already existing object is put in the given _index variable.
71 *
72 * RCU read side lock MUST be acquired.
73 */
74 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
75 struct relay_index *index)
76 {
77 struct cds_lfht_node *node_ptr;
78 struct relay_index *_index;
79
80 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
81 stream->stream_handle, index->index_n.key);
82
83 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
84 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
85 stream->indexes_ht->match_fct, &index->index_n,
86 &index->index_n.node);
87 if (node_ptr != &index->index_n.node) {
88 _index = caa_container_of(node_ptr, struct relay_index,
89 index_n.node);
90 } else {
91 _index = NULL;
92 }
93 return _index;
94 }
95
96 /*
97 * Should be called with RCU read-side lock held.
98 */
99 static bool relay_index_get(struct relay_index *index)
100 {
101 bool has_ref = false;
102
103 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
104 index->stream->stream_handle, index->index_n.key,
105 (int) index->ref.refcount);
106
107 /* Confirm that the index refcount has not reached 0. */
108 pthread_mutex_lock(&index->reflock);
109 if (index->ref.refcount != 0) {
110 has_ref = true;
111 urcu_ref_get(&index->ref);
112 }
113 pthread_mutex_unlock(&index->reflock);
114
115 return has_ref;
116 }
117
118 /*
119 * Get a relayd index in within the given stream, or create it if not
120 * present.
121 *
122 * Called with stream mutex held.
123 * Return index object or else NULL on error.
124 */
125 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
126 uint64_t net_seq_num)
127 {
128 struct lttng_ht_node_u64 *node;
129 struct lttng_ht_iter iter;
130 struct relay_index *index = NULL;
131
132 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
133 stream->stream_handle, net_seq_num);
134
135 rcu_read_lock();
136 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
137 node = lttng_ht_iter_get_node_u64(&iter);
138 if (node) {
139 index = caa_container_of(node, struct relay_index, index_n);
140 } else {
141 struct relay_index *oldindex;
142
143 index = relay_index_create(stream, net_seq_num);
144 if (!index) {
145 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
146 stream->stream_handle, net_seq_num);
147 goto end;
148 }
149 oldindex = relay_index_add_unique(stream, index);
150 if (oldindex) {
151 /* Added concurrently, keep old. */
152 relay_index_put(index);
153 index = oldindex;
154 if (!relay_index_get(index)) {
155 index = NULL;
156 }
157 } else {
158 stream->indexes_in_flight++;
159 index->in_hash_table = true;
160 }
161 }
162 end:
163 rcu_read_unlock();
164 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
165 (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
166 return index;
167 }
168
169 int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
170 uint64_t data_offset)
171 {
172 int ret = 0;
173
174 pthread_mutex_lock(&index->lock);
175 if (index->index_fd) {
176 ret = -1;
177 goto end;
178 }
179 stream_fd_get(index_fd);
180 index->index_fd = index_fd;
181 index->index_data.offset = data_offset;
182 end:
183 pthread_mutex_unlock(&index->lock);
184 return ret;
185 }
186
187 int relay_index_set_data(struct relay_index *index,
188 const struct ctf_packet_index *data)
189 {
190 int ret = 0;
191
192 pthread_mutex_lock(&index->lock);
193 if (index->has_index_data) {
194 ret = -1;
195 goto end;
196 }
197 /* Set everything except data_offset. */
198 index->index_data.packet_size = data->packet_size;
199 index->index_data.content_size = data->content_size;
200 index->index_data.timestamp_begin = data->timestamp_begin;
201 index->index_data.timestamp_end = data->timestamp_end;
202 index->index_data.events_discarded = data->events_discarded;
203 index->index_data.stream_id = data->stream_id;
204 index->has_index_data = true;
205 end:
206 pthread_mutex_unlock(&index->lock);
207 return ret;
208 }
209
210 static void index_destroy(struct relay_index *index)
211 {
212 free(index);
213 }
214
215 static void index_destroy_rcu(struct rcu_head *rcu_head)
216 {
217 struct relay_index *index =
218 caa_container_of(rcu_head, struct relay_index, rcu_node);
219
220 index_destroy(index);
221 }
222
223 /* Stream lock must be held by the caller. */
224 static void index_release(struct urcu_ref *ref)
225 {
226 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
227 struct relay_stream *stream = index->stream;
228 int ret;
229 struct lttng_ht_iter iter;
230
231 if (index->index_fd) {
232 stream_fd_put(index->index_fd);
233 index->index_fd = NULL;
234 }
235 if (index->in_hash_table) {
236 /* Delete index from hash table. */
237 iter.iter.node = &index->index_n.node;
238 ret = lttng_ht_del(stream->indexes_ht, &iter);
239 assert(!ret);
240 stream->indexes_in_flight--;
241 }
242
243 stream_put(index->stream);
244 index->stream = NULL;
245
246 call_rcu(&index->rcu_node, index_destroy_rcu);
247 }
248
249 /*
250 * Called with stream mutex held.
251 *
252 * Stream lock must be held by the caller.
253 */
254 void relay_index_put(struct relay_index *index)
255 {
256 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
257 index->stream->stream_handle, index->index_n.key,
258 (int) index->ref.refcount);
259 /*
260 * Ensure existance of index->lock for index unlock.
261 */
262 rcu_read_lock();
263 /*
264 * Index lock ensures that concurrent test and update of stream
265 * ref is atomic.
266 */
267 pthread_mutex_lock(&index->reflock);
268 assert(index->ref.refcount != 0);
269 urcu_ref_put(&index->ref, index_release);
270 pthread_mutex_unlock(&index->reflock);
271 rcu_read_unlock();
272 }
273
274 /*
275 * Try to flush index to disk. Releases self-reference to index once
276 * flush succeeds.
277 *
278 * Stream lock must be held by the caller.
279 * Return 0 on successful flush, a negative value on error, or positive
280 * value if no flush was performed.
281 */
282 int relay_index_try_flush(struct relay_index *index)
283 {
284 int ret = 1;
285 bool flushed = false;
286 int fd;
287
288 pthread_mutex_lock(&index->lock);
289 if (index->flushed) {
290 goto skip;
291 }
292 /* Check if we are ready to flush. */
293 if (!index->has_index_data || !index->index_fd) {
294 goto skip;
295 }
296 fd = index->index_fd->fd;
297 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
298 " on fd %d", index->stream->stream_handle,
299 index->index_n.key, fd);
300 flushed = true;
301 index->flushed = true;
302 ret = index_write(fd, &index->index_data, sizeof(index->index_data));
303 if (ret == sizeof(index->index_data)) {
304 ret = 0;
305 } else {
306 ret = -1;
307 }
308 skip:
309 pthread_mutex_unlock(&index->lock);
310
311 if (flushed) {
312 /* Put self-ref from index now that it has been flushed. */
313 relay_index_put(index);
314 }
315 return ret;
316 }
317
318 /*
319 * Close every relay index within a given stream, without flushing
320 * them.
321 */
322 void relay_index_close_all(struct relay_stream *stream)
323 {
324 struct lttng_ht_iter iter;
325 struct relay_index *index;
326
327 rcu_read_lock();
328 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
329 index, index_n.node) {
330 /* Put self-ref from index. */
331 relay_index_put(index);
332 }
333 rcu_read_unlock();
334 }
335
336 void relay_index_close_partial_fd(struct relay_stream *stream)
337 {
338 struct lttng_ht_iter iter;
339 struct relay_index *index;
340
341 rcu_read_lock();
342 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
343 index, index_n.node) {
344 if (!index->index_fd) {
345 continue;
346 }
347 /*
348 * Partial index has its index_fd: we have only
349 * received its info from the data socket.
350 * Put self-ref from index.
351 */
352 relay_index_put(index);
353 }
354 rcu_read_unlock();
355 }
356
357 uint64_t relay_index_find_last(struct relay_stream *stream)
358 {
359 struct lttng_ht_iter iter;
360 struct relay_index *index;
361 uint64_t net_seq_num = -1ULL;
362
363 rcu_read_lock();
364 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
365 index, index_n.node) {
366 if (net_seq_num == -1ULL ||
367 index->index_n.key > net_seq_num) {
368 net_seq_num = index->index_n.key;
369 }
370 }
371 rcu_read_unlock();
372 return net_seq_num;
373 }
This page took 0.036467 seconds and 4 git commands to generate.