Clean-up: relayd index: change space to tabs
[lttng-tools.git] / src / bin / lttng-relayd / index.c
index 8cacdd2f01eb8807d9a5d5de239277b311776256..ff27d958a0aebd6eb0f7816544eaa1f686ee192d 100644 (file)
 /*
- * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
- *                      David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 
 #include <common/common.h>
 #include <common/utils.h>
+#include <common/compat/endian.h>
 
 #include "lttng-relayd.h"
+#include "stream.h"
 #include "index.h"
+#include "connection.h"
 
 /*
- * Deferred free of a relay index object. MUST only be called by a call RCU.
+ * Allocate a new relay index object. Pass the stream in which it is
+ * contained as parameter. The sequence number will be used as the hash
+ * table key.
+ *
+ * Called with stream mutex held.
+ * Return allocated object or else NULL on error.
  */
-static void deferred_free_relay_index(struct rcu_head *head)
+static struct relay_index *relay_index_create(struct relay_stream *stream,
+               uint64_t net_seq_num)
 {
-       struct relay_index *index =
-               caa_container_of(head, struct relay_index, rcu_node);
+       struct relay_index *index;
 
-       if (index->to_close_fd >= 0) {
-               int ret;
+       DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
+                       stream->stream_handle, net_seq_num);
 
-               ret = close(index->to_close_fd);
-               if (ret < 0) {
-                       PERROR("Relay index to close fd %d", index->to_close_fd);
-               }
+       index = zmalloc(sizeof(*index));
+       if (!index) {
+               PERROR("Relay index zmalloc");
+               goto end;
+       }
+       if (!stream_get(stream)) {
+               ERR("Cannot get stream");
+               free(index);
+               index = NULL;
+               goto end;
        }
+       index->stream = stream;
+
+       lttng_ht_node_init_u64(&index->index_n, net_seq_num);
+       pthread_mutex_init(&index->lock, NULL);
+       urcu_ref_init(&index->ref);
 
-       relay_index_free(index);
+end:
+       return index;
 }
 
 /*
- * Allocate a new relay index object using the given stream ID and sequence
- * number as the hash table key.
+ * Add unique relay index to the given hash table. In case of a collision, the
+ * already existing object is put in the given _index variable.
  *
- * Return allocated object or else NULL on error.
+ * RCU read side lock MUST be acquired.
  */
-struct relay_index *relay_index_create(uint64_t stream_id,
-               uint64_t net_seq_num)
+static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
+               struct relay_index *index)
 {
-       struct relay_index *index;
+       struct cds_lfht_node *node_ptr;
+       struct relay_index *_index;
 
-       DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
-                       stream_id, net_seq_num);
+       DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+                       stream->stream_handle, index->index_n.key);
 
-       index = zmalloc(sizeof(*index));
-       if (index == NULL) {
-               PERROR("Relay index zmalloc");
-               goto error;
+       node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
+                       stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+                       stream->indexes_ht->match_fct, &index->index_n,
+                       &index->index_n.node);
+       if (node_ptr != &index->index_n.node) {
+               _index = caa_container_of(node_ptr, struct relay_index,
+                               index_n.node);
+       } else {
+               _index = NULL;
        }
+       return _index;
+}
 
-       index->to_close_fd = -1;
-       lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+/*
+ * Should be called with RCU read-side lock held.
+ */
+static bool relay_index_get(struct relay_index *index)
+{
+       DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+                       index->stream->stream_handle, index->index_n.key,
+                       (int) index->ref.refcount);
 
-error:
-       return index;
+       return urcu_ref_get_unless_zero(&index->ref);
 }
 
 /*
- * Find a relayd index in the given hash table.
+ * Get a relayd index in within the given stream, or create it if not
+ * present.
  *
+ * Called with stream mutex held.
  * Return index object or else NULL on error.
  */
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
+                uint64_t net_seq_num)
 {
-       struct lttng_ht_node_two_u64 *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
-       struct lttng_ht_two_u64 key;
        struct relay_index *index = NULL;
 
        DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
-                       stream_id, net_seq_num);
-
-       key.key1 = stream_id;
-       key.key2 = net_seq_num;
+                       stream->stream_handle, net_seq_num);
 
-       lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
-       node = lttng_ht_iter_get_node_two_u64(&iter);
-       if (node == NULL) {
-               goto end;
+       rcu_read_lock();
+       lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
+       if (node) {
+               index = caa_container_of(node, struct relay_index, index_n);
+       } else {
+               struct relay_index *oldindex;
+
+               index = relay_index_create(stream, net_seq_num);
+               if (!index) {
+                       ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
+                               stream->stream_handle, net_seq_num);
+                       goto end;
+               }
+               oldindex = relay_index_add_unique(stream, index);
+               if (oldindex) {
+                       /* Added concurrently, keep old. */
+                       relay_index_put(index);
+                       index = oldindex;
+                       if (!relay_index_get(index)) {
+                               index = NULL;
+                       }
+               } else {
+                       stream->indexes_in_flight++;
+                       index->in_hash_table = true;
+               }
        }
