Implement the RELAYD_ROTATE_PENDING relay daemon command
[lttng-tools.git] / src / bin / lttng-relayd / index.c
index b15bbcd7702954e675a98c06e02ab31058ff7499..92d4581d124a95921694129cba29213c08a7bd90 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <common/common.h>
 #include <common/utils.h>
+#include <common/compat/endian.h>
 
 #include "lttng-relayd.h"
 #include "stream.h"
@@ -58,7 +59,6 @@ static struct relay_index *relay_index_create(struct relay_stream *stream,
 
        lttng_ht_node_init_u64(&index->index_n, net_seq_num);
        pthread_mutex_init(&index->lock, NULL);
-       pthread_mutex_init(&index->reflock, NULL);
        urcu_ref_init(&index->ref);
 
 end:
@@ -98,21 +98,11 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
  */
 static bool relay_index_get(struct relay_index *index)
 {
-       bool has_ref = false;
-
        DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
                        index->stream->stream_handle, index->index_n.key,
                        (int) index->ref.refcount);
 
-       /* Confirm that the index refcount has not reached 0. */
-       pthread_mutex_lock(&index->reflock);
-       if (index->ref.refcount != 0) {
-               has_ref = true;
-               urcu_ref_get(&index->ref);
-       }
-       pthread_mutex_unlock(&index->reflock);
-
-       return has_ref;
+       return urcu_ref_get_unless_zero(&index->ref);
 }
 
 /*
@@ -265,10 +255,8 @@ void relay_index_put(struct relay_index *index)
         * Index lock ensures that concurrent test and update of stream
         * ref is atomic.
         */
-       pthread_mutex_lock(&index->reflock);
        assert(index->ref.refcount != 0);
        urcu_ref_put(&index->ref, index_release);
-       pthread_mutex_unlock(&index->reflock);
        rcu_read_unlock();
 }
 
@@ -367,3 +355,59 @@ uint64_t relay_index_find_last(struct relay_stream *stream)
        rcu_read_unlock();
        return net_seq_num;
 }
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+               struct lttng_index_file *new_index_file,
+               uint64_t removed_data_count)
+{
+       int ret = 0;
+       uint64_t offset;
+
+       pthread_mutex_lock(&index->lock);
+       if (!index->index_file) {
+               ERR("No index_file");
+               ret = 0;
+               goto end;
+       }
+
+       lttng_index_file_put(index->index_file);
+       lttng_index_file_get(new_index_file);
+       index->index_file = new_index_file;
+       offset = be64toh(index->index_data.offset);
+       index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+       int ret = 0;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               DBG("Update index to fd %d", stream->index_file->fd);
+               ret = relay_index_switch_file(index, stream->index_file,
+                               stream->pos_after_last_complete_data_index);
+               if (ret) {
+                       goto end;
+               }
+       }
+end:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.024439 seconds and 4 git commands to generate.