relay: use urcu_ref_get_unless_zero
[lttng-tools.git] / src / bin / lttng-relayd / index.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22
23 #include <common/common.h>
24 #include <common/utils.h>
25
26 #include "lttng-relayd.h"
27 #include "stream.h"
28 #include "index.h"
29
30 /*
31 * Allocate a new relay index object. Pass the stream in which it is
32 * contained as parameter. The sequence number will be used as the hash
33 * table key.
34 *
35 * Called with stream mutex held.
36 * Return allocated object or else NULL on error.
37 */
38 static struct relay_index *relay_index_create(struct relay_stream *stream,
39 uint64_t net_seq_num)
40 {
41 struct relay_index *index;
42
43 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
44 stream->stream_handle, net_seq_num);
45
46 index = zmalloc(sizeof(*index));
47 if (!index) {
48 PERROR("Relay index zmalloc");
49 goto end;
50 }
51 if (!stream_get(stream)) {
52 ERR("Cannot get stream");
53 free(index);
54 index = NULL;
55 goto end;
56 }
57 index->stream = stream;
58
59 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
60 pthread_mutex_init(&index->lock, NULL);
61 urcu_ref_init(&index->ref);
62
63 end:
64 return index;
65 }
66
67 /*
68 * Add unique relay index to the given hash table. In case of a collision, the
69 * already existing object is put in the given _index variable.
70 *
71 * RCU read side lock MUST be acquired.
72 */
73 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
74 struct relay_index *index)
75 {
76 struct cds_lfht_node *node_ptr;
77 struct relay_index *_index;
78
79 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
80 stream->stream_handle, index->index_n.key);
81
82 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
83 stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
84 stream->indexes_ht->match_fct, &index->index_n,
85 &index->index_n.node);
86 if (node_ptr != &index->index_n.node) {
87 _index = caa_container_of(node_ptr, struct relay_index,
88 index_n.node);
89 } else {
90 _index = NULL;
91 }
92 return _index;
93 }
94
95 /*
96 * Should be called with RCU read-side lock held.
97 */
98 static bool relay_index_get(struct relay_index *index)
99 {
100 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
101 index->stream->stream_handle, index->index_n.key,
102 (int) index->ref.refcount);
103
104 return urcu_ref_get_unless_zero(&index->ref);
105 }
106
107 /*
108 * Get a relayd index in within the given stream, or create it if not
109 * present.
110 *
111 * Called with stream mutex held.
112 * Return index object or else NULL on error.
113 */
114 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
115 uint64_t net_seq_num)
116 {
117 struct lttng_ht_node_u64 *node;
118 struct lttng_ht_iter iter;
119 struct relay_index *index = NULL;
120
121 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
122 stream->stream_handle, net_seq_num);
123
124 rcu_read_lock();
125 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
126 node = lttng_ht_iter_get_node_u64(&iter);
127 if (node) {
128 index = caa_container_of(node, struct relay_index, index_n);
129 } else {
130 struct relay_index *oldindex;
131
132 index = relay_index_create(stream, net_seq_num);
133 if (!index) {
134 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
135 stream->stream_handle, net_seq_num);
136 goto end;
137 }
138 oldindex = relay_index_add_unique(stream, index);
139 if (oldindex) {
140 /* Added concurrently, keep old. */
141 relay_index_put(index);
142 index = oldindex;
143 if (!relay_index_get(index)) {
144 index = NULL;
145 }
146 } else {
147 stream->indexes_in_flight++;
148 index->in_hash_table = true;
149 }
150 }
151 end:
152 rcu_read_unlock();
153 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
154 (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
155 return index;
156 }
157
158 int relay_index_set_file(struct relay_index *index,
159 struct lttng_index_file *index_file,
160 uint64_t data_offset)
161 {
162 int ret = 0;
163
164 pthread_mutex_lock(&index->lock);
165 if (index->index_file) {
166 ret = -1;
167 goto end;
168 }
169 lttng_index_file_get(index_file);
170 index->index_file = index_file;
171 index->index_data.offset = data_offset;
172 end:
173 pthread_mutex_unlock(&index->lock);
174 return ret;
175 }
176
177 int relay_index_set_data(struct relay_index *index,
178 const struct ctf_packet_index *data)
179 {
180 int ret = 0;
181
182 pthread_mutex_lock(&index->lock);
183 if (index->has_index_data) {
184 ret = -1;
185 goto end;
186 }
187 /* Set everything except data_offset. */
188 index->index_data.packet_size = data->packet_size;
189 index->index_data.content_size = data->content_size;
190 index->index_data.timestamp_begin = data->timestamp_begin;
191 index->index_data.timestamp_end = data->timestamp_end;
192 index->index_data.events_discarded = data->events_discarded;
193 index->index_data.stream_id = data->stream_id;
194 index->has_index_data = true;
195 end:
196 pthread_mutex_unlock(&index->lock);
197 return ret;
198 }
199
200 static void index_destroy(struct relay_index *index)
201 {
202 free(index);
203 }
204
205 static void index_destroy_rcu(struct rcu_head *rcu_head)
206 {
207 struct relay_index *index =
208 caa_container_of(rcu_head, struct relay_index, rcu_node);
209
210 index_destroy(index);
211 }
212
213 /* Stream lock must be held by the caller. */
214 static void index_release(struct urcu_ref *ref)
215 {
216 struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
217 struct relay_stream *stream = index->stream;
218 int ret;
219 struct lttng_ht_iter iter;
220
221 if (index->index_file) {
222 lttng_index_file_put(index->index_file);
223 index->index_file = NULL;
224 }
225 if (index->in_hash_table) {
226 /* Delete index from hash table. */
227 iter.iter.node = &index->index_n.node;
228 ret = lttng_ht_del(stream->indexes_ht, &iter);
229 assert(!ret);
230 stream->indexes_in_flight--;
231 }
232
233 stream_put(index->stream);
234 index->stream = NULL;
235
236 call_rcu(&index->rcu_node, index_destroy_rcu);
237 }
238
239 /*
240 * Called with stream mutex held.
241 *
242 * Stream lock must be held by the caller.
243 */
244 void relay_index_put(struct relay_index *index)
245 {
246 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
247 index->stream->stream_handle, index->index_n.key,
248 (int) index->ref.refcount);
249 /*
250 * Ensure existance of index->lock for index unlock.
251 */
252 rcu_read_lock();
253 /*
254 * Index lock ensures that concurrent test and update of stream
255 * ref is atomic.
256 */
257 assert(index->ref.refcount != 0);
258 urcu_ref_put(&index->ref, index_release);
259 rcu_read_unlock();
260 }
261
262 /*
263 * Try to flush index to disk. Releases self-reference to index once
264 * flush succeeds.
265 *
266 * Stream lock must be held by the caller.
267 * Return 0 on successful flush, a negative value on error, or positive
268 * value if no flush was performed.
269 */
270 int relay_index_try_flush(struct relay_index *index)
271 {
272 int ret = 1;
273 bool flushed = false;
274 int fd;
275
276 pthread_mutex_lock(&index->lock);
277 if (index->flushed) {
278 goto skip;
279 }
280 /* Check if we are ready to flush. */
281 if (!index->has_index_data || !index->index_file) {
282 goto skip;
283 }
284 fd = index->index_file->fd;
285 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
286 " on fd %d", index->stream->stream_handle,
287 index->index_n.key, fd);
288 flushed = true;
289 index->flushed = true;
290 ret = lttng_index_file_write(index->index_file, &index->index_data);
291 skip:
292 pthread_mutex_unlock(&index->lock);
293
294 if (flushed) {
295 /* Put self-ref from index now that it has been flushed. */
296 relay_index_put(index);
297 }
298 return ret;
299 }
300
301 /*
302 * Close every relay index within a given stream, without flushing
303 * them.
304 */
305 void relay_index_close_all(struct relay_stream *stream)
306 {
307 struct lttng_ht_iter iter;
308 struct relay_index *index;
309
310 rcu_read_lock();
311 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
312 index, index_n.node) {
313 /* Put self-ref from index. */
314 relay_index_put(index);
315 }
316 rcu_read_unlock();
317 }
318
319 void relay_index_close_partial_fd(struct relay_stream *stream)
320 {
321 struct lttng_ht_iter iter;
322 struct relay_index *index;
323
324 rcu_read_lock();
325 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
326 index, index_n.node) {
327 if (!index->index_file) {
328 continue;
329 }
330 /*
331 * Partial index has its index_file: we have only
332 * received its info from the data socket.
333 * Put self-ref from index.
334 */
335 relay_index_put(index);
336 }
337 rcu_read_unlock();
338 }
339
340 uint64_t relay_index_find_last(struct relay_stream *stream)
341 {
342 struct lttng_ht_iter iter;
343 struct relay_index *index;
344 uint64_t net_seq_num = -1ULL;
345
346 rcu_read_lock();
347 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
348 index, index_n.node) {
349 if (net_seq_num == -1ULL ||
350 index->index_n.key > net_seq_num) {
351 net_seq_num = index->index_n.key;
352 }
353 }
354 rcu_read_unlock();
355 return net_seq_num;
356 }
This page took 0.036549 seconds and 5 git commands to generate.