2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 * SPDX-License-Identifier: GPL-2.0-only
12 #include <common/common.h>
13 #include <common/utils.h>
14 #include <common/compat/endian.h>
16 #include "lttng-relayd.h"
19 #include "connection.h"
22 * Allocate a new relay index object. Pass the stream in which it is
23 * contained as parameter. The sequence number will be used as the hash
26 * Called with stream mutex held.
27 * Return allocated object or else NULL on error.
29 static struct relay_index
*relay_index_create(struct relay_stream
*stream
,
32 struct relay_index
*index
;
34 DBG2("Creating relay index for stream id %" PRIu64
" and seqnum %" PRIu64
,
35 stream
->stream_handle
, net_seq_num
);
37 index
= (relay_index
*) zmalloc(sizeof(*index
));
39 PERROR("Relay index zmalloc");
42 if (!stream_get(stream
)) {
43 ERR("Cannot get stream");
48 index
->stream
= stream
;
50 lttng_ht_node_init_u64(&index
->index_n
, net_seq_num
);
51 pthread_mutex_init(&index
->lock
, NULL
);
52 urcu_ref_init(&index
->ref
);
59 * Add unique relay index to the given hash table. In case of a collision, the
60 * already existing object is put in the given _index variable.
62 * RCU read side lock MUST be acquired.
64 static struct relay_index
*relay_index_add_unique(struct relay_stream
*stream
,
65 struct relay_index
*index
)
67 struct cds_lfht_node
*node_ptr
;
68 struct relay_index
*_index
;
70 DBG2("Adding relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
71 stream
->stream_handle
, index
->index_n
.key
);
73 node_ptr
= cds_lfht_add_unique(stream
->indexes_ht
->ht
,
74 stream
->indexes_ht
->hash_fct(&index
->index_n
, lttng_ht_seed
),
75 stream
->indexes_ht
->match_fct
, &index
->index_n
,
76 &index
->index_n
.node
);
77 if (node_ptr
!= &index
->index_n
.node
) {
78 _index
= caa_container_of(node_ptr
, struct relay_index
,
87 * Should be called with RCU read-side lock held.
89 static bool relay_index_get(struct relay_index
*index
)
91 DBG2("index get for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
92 index
->stream
->stream_handle
, index
->index_n
.key
,
93 (int) index
->ref
.refcount
);
95 return urcu_ref_get_unless_zero(&index
->ref
);
99 * Get a relayd index in within the given stream, or create it if not
102 * Called with stream mutex held.
103 * Return index object or else NULL on error.
105 struct relay_index
*relay_index_get_by_id_or_create(struct relay_stream
*stream
,
106 uint64_t net_seq_num
)
108 struct lttng_ht_node_u64
*node
;
109 struct lttng_ht_iter iter
;
110 struct relay_index
*index
= NULL
;
112 DBG3("Finding index for stream id %" PRIu64
" and seq_num %" PRIu64
,
113 stream
->stream_handle
, net_seq_num
);
116 lttng_ht_lookup(stream
->indexes_ht
, &net_seq_num
, &iter
);
117 node
= lttng_ht_iter_get_node_u64(&iter
);
119 index
= caa_container_of(node
, struct relay_index
, index_n
);
121 struct relay_index
*oldindex
;
123 index
= relay_index_create(stream
, net_seq_num
);
125 ERR("Cannot create index for stream id %" PRIu64
" and seq_num %" PRIu64
,
126 stream
->stream_handle
, net_seq_num
);
129 oldindex
= relay_index_add_unique(stream
, index
);
131 /* Added concurrently, keep old. */
132 relay_index_put(index
);
134 if (!relay_index_get(index
)) {
138 stream
->indexes_in_flight
++;
139 index
->in_hash_table
= true;
144 DBG2("Index %sfound or created in HT for stream ID %" PRIu64
" and seqnum %" PRIu64
,
145 (index
== NULL
) ? "NOT " : "", stream
->stream_handle
, net_seq_num
);
149 int relay_index_set_file(struct relay_index
*index
,
150 struct lttng_index_file
*index_file
,
151 uint64_t data_offset
)
155 pthread_mutex_lock(&index
->lock
);
156 if (index
->index_file
) {
160 lttng_index_file_get(index_file
);
161 index
->index_file
= index_file
;
162 index
->index_data
.offset
= data_offset
;
164 pthread_mutex_unlock(&index
->lock
);
168 int relay_index_set_data(struct relay_index
*index
,
169 const struct ctf_packet_index
*data
)
173 pthread_mutex_lock(&index
->lock
);
174 if (index
->has_index_data
) {
178 /* Set everything except data_offset. */
179 index
->index_data
.packet_size
= data
->packet_size
;
180 index
->index_data
.content_size
= data
->content_size
;
181 index
->index_data
.timestamp_begin
= data
->timestamp_begin
;
182 index
->index_data
.timestamp_end
= data
->timestamp_end
;
183 index
->index_data
.events_discarded
= data
->events_discarded
;
184 index
->index_data
.stream_id
= data
->stream_id
;
185 index
->has_index_data
= true;
187 pthread_mutex_unlock(&index
->lock
);
191 static void index_destroy(struct relay_index
*index
)
196 static void index_destroy_rcu(struct rcu_head
*rcu_head
)
198 struct relay_index
*index
=
199 caa_container_of(rcu_head
, struct relay_index
, rcu_node
);
201 index_destroy(index
);
204 /* Stream lock must be held by the caller. */
205 static void index_release(struct urcu_ref
*ref
)
207 struct relay_index
*index
= caa_container_of(ref
, struct relay_index
, ref
);
208 struct relay_stream
*stream
= index
->stream
;
210 struct lttng_ht_iter iter
;
212 if (index
->index_file
) {
213 lttng_index_file_put(index
->index_file
);
214 index
->index_file
= NULL
;
216 if (index
->in_hash_table
) {
217 /* Delete index from hash table. */
218 iter
.iter
.node
= &index
->index_n
.node
;
219 ret
= lttng_ht_del(stream
->indexes_ht
, &iter
);
221 stream
->indexes_in_flight
--;
224 stream_put(index
->stream
);
225 index
->stream
= NULL
;
227 call_rcu(&index
->rcu_node
, index_destroy_rcu
);
231 * Called with stream mutex held.
233 * Stream lock must be held by the caller.
235 void relay_index_put(struct relay_index
*index
)
237 DBG2("index put for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
238 index
->stream
->stream_handle
, index
->index_n
.key
,
239 (int) index
->ref
.refcount
);
241 * Ensure existence of index->lock for index unlock.
245 * Index lock ensures that concurrent test and update of stream
248 LTTNG_ASSERT(index
->ref
.refcount
!= 0);
249 urcu_ref_put(&index
->ref
, index_release
);
254 * Try to flush index to disk. Releases self-reference to index once
257 * Stream lock must be held by the caller.
258 * Return 0 on successful flush, a negative value on error, or positive
259 * value if no flush was performed.
261 int relay_index_try_flush(struct relay_index
*index
)
264 bool flushed
= false;
266 pthread_mutex_lock(&index
->lock
);
267 if (index
->flushed
) {
270 /* Check if we are ready to flush. */
271 if (!index
->has_index_data
|| !index
->index_file
) {
275 DBG2("Writing index for stream ID %" PRIu64
" and seq num %" PRIu64
,
276 index
->stream
->stream_handle
, index
->index_n
.key
);
278 index
->flushed
= true;
279 ret
= lttng_index_file_write(index
->index_file
, &index
->index_data
);
281 pthread_mutex_unlock(&index
->lock
);
284 /* Put self-ref from index now that it has been flushed. */
285 relay_index_put(index
);
291 * Close every relay index within a given stream, without flushing
294 void relay_index_close_all(struct relay_stream
*stream
)
296 struct lttng_ht_iter iter
;
297 struct relay_index
*index
;
300 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
301 index
, index_n
.node
) {
302 /* Put self-ref from index. */
303 relay_index_put(index
);
308 void relay_index_close_partial_fd(struct relay_stream
*stream
)
310 struct lttng_ht_iter iter
;
311 struct relay_index
*index
;
314 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
315 index
, index_n
.node
) {
316 if (!index
->index_file
) {
320 * Partial index has its index_file: we have only
321 * received its info from the data socket.
322 * Put self-ref from index.
324 relay_index_put(index
);
329 uint64_t relay_index_find_last(struct relay_stream
*stream
)
331 struct lttng_ht_iter iter
;
332 struct relay_index
*index
;
333 uint64_t net_seq_num
= -1ULL;
336 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
337 index
, index_n
.node
) {
338 if (net_seq_num
== -1ULL ||
339 index
->index_n
.key
> net_seq_num
) {
340 net_seq_num
= index
->index_n
.key
;
348 * Update the index file of an already existing relay_index.
349 * Offsets by 'removed_data_count' the offset field of an index.
352 int relay_index_switch_file(struct relay_index
*index
,
353 struct lttng_index_file
*new_index_file
,
354 uint64_t removed_data_count
)
359 pthread_mutex_lock(&index
->lock
);
360 if (!index
->index_file
) {
361 ERR("No index_file");
366 lttng_index_file_put(index
->index_file
);
367 lttng_index_file_get(new_index_file
);
368 index
->index_file
= new_index_file
;
369 offset
= be64toh(index
->index_data
.offset
);
370 index
->index_data
.offset
= htobe64(offset
- removed_data_count
);
373 pthread_mutex_unlock(&index
->lock
);
378 * Switch the index file of all pending indexes for a stream and update the
379 * data offset by substracting the last safe position.
380 * Stream lock must be held.
382 int relay_index_switch_all_files(struct relay_stream
*stream
)
384 struct lttng_ht_iter iter
;
385 struct relay_index
*index
;
389 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
390 index
, index_n
.node
) {
391 ret
= relay_index_switch_file(index
, stream
->index_file
,
392 stream
->pos_after_last_complete_data_index
);
403 * Set index data from the control port to a given index object.
405 int relay_index_set_control_data(struct relay_index
*index
,
406 const struct lttcomm_relayd_index
*data
,
407 unsigned int minor_version
)
409 /* The index on disk is encoded in big endian. */
410 ctf_packet_index index_data
{};
412 index_data
.packet_size
= htobe64(data
->packet_size
);
413 index_data
.content_size
= htobe64(data
->content_size
);
414 index_data
.timestamp_begin
= htobe64(data
->timestamp_begin
);
415 index_data
.timestamp_end
= htobe64(data
->timestamp_end
);
416 index_data
.events_discarded
= htobe64(data
->events_discarded
);
417 index_data
.stream_id
= htobe64(data
->stream_id
);
419 if (minor_version
>= 8) {
420 index
->index_data
.stream_instance_id
= htobe64(data
->stream_instance_id
);
421 index
->index_data
.packet_seq_num
= htobe64(data
->packet_seq_num
);
423 uint64_t unset_value
= -1ULL;
425 index
->index_data
.stream_instance_id
= htobe64(unset_value
);
426 index
->index_data
.packet_seq_num
= htobe64(unset_value
);
429 return relay_index_set_data(index
, &index_data
);