X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.c;h=92d4581d124a95921694129cba29213c08a7bd90;hb=d3ecc5503007bc81faa8049fac945f163b6356f3;hp=cb7ae3db966e34ae3205c00a4f04f15853f3e247;hpb=3d07a857948f868354589ff742b0a2f6277f558f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index cb7ae3db9..92d4581d1 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -17,12 +17,12 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include #include +#include #include "lttng-relayd.h" #include "stream.h" @@ -59,7 +59,6 @@ static struct relay_index *relay_index_create(struct relay_stream *stream, lttng_ht_node_init_u64(&index->index_n, net_seq_num); pthread_mutex_init(&index->lock, NULL); - pthread_mutex_init(&index->reflock, NULL); urcu_ref_init(&index->ref); end: @@ -99,21 +98,11 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, */ static bool relay_index_get(struct relay_index *index) { - bool has_ref = false; - DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", index->stream->stream_handle, index->index_n.key, (int) index->ref.refcount); - /* Confirm that the index refcount has not reached 0. */ - pthread_mutex_lock(&index->reflock); - if (index->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&index->ref); - } - pthread_mutex_unlock(&index->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&index->ref); } /* @@ -144,7 +133,7 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, index = relay_index_create(stream, net_seq_num); if (!index) { ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, - index->stream->stream_handle, net_seq_num); + stream->stream_handle, net_seq_num); goto end; } oldindex = relay_index_add_unique(stream, index); @@ -163,22 +152,23 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, end: rcu_read_unlock(); DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num); + (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num); return index; } -int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd, +int relay_index_set_file(struct relay_index *index, + struct lttng_index_file *index_file, uint64_t data_offset) { int ret = 0; pthread_mutex_lock(&index->lock); - if (index->index_fd) { + if (index->index_file) { ret = -1; goto end; } - stream_fd_get(index_fd); - index->index_fd = index_fd; + lttng_index_file_get(index_file); + index->index_file = index_file; index->index_data.offset = data_offset; end: pthread_mutex_unlock(&index->lock); @@ -229,9 +219,9 @@ static void index_release(struct urcu_ref *ref) int ret; struct lttng_ht_iter iter; - if (index->index_fd) { - stream_fd_put(index->index_fd); - index->index_fd = NULL; + if (index->index_file) { + lttng_index_file_put(index->index_file); + index->index_file = NULL; } if (index->in_hash_table) { /* Delete index from hash table. */ @@ -265,10 +255,8 @@ void relay_index_put(struct relay_index *index) * Index lock ensures that concurrent test and update of stream * ref is atomic. */ - pthread_mutex_lock(&index->reflock); assert(index->ref.refcount != 0); urcu_ref_put(&index->ref, index_release); - pthread_mutex_unlock(&index->reflock); rcu_read_unlock(); } @@ -291,21 +279,16 @@ int relay_index_try_flush(struct relay_index *index) goto skip; } /* Check if we are ready to flush. */ - if (!index->has_index_data || !index->index_fd) { + if (!index->has_index_data || !index->index_file) { goto skip; } - fd = index->index_fd->fd; + fd = index->index_file->fd; DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 " on fd %d", index->stream->stream_handle, index->index_n.key, fd); flushed = true; index->flushed = true; - ret = index_write(fd, &index->index_data, sizeof(index->index_data)); - if (ret == sizeof(index->index_data)) { - ret = 0; - } else { - ret = -1; - } + ret = lttng_index_file_write(index->index_file, &index->index_data); skip: pthread_mutex_unlock(&index->lock); @@ -342,11 +325,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream) rcu_read_lock(); cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index, index_n.node) { - if (!index->index_fd) { + if (!index->index_file) { continue; } /* - * Partial index has its index_fd: we have only + * Partial index has its index_file: we have only * received its info from the data socket. * Put self-ref from index. */ @@ -372,3 +355,59 @@ uint64_t relay_index_find_last(struct relay_stream *stream) rcu_read_unlock(); return net_seq_num; } + +/* + * Update the index file of an already existing relay_index. + * Offsets by 'removed_data_count' the offset field of an index. + */ +static +int relay_index_switch_file(struct relay_index *index, + struct lttng_index_file *new_index_file, + uint64_t removed_data_count) +{ + int ret = 0; + uint64_t offset; + + pthread_mutex_lock(&index->lock); + if (!index->index_file) { + ERR("No index_file"); + ret = 0; + goto end; + } + + lttng_index_file_put(index->index_file); + lttng_index_file_get(new_index_file); + index->index_file = new_index_file; + offset = be64toh(index->index_data.offset); + index->index_data.offset = htobe64(offset - removed_data_count); + +end: + pthread_mutex_unlock(&index->lock); + return ret; +} + +/* + * Switch the index file of all pending indexes for a stream and update the + * data offset by substracting the last safe position. + * Stream lock must be held. + */ +int relay_index_switch_all_files(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + int ret = 0; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + DBG("Update index to fd %d", stream->index_file->fd); + ret = relay_index_switch_file(index, stream->index_file, + stream->pos_after_last_complete_data_index); + if (ret) { + goto end; + } + } +end: + rcu_read_unlock(); + return ret; +}