-       index = caa_container_of(node, struct relay_index, index_n);
-
 end:
-       DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
-                       (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+       rcu_read_unlock();
+       DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+                       (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
        return index;
 }
 
-/*
- * Add unique relay index to the given hash table. In case of a collision, the
- * already existing object is put in the given _index variable.
- *
- * RCU read side lock MUST be acquired.
- */
-void relay_index_add(struct relay_index *index, struct relay_index **_index)
+int relay_index_set_file(struct relay_index *index,
+               struct lttng_index_file *index_file,
+               uint64_t data_offset)
 {
-       struct cds_lfht_node *node_ptr;
+       int ret = 0;
 
-       assert(index);
+       pthread_mutex_lock(&index->lock);
+       if (index->index_file) {
+               ret = -1;
+               goto end;
+       }
+       lttng_index_file_get(index_file);
+       index->index_file = index_file;
+       index->index_data.offset = data_offset;
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
+}
 
-       DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
-                       index->key.key1, index->key.key2);
+int relay_index_set_data(struct relay_index *index,
+               const struct ctf_packet_index *data)
+{
+       int ret = 0;
 
-       node_ptr = cds_lfht_add_unique(indexes_ht->ht,
-                       indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
-                       indexes_ht->match_fct, (void *) &index->index_n.key,
-                       &index->index_n.node);
-       if (node_ptr != &index->index_n.node) {
-               *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+       pthread_mutex_lock(&index->lock);
+       if (index->has_index_data) {
+               ret = -1;
+               goto end;
        }
+       /* Set everything except data_offset. */
+       index->index_data.packet_size = data->packet_size;
+       index->index_data.content_size = data->content_size;
+       index->index_data.timestamp_begin = data->timestamp_begin;
+       index->index_data.timestamp_end = data->timestamp_end;
+       index->index_data.events_discarded = data->events_discarded;
+       index->index_data.stream_id = data->stream_id;
+       index->has_index_data = true;
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
 }
 
-/*
- * Write index on disk to the given fd. Once done error or not, it is removed
- * from the hash table and destroy the object.
- *
- * MUST be called with a RCU read side lock held.
- *
- * Return 0 on success else a negative value.
- */
-int relay_index_write(int fd, struct relay_index *index)
+static void index_destroy(struct relay_index *index)
+{
+       free(index);
+}
+
+static void index_destroy_rcu(struct rcu_head *rcu_head)
+{
+       struct relay_index *index =
+               caa_container_of(rcu_head, struct relay_index, rcu_node);
+
+       index_destroy(index);
+}
+
+/* Stream lock must be held by the caller. */
+static void index_release(struct urcu_ref *ref)
 {
+       struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+       struct relay_stream *stream = index->stream;
        int ret;
        struct lttng_ht_iter iter;
 
-       DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
-                       " on fd %d", index->key.key1, index->key.key2, fd);
+       if (index->index_file) {
+               lttng_index_file_put(index->index_file);
+               index->index_file = NULL;
+       }
+       if (index->in_hash_table) {
+               /* Delete index from hash table. */
+               iter.iter.node = &index->index_n.node;
+               ret = lttng_ht_del(stream->indexes_ht, &iter);
+               assert(!ret);
+               stream->indexes_in_flight--;
+       }
 
-       /* Delete index from hash table. */
-       iter.iter.node = &index->index_n.node;
-       ret = lttng_ht_del(indexes_ht, &iter);
-       assert(!ret);
-       call_rcu(&index->rcu_node, deferred_free_relay_index);
+       stream_put(index->stream);
+       index->stream = NULL;
 
-       return index_write(fd, &index->index_data, sizeof(index->index_data));
+       call_rcu(&index->rcu_node, index_destroy_rcu);
 }
 
 /*
- * Free the given index.
+ * Called with stream mutex held.
+ *
+ * Stream lock must be held by the caller.
  */
-void relay_index_free(struct relay_index *index)
+void relay_index_put(struct relay_index *index)
 {
-       free(index);
+       DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+                       index->stream->stream_handle, index->index_n.key,
+                       (int) index->ref.refcount);
+       /*
+        * Ensure existance of index->lock for index unlock.
+        */
+       rcu_read_lock();
+       /*
+        * Index lock ensures that concurrent test and update of stream
+        * ref is atomic.
+        */
+       assert(index->ref.refcount != 0);
+       urcu_ref_put(&index->ref, index_release);
+       rcu_read_unlock();
 }
 
 /*
- * Safely free the given index using a call RCU.
+ * Try to flush index to disk. Releases self-reference to index once
+ * flush succeeds.
+ *
+ * Stream lock must be held by the caller.
+ * Return 0 on successful flush, a negative value on error, or positive
+ * value if no flush was performed.
  */
