Clean-up: relayd index: change space to tabs
[lttng-tools.git] / src / bin / lttng-relayd / index.c
index b15bbcd7702954e675a98c06e02ab31058ff7499..ff27d958a0aebd6eb0f7816544eaa1f686ee192d 100644 (file)
@@ -1,20 +1,10 @@
 /*
- * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
- *                      David Goulet <dgoulet@efficios.com>
- *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #define _LGPL_SOURCE
 
 #include <common/common.h>
 #include <common/utils.h>
+#include <common/compat/endian.h>
 
 #include "lttng-relayd.h"
 #include "stream.h"
 #include "index.h"
+#include "connection.h"
 
 /*
  * Allocate a new relay index object. Pass the stream in which it is
@@ -58,7 +50,6 @@ static struct relay_index *relay_index_create(struct relay_stream *stream,
 
        lttng_ht_node_init_u64(&index->index_n, net_seq_num);
        pthread_mutex_init(&index->lock, NULL);
-       pthread_mutex_init(&index->reflock, NULL);
        urcu_ref_init(&index->ref);
 
 end:
@@ -98,21 +89,11 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
  */
 static bool relay_index_get(struct relay_index *index)
 {
-       bool has_ref = false;
-
        DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
                        index->stream->stream_handle, index->index_n.key,
                        (int) index->ref.refcount);
 
-       /* Confirm that the index refcount has not reached 0. */
-       pthread_mutex_lock(&index->reflock);
-       if (index->ref.refcount != 0) {
-               has_ref = true;
-               urcu_ref_get(&index->ref);
-       }
-       pthread_mutex_unlock(&index->reflock);
-
-       return has_ref;
+       return urcu_ref_get_unless_zero(&index->ref);
 }
 
 /*
@@ -186,7 +167,7 @@ end:
 }
 
 int relay_index_set_data(struct relay_index *index,
-                const struct ctf_packet_index *data)
+               const struct ctf_packet_index *data)
 {
        int ret = 0;
 
@@ -265,10 +246,8 @@ void relay_index_put(struct relay_index *index)
         * Index lock ensures that concurrent test and update of stream
         * ref is atomic.
         */
-       pthread_mutex_lock(&index->reflock);
        assert(index->ref.refcount != 0);
        urcu_ref_put(&index->ref, index_release);
-       pthread_mutex_unlock(&index->reflock);
        rcu_read_unlock();
 }
 
@@ -284,7 +263,6 @@ int relay_index_try_flush(struct relay_index *index)
 {
        int ret = 1;
        bool flushed = false;
-       int fd;
 
        pthread_mutex_lock(&index->lock);
        if (index->flushed) {
@@ -294,10 +272,9 @@ int relay_index_try_flush(struct relay_index *index)
        if (!index->has_index_data || !index->index_file) {
                goto skip;
        }
-       fd = index->index_file->fd;
-       DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
-                       " on fd %d", index->stream->stream_handle,
-                       index->index_n.key, fd);
+
+       DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
+                       index->stream->stream_handle, index->index_n.key);
        flushed = true;
        index->flushed = true;
        ret = lttng_index_file_write(index->index_file, &index->index_data);
@@ -367,3 +344,88 @@ uint64_t relay_index_find_last(struct relay_stream *stream)
        rcu_read_unlock();
        return net_seq_num;
 }
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+               struct lttng_index_file *new_index_file,
+               uint64_t removed_data_count)
+{
+       int ret = 0;
+       uint64_t offset;
+
+       pthread_mutex_lock(&index->lock);
+       if (!index->index_file) {
+               ERR("No index_file");
+               ret = 0;
+               goto end;
+       }
+
+       lttng_index_file_put(index->index_file);
+       lttng_index_file_get(new_index_file);
+       index->index_file = new_index_file;
+       offset = be64toh(index->index_data.offset);
+       index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+       int ret = 0;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               ret = relay_index_switch_file(index, stream->index_file,
+                               stream->pos_after_last_complete_data_index);
+               if (ret) {
+                       goto end;
+               }
+       }
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+               const struct lttcomm_relayd_index *data,
+               unsigned int minor_version)
+{
+       /* The index on disk is encoded in big endian. */
+       const struct ctf_packet_index index_data = {
+               .packet_size = htobe64(data->packet_size),
+               .content_size = htobe64(data->content_size),
+               .timestamp_begin = htobe64(data->timestamp_begin),
+               .timestamp_end = htobe64(data->timestamp_end),
+               .events_discarded = htobe64(data->events_discarded),
+               .stream_id = htobe64(data->stream_id),
+       };
+
+       if (minor_version >= 8) {
+               index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+               index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+       } else {
+               uint64_t unset_value = -1ULL;
+
+               index->index_data.stream_instance_id = htobe64(unset_value);
+               index->index_data.packet_seq_num = htobe64(unset_value);
+       }
+
+       return relay_index_set_data(index, &index_data);
+}
This page took 0.02502 seconds and 4 git commands to generate.