X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.cpp;fp=src%2Fbin%2Flttng-relayd%2Findex.cpp;h=426a34b2470e3b7acf0053bbba60fd58edc9b751;hp=a20ae2658a505cfab8431546e458e960ac83f203;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/bin/lttng-relayd/index.cpp b/src/bin/lttng-relayd/index.cpp index a20ae2658..426a34b24 100644 --- a/src/bin/lttng-relayd/index.cpp +++ b/src/bin/lttng-relayd/index.cpp @@ -9,14 +9,14 @@ #define _LGPL_SOURCE -#include -#include -#include - +#include "connection.hpp" +#include "index.hpp" #include "lttng-relayd.hpp" #include "stream.hpp" -#include "index.hpp" -#include "connection.hpp" + +#include +#include +#include /* * Allocate a new relay index object. Pass the stream in which it is @@ -26,13 +26,13 @@ * Called with stream mutex held. * Return allocated object or else NULL on error. */ -static struct relay_index *relay_index_create(struct relay_stream *stream, - uint64_t net_seq_num) +static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num) { struct relay_index *index; DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); index = zmalloc(); if (!index) { @@ -62,7 +62,7 @@ end: * RCU read side lock MUST be acquired. */ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, - struct relay_index *index) + struct relay_index *index) { struct cds_lfht_node *node_ptr; struct relay_index *_index; @@ -70,15 +70,16 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, ASSERT_RCU_READ_LOCKED(); DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - stream->stream_handle, index->index_n.key); + stream->stream_handle, + index->index_n.key); node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht, - stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), - stream->indexes_ht->match_fct, &index->index_n, - &index->index_n.node); + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), + stream->indexes_ht->match_fct, + &index->index_n, + &index->index_n.node); if (node_ptr != &index->index_n.node) { - _index = caa_container_of(node_ptr, struct relay_index, - index_n.node); + _index = caa_container_of(node_ptr, struct relay_index, index_n.node); } else { _index = NULL; } @@ -93,8 +94,9 @@ static bool relay_index_get(struct relay_index *index) ASSERT_RCU_READ_LOCKED(); DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", - index->stream->stream_handle, index->index_n.key, - (int) index->ref.refcount); + index->stream->stream_handle, + index->index_n.key, + (int) index->ref.refcount); return urcu_ref_get_unless_zero(&index->ref); } @@ -107,14 +109,15 @@ static bool relay_index_get(struct relay_index *index) * Return index object or else NULL on error. */ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, - uint64_t net_seq_num) + uint64_t net_seq_num) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_index *index = NULL; DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); rcu_read_lock(); lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter); @@ -127,7 +130,8 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, index = relay_index_create(stream, net_seq_num); if (!index) { ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); goto end; } oldindex = relay_index_add_unique(stream, index); @@ -146,13 +150,15 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, end: rcu_read_unlock(); DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num); + (index == NULL) ? "NOT " : "", + stream->stream_handle, + net_seq_num); return index; } int relay_index_set_file(struct relay_index *index, - struct lttng_index_file *index_file, - uint64_t data_offset) + struct lttng_index_file *index_file, + uint64_t data_offset) { int ret = 0; @@ -169,8 +175,7 @@ end: return ret; } -int relay_index_set_data(struct relay_index *index, - const struct ctf_packet_index *data) +int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data) { int ret = 0; @@ -199,8 +204,7 @@ static void index_destroy(struct relay_index *index) static void index_destroy_rcu(struct rcu_head *rcu_head) { - struct relay_index *index = - lttng::utils::container_of(rcu_head, &relay_index::rcu_node); + struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node); index_destroy(index); } @@ -239,8 +243,9 @@ static void index_release(struct urcu_ref *ref) void relay_index_put(struct relay_index *index) { DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", - index->stream->stream_handle, index->index_n.key, - (int) index->ref.refcount); + index->stream->stream_handle, + index->index_n.key, + (int) index->ref.refcount); /* * Ensure existence of index->lock for index unlock. */ @@ -277,7 +282,8 @@ int relay_index_try_flush(struct relay_index *index) } DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64, - index->stream->stream_handle, index->index_n.key); + index->stream->stream_handle, + index->index_n.key); flushed = true; index->flushed = true; ret = lttng_index_file_write(index->index_file, &index->index_data); @@ -301,8 +307,7 @@ void relay_index_close_all(struct relay_stream *stream) struct relay_index *index; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { /* Put self-ref from index. */ relay_index_put(index); } @@ -315,8 +320,7 @@ void relay_index_close_partial_fd(struct relay_stream *stream) struct relay_index *index; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { if (!index->index_file) { continue; } @@ -337,10 +341,8 @@ uint64_t relay_index_find_last(struct relay_stream *stream) uint64_t net_seq_num = -1ULL; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { - if (net_seq_num == -1ULL || - index->index_n.key > net_seq_num) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) { net_seq_num = index->index_n.key; } } @@ -352,10 +354,9 @@ uint64_t relay_index_find_last(struct relay_stream *stream) * Update the index file of an already existing relay_index. * Offsets by 'removed_data_count' the offset field of an index. */ -static -int relay_index_switch_file(struct relay_index *index, - struct lttng_index_file *new_index_file, - uint64_t removed_data_count) +static int relay_index_switch_file(struct relay_index *index, + struct lttng_index_file *new_index_file, + uint64_t removed_data_count) { int ret = 0; uint64_t offset; @@ -390,10 +391,9 @@ int relay_index_switch_all_files(struct relay_stream *stream) int ret = 0; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { - ret = relay_index_switch_file(index, stream->index_file, - stream->pos_after_last_complete_data_index); + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + ret = relay_index_switch_file( + index, stream->index_file, stream->pos_after_last_complete_data_index); if (ret) { goto end; } @@ -407,11 +407,11 @@ end: * Set index data from the control port to a given index object. */ int relay_index_set_control_data(struct relay_index *index, - const struct lttcomm_relayd_index *data, - unsigned int minor_version) + const struct lttcomm_relayd_index *data, + unsigned int minor_version) { /* The index on disk is encoded in big endian. */ - ctf_packet_index index_data {}; + ctf_packet_index index_data{}; index_data.packet_size = htobe64(data->packet_size); index_data.content_size = htobe64(data->content_size);