X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.c;h=80a4bb94d2a0651dc24100425c52ca4047b06209;hp=b7507a022076e0e6be789523111c08f4fcad2d66;hb=719155c2bed581d85290537fd95bafc787229dbc;hpb=5c32d0aeb3f1e338ff61f9b5c865a01763a3ec4b diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index b7507a022..80a4bb94d 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,7 +17,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include @@ -24,190 +24,350 @@ #include #include "lttng-relayd.h" +#include "stream.h" #include "index.h" /* - * Deferred free of a relay index object. MUST only be called by a call RCU. + * Allocate a new relay index object. Pass the stream in which it is + * contained as parameter. The sequence number will be used as the hash + * table key. + * + * Called with stream mutex held. + * Return allocated object or else NULL on error. */ -static void deferred_free_relay_index(struct rcu_head *head) +static struct relay_index *relay_index_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct relay_index *index = - caa_container_of(head, struct relay_index, rcu_node); + struct relay_index *index; - if (index->to_close_fd >= 0) { - int ret; + DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, net_seq_num); - ret = close(index->to_close_fd); - if (ret < 0) { - PERROR("Relay index to close fd %d", index->to_close_fd); - } + index = zmalloc(sizeof(*index)); + if (!index) { + PERROR("Relay index zmalloc"); + goto end; } + if (!stream_get(stream)) { + ERR("Cannot get stream"); + free(index); + index = NULL; + goto end; + } + index->stream = stream; + + lttng_ht_node_init_u64(&index->index_n, net_seq_num); + pthread_mutex_init(&index->lock, NULL); + pthread_mutex_init(&index->reflock, NULL); + urcu_ref_init(&index->ref); - relay_index_free(index); +end: + return index; } /* - * Allocate a new relay index object using the given stream ID and sequence - * number as the hash table key. + * Add unique relay index to the given hash table. In case of a collision, the + * already existing object is put in the given _index variable. * - * Return allocated object or else NULL on error. + * RCU read side lock MUST be acquired. */ -struct relay_index *relay_index_create(uint64_t stream_id, - uint64_t net_seq_num) +static struct relay_index *relay_index_add_unique(struct relay_stream *stream, + struct relay_index *index) { - struct relay_index *index; + struct cds_lfht_node *node_ptr; + struct relay_index *_index; - DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - stream_id, net_seq_num); + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, index->index_n.key); - index = zmalloc(sizeof(*index)); - if (index == NULL) { - PERROR("Relay index zmalloc"); - goto error; + node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht, + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), + stream->indexes_ht->match_fct, &index->index_n, + &index->index_n.node); + if (node_ptr != &index->index_n.node) { + _index = caa_container_of(node_ptr, struct relay_index, + index_n.node); + } else { + _index = NULL; } + return _index; +} - index->to_close_fd = -1; - lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num); +/* + * Should be called with RCU read-side lock held. + */ +static bool relay_index_get(struct relay_index *index) +{ + bool has_ref = false; -error: - return index; + DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); + + /* Confirm that the index refcount has not reached 0. */ + pthread_mutex_lock(&index->reflock); + if (index->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&index->ref); + } + pthread_mutex_unlock(&index->reflock); + + return has_ref; } /* - * Find a relayd index in the given hash table. + * Get a relayd index in within the given stream, or create it if not + * present. * + * Called with stream mutex held. * Return index object or else NULL on error. */ -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num) +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct lttng_ht_node_two_u64 *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct lttng_ht_two_u64 key; struct relay_index *index = NULL; DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream_id, net_seq_num); - - key.key1 = stream_id; - key.key2 = net_seq_num; + stream->stream_handle, net_seq_num); - lttng_ht_lookup(indexes_ht, (void *)(&key), &iter); - node = lttng_ht_iter_get_node_two_u64(&iter); - if (node == NULL) { - goto end; + rcu_read_lock(); + lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter); + node = lttng_ht_iter_get_node_u64(&iter); + if (node) { + index = caa_container_of(node, struct relay_index, index_n); + } else { + struct relay_index *oldindex; + + index = relay_index_create(stream, net_seq_num); + if (!index) { + ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, + stream->stream_handle, net_seq_num); + goto end; + } + oldindex = relay_index_add_unique(stream, index); + if (oldindex) { + /* Added concurrently, keep old. */ + relay_index_put(index); + index = oldindex; + if (!relay_index_get(index)) { + index = NULL; + } + } else { + stream->indexes_in_flight++; + index->in_hash_table = true; + } } - index = caa_container_of(node, struct relay_index, index_n); - end: - DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", stream_id, net_seq_num); + rcu_read_unlock(); + DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, + (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num); return index; } -/* - * Add unique relay index to the given hash table. In case of a collision, the - * already existing object is put in the given _index variable. - * - * RCU read side lock MUST be acquired. - */ -void relay_index_add(struct relay_index *index, struct relay_index **_index) +int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd, + uint64_t data_offset) { - struct cds_lfht_node *node_ptr; + int ret = 0; - assert(index); + pthread_mutex_lock(&index->lock); + if (index->index_fd) { + ret = -1; + goto end; + } + stream_fd_get(index_fd); + index->index_fd = index_fd; + index->index_data.offset = data_offset; +end: + pthread_mutex_unlock(&index->lock); + return ret; +} - DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - index->index_n.key.key1, index->index_n.key.key2); +int relay_index_set_data(struct relay_index *index, + const struct ctf_packet_index *data) +{ + int ret = 0; - node_ptr = cds_lfht_add_unique(indexes_ht->ht, - indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed), - indexes_ht->match_fct, (void *) &index->index_n.key, - &index->index_n.node); - if (node_ptr != &index->index_n.node) { - *_index = caa_container_of(node_ptr, struct relay_index, index_n.node); + pthread_mutex_lock(&index->lock); + if (index->has_index_data) { + ret = -1; + goto end; } + /* Set everything except data_offset. */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; + index->has_index_data = true; +end: + pthread_mutex_unlock(&index->lock); + return ret; } -/* - * Write index on disk to the given fd. Once done error or not, it is removed - * from the hash table and destroy the object. - * - * MUST be called with a RCU read side lock held. - * - * Return 0 on success else a negative value. - */ -int relay_index_write(int fd, struct relay_index *index) +static void index_destroy(struct relay_index *index) +{ + free(index); +} + +static void index_destroy_rcu(struct rcu_head *rcu_head) +{ + struct relay_index *index = + caa_container_of(rcu_head, struct relay_index, rcu_node); + + index_destroy(index); +} + +/* Stream lock must be held by the caller. */ +static void index_release(struct urcu_ref *ref) { + struct relay_index *index = caa_container_of(ref, struct relay_index, ref); + struct relay_stream *stream = index->stream; int ret; struct lttng_ht_iter iter; - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 - " on fd %d", index->index_n.key.key1, - index->index_n.key.key2, fd); + if (index->index_fd) { + stream_fd_put(index->index_fd); + index->index_fd = NULL; + } + if (index->in_hash_table) { + /* Delete index from hash table. */ + iter.iter.node = &index->index_n.node; + ret = lttng_ht_del(stream->indexes_ht, &iter); + assert(!ret); + stream->indexes_in_flight--; + } - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); - call_rcu(&index->rcu_node, deferred_free_relay_index); + stream_put(index->stream); + index->stream = NULL; - return index_write(fd, &index->index_data, sizeof(index->index_data)); + call_rcu(&index->rcu_node, index_destroy_rcu); } /* - * Free the given index. + * Called with stream mutex held. + * + * Stream lock must be held by the caller. */ -void relay_index_free(struct relay_index *index) +void relay_index_put(struct relay_index *index) { - free(index); + DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); + /* + * Ensure existance of index->lock for index unlock. + */ + rcu_read_lock(); + /* + * Index lock ensures that concurrent test and update of stream + * ref is atomic. + */ + pthread_mutex_lock(&index->reflock); + assert(index->ref.refcount != 0); + urcu_ref_put(&index->ref, index_release); + pthread_mutex_unlock(&index->reflock); + rcu_read_unlock(); } /* - * Safely free the given index using a call RCU. + * Try to flush index to disk. Releases self-reference to index once + * flush succeeds. + * + * Stream lock must be held by the caller. + * Return 0 on successful flush, a negative value on error, or positive + * value if no flush was performed. */ -void relay_index_free_safe(struct relay_index *index) +int relay_index_try_flush(struct relay_index *index) { - if (!index) { - return; + int ret = 1; + bool flushed = false; + int fd; + + pthread_mutex_lock(&index->lock); + if (index->flushed) { + goto skip; + } + /* Check if we are ready to flush. */ + if (!index->has_index_data || !index->index_fd) { + goto skip; + } + fd = index->index_fd->fd; + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 + " on fd %d", index->stream->stream_handle, + index->index_n.key, fd); + flushed = true; + index->flushed = true; + ret = index_write(fd, &index->index_data, sizeof(index->index_data)); + if (ret == sizeof(index->index_data)) { + ret = 0; + } else { + ret = -1; } +skip: + pthread_mutex_unlock(&index->lock); - call_rcu(&index->rcu_node, deferred_free_relay_index); + if (flushed) { + /* Put self-ref from index now that it has been flushed. */ + relay_index_put(index); + } + return ret; } /* - * Delete index from the given hash table. - * - * RCU read side lock MUST be acquired. + * Close every relay index within a given stream, without flushing + * them. */ -void relay_index_delete(struct relay_index *index) +void relay_index_close_all(struct relay_stream *stream) { - int ret; struct lttng_ht_iter iter; + struct relay_index *index; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + /* Put self-ref from index. */ + relay_index_put(index); + } + rcu_read_unlock(); +} - DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64 - " deleted.", index->index_n.key.key1, - index->index_n.key.key2); +void relay_index_close_partial_fd(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (!index->index_fd) { + continue; + } + /* + * Partial index has its index_fd: we have only + * received its info from the data socket. + * Put self-ref from index. + */ + relay_index_put(index); + } + rcu_read_unlock(); } -/* - * Destroy every relay index with the given stream id as part of the key. - */ -void relay_index_destroy_by_stream_id(uint64_t stream_id) +uint64_t relay_index_find_last(struct relay_stream *stream) { struct lttng_ht_iter iter; struct relay_index *index; + uint64_t net_seq_num = -1ULL; rcu_read_lock(); - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - if (index->index_n.key.key1 == stream_id) { - relay_index_delete(index); - relay_index_free_safe(index); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (net_seq_num == -1ULL || + index->index_n.key > net_seq_num) { + net_seq_num = index->index_n.key; } } rcu_read_unlock(); + return net_seq_num; }