Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
index f2f102e64d4714d945551435dc5f41e83cb394db..e3b8bde8230717a4f5e225d124f307aba135a4f1 100644 (file)
@@ -9,14 +9,15 @@
 
 #define _LGPL_SOURCE
 
-#include <common/common.h>
-#include <common/utils.h>
-#include <common/compat/endian.h>
+#include "connection.hpp"
+#include "index.hpp"
+#include "lttng-relayd.hpp"
+#include "stream.hpp"
 
-#include "lttng-relayd.h"
-#include "stream.h"
-#include "index.h"
-#include "connection.h"
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/urcu.hpp>
+#include <common/utils.hpp>
 
 /*
  * Allocate a new relay index object. Pass the stream in which it is
  * Called with stream mutex held.
  * Return allocated object or else NULL on error.
  */
-static struct relay_index *relay_index_create(struct relay_stream *stream,
-               uint64_t net_seq_num)
+static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num)
 {
        struct relay_index *index;
 
        DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
-                       stream->stream_handle, net_seq_num);
+            stream->stream_handle,
+            net_seq_num);
 
-       index = (relay_index *) zmalloc(sizeof(*index));
+       index = zmalloc<relay_index>();
        if (!index) {
                PERROR("Relay index zmalloc");
                goto end;
@@ -42,13 +43,13 @@ static struct relay_index *relay_index_create(struct relay_stream *stream,
        if (!stream_get(stream)) {
                ERR("Cannot get stream");
                free(index);
-               index = NULL;
+               index = nullptr;
                goto end;
        }
        index->stream = stream;
 
        lttng_ht_node_init_u64(&index->index_n, net_seq_num);
-       pthread_mutex_init(&index->lock, NULL);
+       pthread_mutex_init(&index->lock, nullptr);
        urcu_ref_init(&index->ref);
 
 end:
@@ -62,23 +63,26 @@ end:
  * RCU read side lock MUST be acquired.
  */
 static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
-               struct relay_index *index)
+                                                 struct relay_index *index)
 {
        struct cds_lfht_node *node_ptr;
        struct relay_index *_index;
 
+       ASSERT_RCU_READ_LOCKED();
+
        DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
-                       stream->stream_handle, index->index_n.key);
+            stream->stream_handle,
+            index->index_n.key);
 
        node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
-                       stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
-                       stream->indexes_ht->match_fct, &index->index_n,
-                       &index->index_n.node);
+                                      stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+                                      stream->indexes_ht->match_fct,
+                                      &index->index_n,
+                                      &index->index_n.node);
        if (node_ptr != &index->index_n.node) {
-               _index = caa_container_of(node_ptr, struct relay_index,
-                               index_n.node);
+               _index = caa_container_of(node_ptr, struct relay_index, index_n.node);
        } else {
-               _index = NULL;
+               _index = nullptr;
        }
        return _index;
 }
@@ -88,9 +92,12 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
  */
 static bool relay_index_get(struct relay_index *index)
 {
+       ASSERT_RCU_READ_LOCKED();
+
        DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
-                       index->stream->stream_handle, index->index_n.key,
-                       (int) index->ref.refcount);
+            index->stream->stream_handle,
+            index->index_n.key,
+            (int) index->ref.refcount);
 
        return urcu_ref_get_unless_zero(&index->ref);
 }
@@ -103,27 +110,29 @@ static bool relay_index_get(struct relay_index *index)
  * Return index object or else NULL on error.
  */
 struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
-                uint64_t net_seq_num)
+                                                   uint64_t net_seq_num)
 {
        struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
-       struct relay_index *index = NULL;
+       struct relay_index *index = nullptr;
 
        DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
-                       stream->stream_handle, net_seq_num);
+            stream->stream_handle,
+            net_seq_num);
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node) {
-               index = caa_container_of(node, struct relay_index, index_n);
+               index = lttng::utils::container_of(node, &relay_index::index_n);
        } else {
                struct relay_index *oldindex;
 
                index = relay_index_create(stream, net_seq_num);
                if (!index) {
                        ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
-                               stream->stream_handle, net_seq_num);
+                           stream->stream_handle,
+                           net_seq_num);
                        goto end;
                }
                oldindex = relay_index_add_unique(stream, index);
@@ -132,7 +141,7 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
                        relay_index_put(index);
                        index = oldindex;
                        if (!relay_index_get(index)) {
-                               index = NULL;
+                               index = nullptr;
                        }
                } else {
                        stream->indexes_in_flight++;
@@ -140,15 +149,16 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
                }
        }
 end:
-       rcu_read_unlock();
        DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
-                       (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
+            (index == NULL) ? "NOT " : "",
+            stream->stream_handle,
+            net_seq_num);
        return index;
 }
 
 int relay_index_set_file(struct relay_index *index,
-               struct lttng_index_file *index_file,
-               uint64_t data_offset)
+                        struct lttng_index_file *index_file,
+                        uint64_t data_offset)
 {
        int ret = 0;
 
@@ -165,8 +175,7 @@ end:
        return ret;
 }
 
-int relay_index_set_data(struct relay_index *index,
-               const struct ctf_packet_index *data)
+int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data)
 {
        int ret = 0;
 
@@ -195,8 +204,7 @@ static void index_destroy(struct relay_index *index)
 
 static void index_destroy_rcu(struct rcu_head *rcu_head)
 {
-       struct relay_index *index =
-               caa_container_of(rcu_head, struct relay_index, rcu_node);
+       struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node);
 
        index_destroy(index);
 }