-void relay_index_free_safe(struct relay_index *index)
+int relay_index_try_flush(struct relay_index *index)
 {
-       if (!index) {
-               return;
+       int ret = 1;
+       bool flushed = false;
+
+       pthread_mutex_lock(&index->lock);
+       if (index->flushed) {
+               goto skip;
+       }
+       /* Check if we are ready to flush. */
+       if (!index->has_index_data || !index->index_file) {
+               goto skip;
        }
 
-       call_rcu(&index->rcu_node, deferred_free_relay_index);
+       DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
+                       index->stream->stream_handle, index->index_n.key);
+       flushed = true;
+       index->flushed = true;
+       ret = lttng_index_file_write(index->index_file, &index->index_data);
+skip:
+       pthread_mutex_unlock(&index->lock);
+
+       if (flushed) {
+               /* Put self-ref from index now that it has been flushed. */
+               relay_index_put(index);
+       }
+       return ret;
 }
 
 /*
- * Delete index from the given hash table.
- *
- * RCU read side lock MUST be acquired.
+ * Close every relay index within a given stream, without flushing
+ * them.
  */
-void relay_index_delete(struct relay_index *index)
+void relay_index_close_all(struct relay_stream *stream)
 {
-       int ret;
        struct lttng_ht_iter iter;
+       struct relay_index *index;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               /* Put self-ref from index. */
+               relay_index_put(index);
+       }
+       rcu_read_unlock();
+}
 
-       DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
-                       "deleted.", index->key.key1, index->key.key2);
+void relay_index_close_partial_fd(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
 
-       /* Delete index from hash table. */
-       iter.iter.node = &index->index_n.node;
-       ret = lttng_ht_del(indexes_ht, &iter);
-       assert(!ret);
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               if (!index->index_file) {
+                       continue;
+               }
+               /*
+                * Partial index has its index_file: we have only
+                * received its info from the data socket.
+                * Put self-ref from index.
+                */
+               relay_index_put(index);
+       }
+       rcu_read_unlock();
+}
+
+uint64_t relay_index_find_last(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+       uint64_t net_seq_num = -1ULL;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               if (net_seq_num == -1ULL ||
+                               index->index_n.key > net_seq_num) {
+                       net_seq_num = index->index_n.key;
+               }
+       }
+       rcu_read_unlock();
+       return net_seq_num;
 }
 
 /*
- * Destroy every relay index with the given stream id as part of the key.
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
  */
-void relay_index_destroy_by_stream_id(uint64_t stream_id)
+static
+int relay_index_switch_file(struct relay_index *index,
+               struct lttng_index_file *new_index_file,
+               uint64_t removed_data_count)
+{
+       int ret = 0;
+       uint64_t offset;
+
+       pthread_mutex_lock(&index->lock);
+       if (!index->index_file) {
+               ERR("No index_file");
+               ret = 0;
+               goto end;
+       }
+
+       lttng_index_file_put(index->index_file);
+       lttng_index_file_get(new_index_file);
+       index->index_file = new_index_file;
+       offset = be64toh(index->index_data.offset);
+       index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
 {
        struct lttng_ht_iter iter;
        struct relay_index *index;
+       int ret = 0;
 
        rcu_read_lock();
-       cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
-               if (index->key.key1 == stream_id) {
-                       relay_index_delete(index);
-                       relay_index_free_safe(index);
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               ret = relay_index_switch_file(index, stream->index_file,
+                               stream->pos_after_last_complete_data_index);
+               if (ret) {
+                       goto end;
                }
        }
+end:
        rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+               const struct lttcomm_relayd_index *data,
+               unsigned int minor_version)
+{
+       /* The index on disk is encoded in big endian. */
+       const struct ctf_packet_index index_data = {
+               .packet_size = htobe64(data->packet_size),
+               .content_size = htobe64(data->content_size),
+               .timestamp_begin = htobe64(data->timestamp_begin),
+               .timestamp_end = htobe64(data->timestamp_end),
+               .events_discarded = htobe64(data->events_discarded),
+               .stream_id = htobe64(data->stream_id),
+       };
+
+       if (minor_version >= 8) {
+               index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+               index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+       } else {
+               uint64_t unset_value = -1ULL;
+
+               index->index_data.stream_instance_id = htobe64(unset_value);
+               index->index_data.packet_seq_num = htobe64(unset_value);
+       }
+
+       return relay_index_set_data(index, &index_data);
 }
This page took 0.029333 seconds and 4 git commands to generate.