X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.c;h=bdbd11330f638a05223c81d256da6ef2da646dfd;hp=b15bbcd7702954e675a98c06e02ab31058ff7499;hb=9c256b0146b021962e93fda7400c61ceb2b0d45f;hpb=f8f3885cc52af9d3c951da78989d6f4a25270411 diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index b15bbcd77..bdbd11330 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -22,10 +22,12 @@ #include #include +#include #include "lttng-relayd.h" #include "stream.h" #include "index.h" +#include "connection.h" /* * Allocate a new relay index object. Pass the stream in which it is @@ -58,7 +60,6 @@ static struct relay_index *relay_index_create(struct relay_stream *stream, lttng_ht_node_init_u64(&index->index_n, net_seq_num); pthread_mutex_init(&index->lock, NULL); - pthread_mutex_init(&index->reflock, NULL); urcu_ref_init(&index->ref); end: @@ -98,21 +99,11 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, */ static bool relay_index_get(struct relay_index *index) { - bool has_ref = false; - DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", index->stream->stream_handle, index->index_n.key, (int) index->ref.refcount); - /* Confirm that the index refcount has not reached 0. */ - pthread_mutex_lock(&index->reflock); - if (index->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&index->ref); - } - pthread_mutex_unlock(&index->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&index->ref); } /* @@ -265,10 +256,8 @@ void relay_index_put(struct relay_index *index) * Index lock ensures that concurrent test and update of stream * ref is atomic. */ - pthread_mutex_lock(&index->reflock); assert(index->ref.refcount != 0); urcu_ref_put(&index->ref, index_release); - pthread_mutex_unlock(&index->reflock); rcu_read_unlock(); } @@ -367,3 +356,89 @@ uint64_t relay_index_find_last(struct relay_stream *stream) rcu_read_unlock(); return net_seq_num; } + +/* + * Update the index file of an already existing relay_index. + * Offsets by 'removed_data_count' the offset field of an index. + */ +static +int relay_index_switch_file(struct relay_index *index, + struct lttng_index_file *new_index_file, + uint64_t removed_data_count) +{ + int ret = 0; + uint64_t offset; + + pthread_mutex_lock(&index->lock); + if (!index->index_file) { + ERR("No index_file"); + ret = 0; + goto end; + } + + lttng_index_file_put(index->index_file); + lttng_index_file_get(new_index_file); + index->index_file = new_index_file; + offset = be64toh(index->index_data.offset); + index->index_data.offset = htobe64(offset - removed_data_count); + +end: + pthread_mutex_unlock(&index->lock); + return ret; +} + +/* + * Switch the index file of all pending indexes for a stream and update the + * data offset by substracting the last safe position. + * Stream lock must be held. + */ +int relay_index_switch_all_files(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + int ret = 0; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + DBG("Update index to fd %d", stream->index_file->fd); + ret = relay_index_switch_file(index, stream->index_file, + stream->pos_after_last_complete_data_index); + if (ret) { + goto end; + } + } +end: + rcu_read_unlock(); + return ret; +} + +/* + * Set index data from the control port to a given index object. + */ +int relay_index_set_control_data(struct relay_index *index, + const struct lttcomm_relayd_index *data, + unsigned int minor_version) +{ + /* The index on disk is encoded in big endian. */ + const struct ctf_packet_index index_data = { + .packet_size = htobe64(data->packet_size), + .content_size = htobe64(data->content_size), + .timestamp_begin = htobe64(data->timestamp_begin), + .timestamp_end = htobe64(data->timestamp_end), + .events_discarded = htobe64(data->events_discarded), + .stream_id = htobe64(data->stream_id), + }; + + if (minor_version >= 8) { + index->index_data.stream_instance_id = htobe64(data->stream_instance_id); + index->index_data.packet_seq_num = htobe64(data->packet_seq_num); + } else { + uint64_t unset_value = -1ULL; + + index->index_data.stream_instance_id = htobe64(unset_value); + index->index_data.packet_seq_num = htobe64(unset_value); + } + + return relay_index_set_data(index, &index_data); +}