@@ -204,14 +212,14 @@ static void index_destroy_rcu(struct rcu_head *rcu_head)
 /* Stream lock must be held by the caller. */
 static void index_release(struct urcu_ref *ref)
 {
-       struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+       struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref);
        struct relay_stream *stream = index->stream;
        int ret;
        struct lttng_ht_iter iter;
 
        if (index->index_file) {
                lttng_index_file_put(index->index_file);
-               index->index_file = NULL;
+               index->index_file = nullptr;
        }
        if (index->in_hash_table) {
                /* Delete index from hash table. */
@@ -222,7 +230,7 @@ static void index_release(struct urcu_ref *ref)
        }
 
        stream_put(index->stream);
-       index->stream = NULL;
+       index->stream = nullptr;
 
        call_rcu(&index->rcu_node, index_destroy_rcu);
 }
@@ -235,19 +243,19 @@ static void index_release(struct urcu_ref *ref)
 void relay_index_put(struct relay_index *index)
 {
        DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
-                       index->stream->stream_handle, index->index_n.key,
-                       (int) index->ref.refcount);
+            index->stream->stream_handle,
+            index->index_n.key,
+            (int) index->ref.refcount);
        /*
         * Ensure existence of index->lock for index unlock.
         */
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        /*
         * Index lock ensures that concurrent test and update of stream
         * ref is atomic.
         */
        LTTNG_ASSERT(index->ref.refcount != 0);
        urcu_ref_put(&index->ref, index_release);
-       rcu_read_unlock();
 }
 
 /*
@@ -273,7 +281,8 @@ int relay_index_try_flush(struct relay_index *index)
        }
 
        DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
-                       index->stream->stream_handle, index->index_n.key);
+            index->stream->stream_handle,
+            index->index_n.key);
        flushed = true;
        index->flushed = true;
        ret = lttng_index_file_write(index->index_file, &index->index_data);
@@ -296,13 +305,12 @@ void relay_index_close_all(struct relay_stream *stream)
        struct lttng_ht_iter iter;
        struct relay_index *index;
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
-                       index, index_n.node) {
+       lttng::urcu::read_lock_guard read_lock;
+
+       cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
                /* Put self-ref from index. */
                relay_index_put(index);
        }
-       rcu_read_unlock();
 }
 
 void relay_index_close_partial_fd(struct relay_stream *stream)
@@ -310,9 +318,9 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
        struct lttng_ht_iter iter;
        struct relay_index *index;
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
-                       index, index_n.node) {
+       lttng::urcu::read_lock_guard read_lock;
+
+       cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
                if (!index->index_file) {
                        continue;
                }
@@ -323,7 +331,6 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
                 */
                relay_index_put(index);
        }
-       rcu_read_unlock();
 }
 
 uint64_t relay_index_find_last(struct relay_stream *stream)
@@ -332,15 +339,14 @@ uint64_t relay_index_find_last(struct relay_stream *stream)
        struct relay_index *index;
        uint64_t net_seq_num = -1ULL;
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
-                       index, index_n.node) {
-               if (net_seq_num == -1ULL ||
-                               index->index_n.key > net_seq_num) {
+       lttng::urcu::read_lock_guard read_lock;
+
+       cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
+               if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) {
                        net_seq_num = index->index_n.key;
                }
        }
-       rcu_read_unlock();
+
        return net_seq_num;
 }
 
@@ -348,10 +354,9 @@ uint64_t relay_index_find_last(struct relay_stream *stream)
  * Update the index file of an already existing relay_index.
  * Offsets by 'removed_data_count' the offset field of an index.
  */
-static
-int relay_index_switch_file(struct relay_index *index,
-               struct lttng_index_file *new_index_file,
-               uint64_t removed_data_count)
+static int relay_index_switch_file(struct relay_index *index,
+                                  struct lttng_index_file *new_index_file,
+                                  uint64_t removed_data_count)
 {
        int ret = 0;
        uint64_t offset;
@@ -385,17 +390,16 @@ int relay_index_switch_all_files(struct relay_stream *stream)
        struct relay_index *index;
        int ret = 0;
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
-                       index, index_n.node) {
-               ret = relay_index_switch_file(index, stream->index_file,
-                               stream->pos_after_last_complete_data_index);
+       lttng::urcu::read_lock_guard read_lock;
+
+       cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
+               ret = relay_index_switch_file(
+                       index, stream->index_file, stream->pos_after_last_complete_data_index);
                if (ret) {
-                       goto end;
+                       return ret;
                }
        }
-end:
-       rcu_read_unlock();
+
        return ret;
 }
 
@@ -403,11 +407,11 @@ end:
  * Set index data from the control port to a given index object.
  */
 int relay_index_set_control_data(struct relay_index *index,
-               const struct lttcomm_relayd_index *data,
-               unsigned int minor_version)
+                                const struct lttcomm_relayd_index *data,
+                                unsigned int minor_version)
 {
        /* The index on disk is encoded in big endian. */
-       ctf_packet_index index_data {};
+       ctf_packet_index index_data{};
 
        index_data.packet_size = htobe64(data->packet_size);
        index_data.content_size = htobe64(data->content_size);
This page took 0.028459 seconds and 4 git commands to generate.