#define _LGPL_SOURCE
-#include <common/common.hpp>
-#include <common/utils.hpp>
-#include <common/compat/endian.hpp>
-
+#include "connection.hpp"
+#include "index.hpp"
#include "lttng-relayd.hpp"
#include "stream.hpp"
-#include "index.hpp"
-#include "connection.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/urcu.hpp>
+#include <common/utils.hpp>
/*
* Allocate a new relay index object. Pass the stream in which it is
* Called with stream mutex held.
* Return allocated object or else NULL on error.
*/
-static struct relay_index *relay_index_create(struct relay_stream *stream,
- uint64_t net_seq_num)
+static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num)
{
struct relay_index *index;
DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
- stream->stream_handle, net_seq_num);
+ stream->stream_handle,
+ net_seq_num);
- index = (relay_index *) zmalloc(sizeof(*index));
+ index = zmalloc<relay_index>();
if (!index) {
PERROR("Relay index zmalloc");
goto end;
if (!stream_get(stream)) {
ERR("Cannot get stream");
free(index);
- index = NULL;
+ index = nullptr;
goto end;
}
index->stream = stream;
lttng_ht_node_init_u64(&index->index_n, net_seq_num);
- pthread_mutex_init(&index->lock, NULL);
+ pthread_mutex_init(&index->lock, nullptr);
urcu_ref_init(&index->ref);
end:
* RCU read side lock MUST be acquired.
*/
static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
- struct relay_index *index)
+ struct relay_index *index)
{
struct cds_lfht_node *node_ptr;
struct relay_index *_index;
ASSERT_RCU_READ_LOCKED();
DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- stream->stream_handle, index->index_n.key);
+ stream->stream_handle,
+ index->index_n.key);
node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
- stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
- stream->indexes_ht->match_fct, &index->index_n,
- &index->index_n.node);
+ stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+ stream->indexes_ht->match_fct,
+ &index->index_n,
+ &index->index_n.node);
if (node_ptr != &index->index_n.node) {
- _index = caa_container_of(node_ptr, struct relay_index,
- index_n.node);
+ _index = caa_container_of(node_ptr, struct relay_index, index_n.node);
} else {
- _index = NULL;
+ _index = nullptr;
}
return _index;
}
ASSERT_RCU_READ_LOCKED();
DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
- index->stream->stream_handle, index->index_n.key,
- (int) index->ref.refcount);
+ index->stream->stream_handle,
+ index->index_n.key,
+ (int) index->ref.refcount);
return urcu_ref_get_unless_zero(&index->ref);
}
* Return index object or else NULL on error.
*/
struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
- uint64_t net_seq_num)
+ uint64_t net_seq_num)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct relay_index *index = NULL;
+ struct relay_index *index = nullptr;
DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
- stream->stream_handle, net_seq_num);
+ stream->stream_handle,
+ net_seq_num);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node) {
- index = caa_container_of(node, struct relay_index, index_n);
+ index = lttng::utils::container_of(node, &relay_index::index_n);
} else {
struct relay_index *oldindex;
index = relay_index_create(stream, net_seq_num);
if (!index) {
ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
- stream->stream_handle, net_seq_num);
+ stream->stream_handle,
+ net_seq_num);
goto end;
}
oldindex = relay_index_add_unique(stream, index);
relay_index_put(index);
index = oldindex;
if (!relay_index_get(index)) {
- index = NULL;
+ index = nullptr;
}
} else {
stream->indexes_in_flight++;
}
}
end:
- rcu_read_unlock();
DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
+ (index == NULL) ? "NOT " : "",
+ stream->stream_handle,
+ net_seq_num);
return index;
}
int relay_index_set_file(struct relay_index *index,
- struct lttng_index_file *index_file,
- uint64_t data_offset)
+ struct lttng_index_file *index_file,
+ uint64_t data_offset)
{
int ret = 0;
return ret;
}
-int relay_index_set_data(struct relay_index *index,
- const struct ctf_packet_index *data)
+int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data)
{
int ret = 0;
static void index_destroy_rcu(struct rcu_head *rcu_head)
{
- struct relay_index *index =
- caa_container_of(rcu_head, struct relay_index, rcu_node);
+ struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node);
index_destroy(index);
}
/* Stream lock must be held by the caller. */
static void index_release(struct urcu_ref *ref)
{
- struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+ struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref);
struct relay_stream *stream = index->stream;
int ret;
struct lttng_ht_iter iter;
if (index->index_file) {
lttng_index_file_put(index->index_file);
- index->index_file = NULL;
+ index->index_file = nullptr;
}
if (index->in_hash_table) {
/* Delete index from hash table. */
}
stream_put(index->stream);
- index->stream = NULL;
+ index->stream = nullptr;
call_rcu(&index->rcu_node, index_destroy_rcu);
}
void relay_index_put(struct relay_index *index)
{
DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
- index->stream->stream_handle, index->index_n.key,
- (int) index->ref.refcount);
+ index->stream->stream_handle,
+ index->index_n.key,
+ (int) index->ref.refcount);
/*
* Ensure existence of index->lock for index unlock.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/*
* Index lock ensures that concurrent test and update of stream
* ref is atomic.
*/
LTTNG_ASSERT(index->ref.refcount != 0);
urcu_ref_put(&index->ref, index_release);
- rcu_read_unlock();
}
/*
}
DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
- index->stream->stream_handle, index->index_n.key);
+ index->stream->stream_handle,
+ index->index_n.key);
flushed = true;
index->flushed = true;
ret = lttng_index_file_write(index->index_file, &index->index_data);
struct lttng_ht_iter iter;
struct relay_index *index;
- rcu_read_lock();
- cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
- index, index_n.node) {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
/* Put self-ref from index. */
relay_index_put(index);
}
- rcu_read_unlock();
}
void relay_index_close_partial_fd(struct relay_stream *stream)
struct lttng_ht_iter iter;
struct relay_index *index;
- rcu_read_lock();
- cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
- index, index_n.node) {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
if (!index->index_file) {
continue;
}
*/
relay_index_put(index);
}
- rcu_read_unlock();
}
uint64_t relay_index_find_last(struct relay_stream *stream)
struct relay_index *index;
uint64_t net_seq_num = -1ULL;
- rcu_read_lock();
- cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
- index, index_n.node) {
- if (net_seq_num == -1ULL ||
- index->index_n.key > net_seq_num) {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
+ if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) {
net_seq_num = index->index_n.key;
}
}
- rcu_read_unlock();
+
return net_seq_num;
}
* Update the index file of an already existing relay_index.
* Offsets by 'removed_data_count' the offset field of an index.
*/
-static
-int relay_index_switch_file(struct relay_index *index,
- struct lttng_index_file *new_index_file,
- uint64_t removed_data_count)
+static int relay_index_switch_file(struct relay_index *index,
+ struct lttng_index_file *new_index_file,
+ uint64_t removed_data_count)
{
int ret = 0;
uint64_t offset;
struct relay_index *index;
int ret = 0;
- rcu_read_lock();
- cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
- index, index_n.node) {
- ret = relay_index_switch_file(index, stream->index_file,
- stream->pos_after_last_complete_data_index);
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
+ ret = relay_index_switch_file(
+ index, stream->index_file, stream->pos_after_last_complete_data_index);
if (ret) {
- goto end;
+ return ret;
}
}
-end:
- rcu_read_unlock();
+
return ret;
}
* Set index data from the control port to a given index object.
*/
int relay_index_set_control_data(struct relay_index *index,
- const struct lttcomm_relayd_index *data,
- unsigned int minor_version)
+ const struct lttcomm_relayd_index *data,
+ unsigned int minor_version)
{
/* The index on disk is encoded in big endian. */
- ctf_packet_index index_data {};
+ ctf_packet_index index_data{};
index_data.packet_size = htobe64(data->packet_size);
index_data.content_size = htobe64(data->content_size);