X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.c;h=ff27d958a0aebd6eb0f7816544eaa1f686ee192d;hp=8cacdd2f01eb8807d9a5d5de239277b311776256;hb=211d28db4ccebf1a2abcd04eabba31cf46af3060;hpb=0a6518b0685cb9e07cf156a4c882dc6ec40db35a diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index 8cacdd2f0..ff27d958a 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -1,210 +1,431 @@ /* - * Copyright (C) 2013 - Julien Desfossez - * David Goulet + * Copyright (C) 2013 Julien Desfossez + * Copyright (C) 2013 David Goulet + * Copyright (C) 2015 Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include +#include #include "lttng-relayd.h" +#include "stream.h" #include "index.h" +#include "connection.h" /* - * Deferred free of a relay index object. MUST only be called by a call RCU. + * Allocate a new relay index object. Pass the stream in which it is + * contained as parameter. The sequence number will be used as the hash + * table key. + * + * Called with stream mutex held. + * Return allocated object or else NULL on error. */ -static void deferred_free_relay_index(struct rcu_head *head) +static struct relay_index *relay_index_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct relay_index *index = - caa_container_of(head, struct relay_index, rcu_node); + struct relay_index *index; - if (index->to_close_fd >= 0) { - int ret; + DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, net_seq_num); - ret = close(index->to_close_fd); - if (ret < 0) { - PERROR("Relay index to close fd %d", index->to_close_fd); - } + index = zmalloc(sizeof(*index)); + if (!index) { + PERROR("Relay index zmalloc"); + goto end; + } + if (!stream_get(stream)) { + ERR("Cannot get stream"); + free(index); + index = NULL; + goto end; } + index->stream = stream; + + lttng_ht_node_init_u64(&index->index_n, net_seq_num); + pthread_mutex_init(&index->lock, NULL); + urcu_ref_init(&index->ref); - relay_index_free(index); +end: + return index; } /* - * Allocate a new relay index object using the given stream ID and sequence - * number as the hash table key. + * Add unique relay index to the given hash table. In case of a collision, the + * already existing object is put in the given _index variable. * - * Return allocated object or else NULL on error. + * RCU read side lock MUST be acquired. */ -struct relay_index *relay_index_create(uint64_t stream_id, - uint64_t net_seq_num) +static struct relay_index *relay_index_add_unique(struct relay_stream *stream, + struct relay_index *index) { - struct relay_index *index; + struct cds_lfht_node *node_ptr; + struct relay_index *_index; - DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - stream_id, net_seq_num); + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, index->index_n.key); - index = zmalloc(sizeof(*index)); - if (index == NULL) { - PERROR("Relay index zmalloc"); - goto error; + node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht, + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), + stream->indexes_ht->match_fct, &index->index_n, + &index->index_n.node); + if (node_ptr != &index->index_n.node) { + _index = caa_container_of(node_ptr, struct relay_index, + index_n.node); + } else { + _index = NULL; } + return _index; +} - index->to_close_fd = -1; - lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num); +/* + * Should be called with RCU read-side lock held. + */ +static bool relay_index_get(struct relay_index *index) +{ + DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); -error: - return index; + return urcu_ref_get_unless_zero(&index->ref); } /* - * Find a relayd index in the given hash table. + * Get a relayd index in within the given stream, or create it if not + * present. * + * Called with stream mutex held. * Return index object or else NULL on error. */ -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num) +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct lttng_ht_node_two_u64 *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct lttng_ht_two_u64 key; struct relay_index *index = NULL; DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream_id, net_seq_num); - - key.key1 = stream_id; - key.key2 = net_seq_num; + stream->stream_handle, net_seq_num); - lttng_ht_lookup(indexes_ht, (void *)(&key), &iter); - node = lttng_ht_iter_get_node_two_u64(&iter); - if (node == NULL) { - goto end; + rcu_read_lock(); + lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter); + node = lttng_ht_iter_get_node_u64(&iter); + if (node) { + index = caa_container_of(node, struct relay_index, index_n); + } else { + struct relay_index *oldindex; + + index = relay_index_create(stream, net_seq_num); + if (!index) { + ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, + stream->stream_handle, net_seq_num); + goto end; + } + oldindex = relay_index_add_unique(stream, index); + if (oldindex) { + /* Added concurrently, keep old. */ + relay_index_put(index); + index = oldindex; + if (!relay_index_get(index)) { + index = NULL; + } + } else { + stream->indexes_in_flight++; + index->in_hash_table = true; + } } - index = caa_container_of(node, struct relay_index, index_n); - end: - DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", stream_id, net_seq_num); + rcu_read_unlock(); + DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, + (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num); return index; } -/* - * Add unique relay index to the given hash table. In case of a collision, the - * already existing object is put in the given _index variable. - * - * RCU read side lock MUST be acquired. - */ -void relay_index_add(struct relay_index *index, struct relay_index **_index) +int relay_index_set_file(struct relay_index *index, + struct lttng_index_file *index_file, + uint64_t data_offset) { - struct cds_lfht_node *node_ptr; + int ret = 0; - assert(index); + pthread_mutex_lock(&index->lock); + if (index->index_file) { + ret = -1; + goto end; + } + lttng_index_file_get(index_file); + index->index_file = index_file; + index->index_data.offset = data_offset; +end: + pthread_mutex_unlock(&index->lock); + return ret; +} - DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - index->key.key1, index->key.key2); +int relay_index_set_data(struct relay_index *index, + const struct ctf_packet_index *data) +{ + int ret = 0; - node_ptr = cds_lfht_add_unique(indexes_ht->ht, - indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed), - indexes_ht->match_fct, (void *) &index->index_n.key, - &index->index_n.node); - if (node_ptr != &index->index_n.node) { - *_index = caa_container_of(node_ptr, struct relay_index, index_n.node); + pthread_mutex_lock(&index->lock); + if (index->has_index_data) { + ret = -1; + goto end; } + /* Set everything except data_offset. */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; + index->has_index_data = true; +end: + pthread_mutex_unlock(&index->lock); + return ret; } -/* - * Write index on disk to the given fd. Once done error or not, it is removed - * from the hash table and destroy the object. - * - * MUST be called with a RCU read side lock held. - * - * Return 0 on success else a negative value. - */ -int relay_index_write(int fd, struct relay_index *index) +static void index_destroy(struct relay_index *index) +{ + free(index); +} + +static void index_destroy_rcu(struct rcu_head *rcu_head) +{ + struct relay_index *index = + caa_container_of(rcu_head, struct relay_index, rcu_node); + + index_destroy(index); +} + +/* Stream lock must be held by the caller. */ +static void index_release(struct urcu_ref *ref) { + struct relay_index *index = caa_container_of(ref, struct relay_index, ref); + struct relay_stream *stream = index->stream; int ret; struct lttng_ht_iter iter; - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 - " on fd %d", index->key.key1, index->key.key2, fd); + if (index->index_file) { + lttng_index_file_put(index->index_file); + index->index_file = NULL; + } + if (index->in_hash_table) { + /* Delete index from hash table. */ + iter.iter.node = &index->index_n.node; + ret = lttng_ht_del(stream->indexes_ht, &iter); + assert(!ret); + stream->indexes_in_flight--; + } - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); - call_rcu(&index->rcu_node, deferred_free_relay_index); + stream_put(index->stream); + index->stream = NULL; - return index_write(fd, &index->index_data, sizeof(index->index_data)); + call_rcu(&index->rcu_node, index_destroy_rcu); } /* - * Free the given index. + * Called with stream mutex held. + * + * Stream lock must be held by the caller. */ -void relay_index_free(struct relay_index *index) +void relay_index_put(struct relay_index *index) { - free(index); + DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); + /* + * Ensure existance of index->lock for index unlock. + */ + rcu_read_lock(); + /* + * Index lock ensures that concurrent test and update of stream + * ref is atomic. + */ + assert(index->ref.refcount != 0); + urcu_ref_put(&index->ref, index_release); + rcu_read_unlock(); } /* - * Safely free the given index using a call RCU. + * Try to flush index to disk. Releases self-reference to index once + * flush succeeds. + * + * Stream lock must be held by the caller. + * Return 0 on successful flush, a negative value on error, or positive + * value if no flush was performed. */ -void relay_index_free_safe(struct relay_index *index) +int relay_index_try_flush(struct relay_index *index) { - if (!index) { - return; + int ret = 1; + bool flushed = false; + + pthread_mutex_lock(&index->lock); + if (index->flushed) { + goto skip; + } + /* Check if we are ready to flush. */ + if (!index->has_index_data || !index->index_file) { + goto skip; } - call_rcu(&index->rcu_node, deferred_free_relay_index); + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64, + index->stream->stream_handle, index->index_n.key); + flushed = true; + index->flushed = true; + ret = lttng_index_file_write(index->index_file, &index->index_data); +skip: + pthread_mutex_unlock(&index->lock); + + if (flushed) { + /* Put self-ref from index now that it has been flushed. */ + relay_index_put(index); + } + return ret; } /* - * Delete index from the given hash table. - * - * RCU read side lock MUST be acquired. + * Close every relay index within a given stream, without flushing + * them. */ -void relay_index_delete(struct relay_index *index) +void relay_index_close_all(struct relay_stream *stream) { - int ret; struct lttng_ht_iter iter; + struct relay_index *index; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + /* Put self-ref from index. */ + relay_index_put(index); + } + rcu_read_unlock(); +} - DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64 - "deleted.", index->key.key1, index->key.key2); +void relay_index_close_partial_fd(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (!index->index_file) { + continue; + } + /* + * Partial index has its index_file: we have only + * received its info from the data socket. + * Put self-ref from index. + */ + relay_index_put(index); + } + rcu_read_unlock(); +} + +uint64_t relay_index_find_last(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + uint64_t net_seq_num = -1ULL; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (net_seq_num == -1ULL || + index->index_n.key > net_seq_num) { + net_seq_num = index->index_n.key; + } + } + rcu_read_unlock(); + return net_seq_num; } /* - * Destroy every relay index with the given stream id as part of the key. + * Update the index file of an already existing relay_index. + * Offsets by 'removed_data_count' the offset field of an index. */ -void relay_index_destroy_by_stream_id(uint64_t stream_id) +static +int relay_index_switch_file(struct relay_index *index, + struct lttng_index_file *new_index_file, + uint64_t removed_data_count) +{ + int ret = 0; + uint64_t offset; + + pthread_mutex_lock(&index->lock); + if (!index->index_file) { + ERR("No index_file"); + ret = 0; + goto end; + } + + lttng_index_file_put(index->index_file); + lttng_index_file_get(new_index_file); + index->index_file = new_index_file; + offset = be64toh(index->index_data.offset); + index->index_data.offset = htobe64(offset - removed_data_count); + +end: + pthread_mutex_unlock(&index->lock); + return ret; +} + +/* + * Switch the index file of all pending indexes for a stream and update the + * data offset by substracting the last safe position. + * Stream lock must be held. + */ +int relay_index_switch_all_files(struct relay_stream *stream) { struct lttng_ht_iter iter; struct relay_index *index; + int ret = 0; rcu_read_lock(); - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - if (index->key.key1 == stream_id) { - relay_index_delete(index); - relay_index_free_safe(index); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + ret = relay_index_switch_file(index, stream->index_file, + stream->pos_after_last_complete_data_index); + if (ret) { + goto end; } } +end: rcu_read_unlock(); + return ret; +} + +/* + * Set index data from the control port to a given index object. + */ +int relay_index_set_control_data(struct relay_index *index, + const struct lttcomm_relayd_index *data, + unsigned int minor_version) +{ + /* The index on disk is encoded in big endian. */ + const struct ctf_packet_index index_data = { + .packet_size = htobe64(data->packet_size), + .content_size = htobe64(data->content_size), + .timestamp_begin = htobe64(data->timestamp_begin), + .timestamp_end = htobe64(data->timestamp_end), + .events_discarded = htobe64(data->events_discarded), + .stream_id = htobe64(data->stream_id), + }; + + if (minor_version >= 8) { + index->index_data.stream_instance_id = htobe64(data->stream_instance_id); + index->index_data.packet_seq_num = htobe64(data->packet_seq_num); + } else { + uint64_t unset_value = -1ULL; + + index->index_data.stream_instance_id = htobe64(unset_value); + index->index_data.packet_seq_num = htobe64(unset_value); + } + + return relay_index_set_data(index, &index_data); }