*
*/
-#include "common/index/ctf-index.h"
-#include <stdint.h>
#define _LGPL_SOURCE
+#include <common/align.hpp>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/string-utils/format.hpp>
+#include <common/time.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
+
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
+#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
+#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
-#include <inttypes.h>
-#include <signal.h>
-
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/utils.h>
-#include <common/time.h>
-#include <common/compat/poll.h>
-#include <common/compat/endian.h>
-#include <common/index/index.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-testpoint.h>
-#include <common/align.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/trace-chunk.h>
-#include <common/trace-chunk-registry.h>
-#include <common/string-utils/format.h>
-#include <common/dynamic-array.h>
lttng_consumer_global_data the_consumer_data;
CONSUMER_CHANNEL_QUIT,
};
+namespace {
struct consumer_channel_msg {
enum consumer_channel_action action;
- struct lttng_consumer_channel *chan; /* add */
- uint64_t key; /* del */
+ struct lttng_consumer_channel *chan; /* add */
+ uint64_t key; /* del */
};
+/*
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
+} /* namespace */
+
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;
*/
int consumer_quit;
-/*
- * Global hash table containing respectively metadata and data streams. The
- * stream element in this ht should only be updated by the metadata poll thread
- * for the metadata and the data poll thread for the data.
- */
-static struct lttng_ht *metadata_ht;
-static struct lttng_ht *data_ht;
-
-static const char *get_consumer_domain(void)
+static const char *get_consumer_domain()
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*/
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
- struct lttng_consumer_stream *null_stream = NULL;
+ struct lttng_consumer_stream *null_stream = nullptr;
LTTNG_ASSERT(pipe);
- (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+ pointer. */
}
static void notify_health_quit_pipe(int *pipe)
}
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *chan,
- uint64_t key,
- enum consumer_channel_action action)
+ struct lttng_consumer_channel *chan,
+ uint64_t key,
+ enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
ssize_t ret;
}
}
-void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
- uint64_t key)
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
- notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+ notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel **chan,
- uint64_t *key,
- enum consumer_channel_action *action)
+ struct lttng_consumer_channel **chan,
+ uint64_t *key,
+ enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
ssize_t ret;
LTTNG_ASSERT(channel);
/* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
- cds_list_del(&stream->send_node);
+ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
/*
* Once a stream is added to this list, the buffers were created so we
* have a guarantee that this call will succeed. Setting the monitor
* global hash table.
*/
stream->monitor = 0;
- consumer_stream_destroy(stream, NULL);
+ consumer_stream_destroy(stream, nullptr);
}
}
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *find_stream(uint64_t key,
- struct lttng_ht *ht)
+static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct lttng_consumer_stream *stream = NULL;
+ struct lttng_consumer_stream *stream = nullptr;
LTTNG_ASSERT(ht);
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
- return NULL;
+ return nullptr;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
- stream = caa_container_of(node, struct lttng_consumer_stream, node);
+ if (node != nullptr) {
+		stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
}
- rcu_read_unlock();
-
return stream;
}
{
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
*/
stream->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
/*
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct lttng_consumer_channel *channel = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
ASSERT_RCU_READ_LOCKED();
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
- return NULL;
+ return nullptr;
}
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
- channel = caa_container_of(node, struct lttng_consumer_channel, node);
+ if (node != nullptr) {
+		channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
}
return channel;
{
struct lttng_consumer_channel *channel;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
*/
channel->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
+		lttng::utils::container_of(node, &lttng_consumer_channel::node);
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*/
static void free_relayd_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
struct consumer_relayd_sock_pair *relayd =
- caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
/*
* Close all sockets. This is done in the call RCU since we don't want the
int ret;
struct lttng_ht_iter iter;
- if (relayd == NULL) {
+ if (relayd == nullptr) {
return;
}
consumer_timer_monitor_stop(channel);
}
+ /*
+ * Send a last buffer statistics sample to the session daemon
+ * to ensure it tracks the amount of data consumed by this channel.
+ */
+ sample_and_send_channel_buffer_stats(channel);
+
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
}
lttng_trace_chunk_put(channel->trace_chunk);
- channel->trace_chunk = NULL;
+ channel->trace_chunk = nullptr;
if (channel->is_published) {
int ret;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
- ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht,
- &iter);
+ ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
LTTNG_ASSERT(!ret);
- rcu_read_unlock();
}
channel->is_deleted = true;
* Iterate over the relayd hash table and destroy each element. Finally,
* destroy the whole hash table.
*/
-static void cleanup_relayd_ht(void)
+static void cleanup_relayd_ht()
{
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
- relayd, node.node) {
- consumer_destroy_relayd(relayd);
+ cds_lfht_for_each_entry (
+ the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ consumer_destroy_relayd(relayd);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.relayd_ht);
}
* because we handle the write/read race with a pipe wakeup for each thread.
*/
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
- enum consumer_endpoint_status status)
+ enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Let's begin with metadata */
- cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
/* Follow up by the data streams */
- cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to data stream %d", stream->wait_fd);
}
}
- rcu_read_unlock();
}
/*
*
* One this call returns, the stream object is not longer usable nor visible.
*/
-void consumer_del_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
consumer_stream_destroy(stream, ht);
}
consumer_stream_destroy(stream, metadata_ht);
}
-void consumer_stream_update_channel_attributes(
- struct lttng_consumer_stream *stream,
- struct lttng_consumer_channel *channel)
+void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
{
- stream->channel_read_only_attributes.tracefile_size =
- channel->tracefile_size;
+ stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
/*
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
- &stream->node_channel_id);
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(the_consumer_data.stream_list_ht,
- &stream->node_session_id);
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
the_consumer_data.stream_count++;
the_consumer_data.need_update = 1;
- rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
LTTNG_ASSERT(relayd);
ASSERT_RCU_READ_LOCKED();
- lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
- &iter);
+ lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
goto end;
}
lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
/*
* Allocate and return a consumer relayd socket.
*/
-static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- uint64_t net_seq_idx)
+static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
{
- struct consumer_relayd_sock_pair *obj = NULL;
+ struct consumer_relayd_sock_pair *obj = nullptr;
/* net sequence index of -1 is a failure */
if (net_seq_idx == (uint64_t) -1ULL) {
goto error;
}
- obj = (consumer_relayd_sock_pair *) zmalloc(sizeof(struct consumer_relayd_sock_pair));
- if (obj == NULL) {
+ obj = zmalloc<consumer_relayd_sock_pair>();
+ if (obj == nullptr) {
PERROR("zmalloc relayd sock");
goto error;
}
obj->control_sock.sock.fd = -1;
obj->data_sock.sock.fd = -1;
lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
- pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+ pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
error:
return obj;
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
- relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ if (node != nullptr) {
+ relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
error:
*
* Returns 0 on success, < 0 on error
*/
-int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
- char *path)
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
+ if (relayd != nullptr) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_add_stream(&relayd->control_sock, stream->name,
- get_consumer_domain(), path, &stream->relayd_stream_id,
- stream->chan->tracefile_size,
- stream->chan->tracefile_count,
- stream->trace_chunk);
+ ret = relayd_add_stream(&relayd->control_sock,
+ stream->name,
+ get_consumer_domain(),
+ path,
+ &stream->relayd_stream_id,
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count,
+ stream->trace_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end;
}
stream->sent_to_relayd = 1;
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
- stream->key, stream->net_seq_idx);
+ stream->key,
+ stream->net_seq_idx);
ret = -1;
goto end;
}
DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
- stream->name, stream->key, stream->net_seq_idx);
+ stream->name,
+ stream->key,
+ stream->net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
- if (relayd != NULL) {
+ if (relayd != nullptr) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_streams_sent(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end;
}
} else {
- ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
- net_seq_idx);
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
ret = -1;
goto end;
}
DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
- rcu_read_unlock();
}
/*
* Return destination file descriptor or negative value on error.
*/
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
- size_t data_size, unsigned long padding,
- struct consumer_relayd_sock_pair *relayd)
+ size_t data_size,
+ unsigned long padding,
+ struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
struct lttcomm_relayd_data_hdr data_hdr;
data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
- ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
- sizeof(data_hdr));
+ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
if (ret < 0) {
goto error;
}
{
int ret = 0;
- DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
- channel->name);
+ DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
if (channel->monitor && channel->metadata_stream) {
const char dummy = 'c';
- const ssize_t write_ret = lttng_write(
- channel->metadata_stream->ust_metadata_poll_pipe[1],
- &dummy, 1);
+ const ssize_t write_ret =
+ lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);
if (write_ret < 1) {
if (errno == EWOULDBLOCK) {
*
* The caller must hold the channel and stream locks.
*/
-static
-int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
+static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
int ret;
return ret;
}
-static
-int lttng_consumer_channel_set_trace_chunk(
- struct lttng_consumer_channel *channel,
- struct lttng_trace_chunk *new_trace_chunk)
+static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
+ struct lttng_trace_chunk *new_trace_chunk)
{
pthread_mutex_lock(&channel->lock);
if (channel->is_deleted) {
* chunk is already held by the caller.
*/
if (new_trace_chunk) {
- const bool acquired_reference = lttng_trace_chunk_get(
- new_trace_chunk);
+ const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
LTTNG_ASSERT(acquired_reference);
}
* On error, return NULL.
*/
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
- uint64_t session_id,
- const uint64_t *chunk_id,
- const char *pathname,
- const char *name,
- uint64_t relayd_id,
- enum lttng_event_output output,
- uint64_t tracefile_size,
- uint64_t tracefile_count,
- uint64_t session_id_per_pid,
- unsigned int monitor,
- unsigned int live_timer_interval,
- bool is_in_live_session,
- const char *root_shm_path,
- const char *shm_path)
+ uint64_t session_id,
+ const uint64_t *chunk_id,
+ const char *pathname,
+ const char *name,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval,
+ bool is_in_live_session,
+ const char *root_shm_path,
+ const char *shm_path)
{
- struct lttng_consumer_channel *channel = NULL;
- struct lttng_trace_chunk *trace_chunk = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
+ struct lttng_trace_chunk *trace_chunk = nullptr;
if (chunk_id) {
trace_chunk = lttng_trace_chunk_registry_find_chunk(
- the_consumer_data.chunk_registry, session_id,
- *chunk_id);
+ the_consumer_data.chunk_registry, session_id, *chunk_id);
if (!trace_chunk) {
ERR("Failed to find trace chunk reference during creation of channel");
goto end;
}
}
- channel = (lttng_consumer_channel *) zmalloc(sizeof(*channel));
- if (channel == NULL) {
+ channel = zmalloc<lttng_consumer_channel>();
+ if (channel == nullptr) {
PERROR("malloc struct lttng_consumer_channel");
goto end;
}
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
channel->is_live = is_in_live_session;
- pthread_mutex_init(&channel->lock, NULL);
- pthread_mutex_init(&channel->timer_lock, NULL);
+ pthread_mutex_init(&channel->lock, nullptr);
+ pthread_mutex_init(&channel->timer_lock, nullptr);
switch (output) {
case LTTNG_EVENT_SPLICE:
default:
abort();
free(channel);
- channel = NULL;
+ channel = nullptr;
goto end;
}
}
lttng_ht_node_init_u64(&channel->node, channel->key);
- lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
- channel->session_id);
+ lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
channel->wait_fd = -1;
CDS_INIT_LIST_HEAD(&channel->streams.head);
if (trace_chunk) {
- int ret = lttng_consumer_channel_set_trace_chunk(channel,
- trace_chunk);
+ int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
if (ret) {
goto error;
}
return channel;
error:
consumer_del_channel(channel);
- channel = NULL;
+ channel = nullptr;
goto end;
}
* Always return 0 indicating success.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&channel->lock);
*/
steal_channel_key(channel->key);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
- &channel->channels_by_session_id_ht_node);
- rcu_read_unlock();
+ &channel->channels_by_session_id_ht_node);
channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
* Returns the number of fds in the structures.
*/
static int update_poll_array(struct lttng_consumer_local_data *ctx,
- struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
- struct lttng_ht *ht, int *nb_inactive_fd)
+ struct pollfd **pollfd,
+ struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht,
+ int *nb_inactive_fd)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- /*
- * Only active streams with an active end point can be added to the
- * poll set and local stream storage of the thread.
- *
- * There is a potential race here for endpoint_status to be updated
- * just after the check. However, this is OK since the stream(s) will
- * be deleted once the thread is notified that the end point state has
- * changed where this function will be called back again.
- *
- * We track the number of inactive FDs because they still need to be
- * closed by the polling thread after a wakeup on the data_pipe or
- * metadata_pipe.
- */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
- (*nb_inactive_fd)++;
- continue;
+
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
+ */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
+ continue;
+ }
+
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
}
- /*
- * This clobbers way too much the debug output. Uncomment that if you
- * need it for debugging purposes.
- */
- (*pollfd)[i].fd = stream->wait_fd;
- (*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = stream;
- i++;
}
- rcu_read_unlock();
/*
* Insert the consumer_data_pipe at the end of the array and don't
/*
* Set the error socket.
*/
-void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
- int sock)
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
{
ctx->consumer_error_socket = sock;
}
/*
* Set the command socket path.
*/
-void lttng_consumer_set_command_sock_path(
- struct lttng_consumer_local_data *ctx, char *sock)
+void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
{
ctx->consumer_command_sock_path = sock;
}
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
if (ctx->consumer_error_socket > 0) {
- return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
- sizeof(enum lttcomm_sessiond_command));
+ return lttcomm_send_unix_sock(
+ ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
}
return 0;
* Close all the tracefiles and stream fds and MUST be called when all
* instances are destroyed i.e. when all threads were joined and are ended.
*/
-void lttng_consumer_cleanup(void)
+void lttng_consumer_cleanup()
{
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
- channel, node.node) {
- consumer_del_channel(channel);
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ consumer_del_channel(channel);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.channel_ht);
lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
* session daemon and not emptying the registry would cause an assertion
* to hit.
*/
- trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
- the_consumer_data.chunk_registry);
+ trace_chunks_left =
+ lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
if (trace_chunks_left) {
ERR("%u trace chunks are leaked by lttng-consumerd. "
- "This can be caused by an internal error of the session daemon.",
- trace_chunks_left);
+ "This can be caused by an internal error of the session daemon.",
+ trace_chunks_left);
}
/* Run all callbacks freeing each chunk. */
rcu_barrier();
DBG("Consumer flag that it should quit");
}
-
/*
* Flush pending writes to trace output disk file.
*/
-static
-void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
- off_t orig_offset)
+static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
int outfd = stream->out_fd;
/*
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE
- | SYNC_FILE_RANGE_WRITE
- | SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
- stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
*
* Returns a pointer to the new context or NULL on error.
*/
-struct lttng_consumer_local_data *lttng_consumer_create(
- enum lttng_consumer_type type,
- ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx, bool locked_by_caller),
- int (*recv_channel)(struct lttng_consumer_channel *channel),
- int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(uint64_t stream_key, uint32_t state))
+struct lttng_consumer_local_data *
+lttng_consumer_create(enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
- the_consumer_data.type == type);
+ the_consumer_data.type == type);
the_consumer_data.type = type;
- ctx = (lttng_consumer_local_data *) zmalloc(sizeof(struct lttng_consumer_local_data));
- if (ctx == NULL) {
+ ctx = zmalloc<lttng_consumer_local_data>();
+ if (ctx == nullptr) {
PERROR("allocating context");
goto error;
}
ctx->consumer_error_socket = -1;
ctx->consumer_metadata_socket = -1;
- pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
+ pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
/* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
ctx->on_recv_channel = recv_channel;
error_poll_pipe:
free(ctx);
error:
- return NULL;
+ return nullptr;
}
/*
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
/*
* Write the metadata stream id on the specified file descriptor.
*/
-static int write_relayd_metadata_id(int fd,
- struct lttng_consumer_stream *stream,
- unsigned long padding)
+static int
+write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
{
ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
- stream->relayd_stream_id, padding);
+ stream->relayd_stream_id,
+ padding);
end:
return (int) ret;
*
* Returns the number of bytes written
*/
-ssize_t lttng_consumer_on_read_subbuffer_mmap(
- struct lttng_consumer_stream *stream,
- const struct lttng_buffer_view *buffer,
- unsigned long padding)
+ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ const struct lttng_buffer_view *buffer,
+ unsigned long padding)
{
ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
unsigned int relayd_hang_up = 0;
const size_t subbuf_content_size = buffer->size - padding;
size_t write_len;
/* RCU lock for the relayd pointer */
- rcu_read_lock();
- LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL ||
- stream->trace_chunk);
+ lttng::urcu::read_lock_guard read_lock;
+ LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
ret = -EPIPE;
goto end;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->reset_metadata_flag) {
ret = relayd_reset_metadata(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->metadata_version);
+ stream->relayd_stream_id,
+ stream->metadata_version);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
* Check if we need to change the tracefile before writing the packet.
*/
if (stream->chan->tracefile_size > 0 &&
- (stream->tracefile_size_current + buffer->size) >
- stream->chan->tracefile_size) {
+ (stream->tracefile_size_current + buffer->size) >
+ stream->chan->tracefile_size) {
ret = consumer_stream_rotate_output_files(stream);
if (ret) {
goto end;
DBG("Consumer mmap write detected relayd hang up");
} else {
/* Unhandled error, print it and stop function right now. */
- PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
- write_len);
+ PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
}
goto write_error;
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
- SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return ret;
}
*
* Returns the number of bytes spliced.
*/
-ssize_t lttng_consumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream,
+ unsigned long len,
+ unsigned long padding)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
int *splice_pipe;
unsigned int relayd_hang_up = 0;
}
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
written = -ret;
goto end;
}
if (stream->reset_metadata_flag) {
ret = relayd_reset_metadata(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->metadata_version);
+ stream->relayd_stream_id,
+ stream->metadata_version);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
}
stream->reset_metadata_flag = 0;
}
- ret = write_relayd_metadata_id(splice_pipe[1], stream,
- padding);
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
if (ret < 0) {
written = ret;
relayd_hang_up = 1;
* Check if we need to change the tracefile before writing the packet.
*/
if (stream->chan->tracefile_size > 0 &&
- (stream->tracefile_size_current + len) >
- stream->chan->tracefile_size) {
+ (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
ret = consumer_stream_rotate_output_files(stream);
if (ret < 0) {
written = ret;
while (len > 0) {
DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
- (unsigned long)offset, len, fd, splice_pipe[1]);
- ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
- SPLICE_F_MOVE | SPLICE_F_MORE);
+ (unsigned long) offset,
+ len,
+ fd,
+ splice_pipe[1]);
+ ret_splice = splice(
+ fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
ret = errno;
}
/* Splice data out */
- ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
- ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
- outfd, ret_splice);
+ ret_splice = splice(splice_pipe[0],
+ nullptr,
+ outfd,
+ nullptr,
+ ret_splice,
+ SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
if (ret_splice < 0) {
ret = errno;
written = -ret;
*/
ret = errno;
written += ret_splice;
- PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
- len);
+ PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
goto splice_error;
} else {
/* All good, update current len and continue. */
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
- SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return written;
}
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
}
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll)
+ int sock,
+ struct pollfd *consumer_sockpoll)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
}
}
-static
-void lttng_consumer_close_all_metadata(void)
+static void lttng_consumer_close_all_metadata()
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/*
* Clean up a metadata stream and free its memory.
*/
-void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
- struct lttng_consumer_channel *channel = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
bool free_channel = false;
LTTNG_ASSERT(stream);
consumer_stream_delete(stream, ht);
/* Close down everything including the relayd if one. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
/* Destroy tracer buffers of the stream. */
consumer_stream_destroy_buffers(stream);
/* Atomically decrement channel refcount since other threads can use it. */
- if (!uatomic_sub_return(&channel->refcount, 1)
- && !uatomic_read(&channel->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&channel->refcount, 1) &&
+ !uatomic_read(&channel->nb_init_stream_left)) {
/* Go for channel deletion! */
free_channel = true;
}
- stream->chan = NULL;
+ stream->chan = nullptr;
/*
* Nullify the stream reference so it is not used after deletion. The
* channel lock MUST be acquired before being able to check for a NULL
* pointer value.
*/
- channel->metadata_stream = NULL;
+ channel->metadata_stream = nullptr;
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
}
lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
+ stream->trace_chunk = nullptr;
consumer_stream_free(stream);
}
* after this point.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
- &stream->node_channel_id);
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(the_consumer_data.stream_list_ht,
- &stream->node_session_id);
-
- rcu_read_unlock();
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
/*
* Delete data stream that are flagged for deletion (endpoint_status).
*/
-static void validate_endpoint_status_data_stream(void)
+static void validate_endpoint_status_data_stream()
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
DBG("Consumer delete flagged data stream");
- rcu_read_lock();
- cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
}
- /* Delete it right now */
- consumer_del_stream(stream, data_ht);
}
- rcu_read_unlock();
}
/*
* Delete metadata stream that are flagged for deletion (endpoint_status).
*/
-static void validate_endpoint_status_metadata_stream(
- struct lttng_poll_event *pollset)
+static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
LTTNG_ASSERT(pollset);
- rcu_read_lock();
- cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /*
- * Remove from pollset so the metadata thread can continue without
- * blocking on a deleted stream.
- */
- lttng_poll_del(pollset, stream->wait_fd);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
- /* Delete it right now */
- consumer_del_metadata_stream(stream, metadata_ht);
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
+ }
}
- rcu_read_unlock();
}
/*
{
int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
- struct lttng_consumer_stream *stream = NULL;
+ struct lttng_consumer_stream *stream = nullptr;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
goto end_poll;
}
- ret = lttng_poll_add(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
+ ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
if (ret < 0) {
goto end;
}
/* Main loop */
DBG("Metadata main loop started");
- while (1) {
-restart:
+ while (true) {
+ restart:
health_code_update();
health_poll_entry();
DBG("Metadata poll wait");
ret = lttng_poll_wait(&events, -1);
- DBG("Metadata poll return from wait with %d fd(s)",
- LTTNG_POLL_GETNB(&events));
+ DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Metadata event caught in thread");
if (ret < 0) {
goto restart;
}
if (LTTNG_POLL_GETNB(&events) == 0) {
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
}
goto end;
}
ssize_t pipe_len;
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
- &stream, sizeof(stream));
- if (pipe_len < sizeof(stream)) {
+ &stream,
+ sizeof(stream)); /* NOLINT sizeof
+ used on a
+ pointer. */
+ if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+ pointer. */
if (pipe_len < 0) {
PERROR("read metadata stream");
}
/*
- * Remove the pipe from the poll set and continue the loop
- * since their might be data to consume.
+ * Remove the pipe from the poll set and continue
+ * the loop since there might be data to consume.
*/
- lttng_poll_del(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+ lttng_poll_del(
+ &events,
+ lttng_pipe_get_readfd(
+ ctx->consumer_metadata_pipe));
lttng_pipe_read_close(ctx->consumer_metadata_pipe);
continue;
}
/* A NULL stream means that the state has changed. */
- if (stream == NULL) {
+ if (stream == nullptr) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
goto restart;
}
DBG("Adding metadata stream %d to poll set",
- stream->wait_fd);
+ stream->wait_fd);
/* Add metadata stream to the global poll events list */
- lttng_poll_add(&events, stream->wait_fd,
- LPOLLIN | LPOLLPRI | LPOLLHUP);
+ lttng_poll_add(
+ &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
} else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata thread pipe hung up");
/*
* Remove the pipe from the poll set and continue the loop
* since their might be data to consume.
*/
- lttng_poll_del(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+ lttng_poll_del(
+ &events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
lttng_pipe_read_close(ctx->consumer_metadata_pipe);
continue;
} else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for sock %d",
+ revents,
+ pollfd);
goto end;
}
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
node = lttng_ht_iter_get_node_u64(&iter);
LTTNG_ASSERT(node);
- stream = caa_container_of(node, struct lttng_consumer_stream,
- node);
+ stream = caa_container_of(node, struct lttng_consumer_stream, node);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
} else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata fd %d is hup|err.", pollfd);
if (!stream->hangup_flush_done &&
- (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
- the_consumer_data.type ==
- LTTNG_CONSUMER64_UST)) {
+ (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
+ the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("Attempting to flush and consume the UST buffers");
lttng_ustconsumer_on_stream_hangup(stream);
len = ctx->on_buffer_ready(stream, ctx, false);
/*
- * We don't check the return value here since if we get
- * a negative len, it means an error occurred thus we
- * simply remove it from the poll set and free the
- * stream.
+ * We don't check the return value here since if we
+ * get a negative len, it means an error occurred
+ * thus we simply remove it from the poll set and
+ * free the stream.
*/
} while (len > 0);
}
consumer_del_metadata_stream(stream, metadata_ht);
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the stream looked up */
- rcu_read_unlock();
}
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
/*
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i, err = -1;
- struct pollfd *pollfd = NULL;
+ int num_rdy, high_prio, ret, i, err = -1;
+ struct pollfd *pollfd = nullptr;
/* local view of the streams */
- struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
+ struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
/* 2 for the consumer_data_pipe and wake up pipe */
health_code_update();
- local_stream = (lttng_consumer_stream **) zmalloc(sizeof(struct lttng_consumer_stream *));
- if (local_stream == NULL) {
+ local_stream = zmalloc<lttng_consumer_stream *>();
+ if (local_stream == nullptr) {
PERROR("local_stream malloc");
goto end;
}
- while (1) {
+ while (true) {
health_code_update();
high_prio = 0;
- num_hup = 0;
/*
* the fds set has been updated, we need to update our
pthread_mutex_lock(&the_consumer_data.lock);
if (the_consumer_data.need_update) {
free(pollfd);
- pollfd = NULL;
+ pollfd = nullptr;
free(local_stream);
- local_stream = NULL;
+ local_stream = nullptr;
/* Allocate for all fds */
- pollfd = (struct pollfd *) zmalloc((the_consumer_data.stream_count +
- nb_pipes_fd) *
- sizeof(struct pollfd));
- if (pollfd == NULL) {
+ pollfd =
+ calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
+ if (pollfd == nullptr) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- local_stream = (lttng_consumer_stream **) zmalloc((the_consumer_data.stream_count +
- nb_pipes_fd) *
- sizeof(struct lttng_consumer_stream *));
- if (local_stream == NULL) {
+ local_stream = calloc<lttng_consumer_stream *>(
+ the_consumer_data.stream_count + nb_pipes_fd);
+ if (local_stream == nullptr) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- ret = update_poll_array(ctx, &pollfd, local_stream,
- data_ht, &nb_inactive_fd);
+ ret = update_poll_array(
+ ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&the_consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && nb_inactive_fd == 0 &&
- CMM_LOAD_SHARED(consumer_quit) == 1) {
- err = 0; /* All is OK */
+ if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+ err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
DBG("consumer_data_pipe wake up");
pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
- &new_stream, sizeof(new_stream));
- if (pipe_readlen < sizeof(new_stream)) {
+ &new_stream,
+ sizeof(new_stream)); /* NOLINT sizeof used on
+ a pointer. */
+ if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+ */
PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
* the sessiond poll thread changed the consumer_quit state and is
* waking us up to test it.
*/
- if (new_stream == NULL) {
+ if (new_stream == nullptr) {
validate_endpoint_status_data_stream();
continue;
}
char dummy;
ssize_t pipe_readlen;
- pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
- sizeof(dummy));
+ pipe_readlen =
+ lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
if (pipe_readlen < 0) {
PERROR("Consumer data wakeup pipe");
}
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
if (pollfd[i].revents & POLLPRI) {
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
+ local_stream[i] = nullptr;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown =
+ 1;
}
}
}
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
- if ((pollfd[i].revents & POLLIN) ||
- local_stream[i]->hangup_flush_done ||
- local_stream[i]->has_data) {
+ if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
+ local_stream[i]->has_data) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx, false);
/* it's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
+ local_stream[i] = nullptr;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown =
+ 1;
}
}
}
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
- if (!local_stream[i]->hangup_flush_done
- && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
- && (the_consumer_data.type == LTTNG_CONSUMER32_UST
- || the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
+ if (!local_stream[i]->hangup_flush_done &&
+ (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
+ (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
+ the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
- pollfd[i].fd);
+ pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
/*
+ * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+ * performed. This type of flush ensures that a new packet is produced no
+ * matter what the consumed/produced positions are.
+ *
+ * This, in turn, causes the next pass to see that data is available for the
+ * stream. When we come back here, we can be assured that all available
+ * data has been consumed and we can finally destroy the stream.
+ *
* If the poll flag is HUP/ERR/NVAL and we have
* read no data in this pass, we can remove the
* stream from its hash table.
*/
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
}
- if (local_stream[i] != NULL) {
- local_stream[i]->data_read = 0;
+ if (local_stream[i] != nullptr) {
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
}
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
/*
* allow the poll() on the stream read-side to detect when the
* write-side (application) finally closes them.
*/
-static
-void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
+static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key,
- &iter.iter, stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
/*
* Protect against teardown with mutex.
*/
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
}
static void destroy_channel_ht(struct lttng_ht *ht)
struct lttng_consumer_channel *channel;
int ret;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
- LTTNG_ASSERT(ret != 0);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ LTTNG_ASSERT(ret != 0);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
{
int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
- struct lttng_consumer_channel *chan = NULL;
+ struct lttng_consumer_channel *chan = nullptr;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
/* Main loop */
DBG("Channel main loop started");
- while (1) {
-restart:
+ while (true) {
+ restart:
health_code_update();
DBG("Channel poll wait");
health_poll_entry();
ret = lttng_poll_wait(&events, -1);
- DBG("Channel poll return from wait with %d fd(s)",
- LTTNG_POLL_GETNB(&events));
+ DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Channel event caught in thread");
if (ret < 0) {
goto restart;
}
if (LTTNG_POLL_GETNB(&events) == 0) {
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
}
goto end;
}
if (ret < 0) {
ERR("Error reading channel pipe");
}
- lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ lttng_poll_del(&events,
+ ctx->consumer_channel_pipe[0]);
continue;
}
switch (action) {
case CONSUMER_CHANNEL_ADD:
- DBG("Adding channel %d to poll set",
- chan->wait_fd);
+ {
+ DBG("Adding channel %d to poll set", chan->wait_fd);
lttng_ht_node_init_u64(&chan->wait_fd_node,
- chan->wait_fd);
- rcu_read_lock();
+ chan->wait_fd);
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
- &chan->wait_fd_node);
- rcu_read_unlock();
+ &chan->wait_fd_node);
/* Add channel to the global poll events list */
- lttng_poll_add(&events, chan->wait_fd,
- LPOLLERR | LPOLLHUP);
+ // FIXME: Empty flag on a pipe pollset, this might
+ // hang on FreeBSD.
+ lttng_poll_add(&events, chan->wait_fd, 0);
break;
+ }
case CONSUMER_CHANNEL_DEL:
{
/*
- * This command should never be called if the channel
- * has streams monitored by either the data or metadata
- * thread. The consumer only notify this thread with a
- * channel del. command if it receives a destroy
- * channel command from the session daemon that send it
- * if a command prior to the GET_CHANNEL failed.
+ * This command should never be called if the
+ * channel has streams monitored by either the data
+ * or metadata thread. The consumer only notifies this
+ * thread with a channel del. command if it receives
+ * a destroy channel command from the session daemon
+ * that sends it if a command prior to the
+ * GET_CHANNEL failed.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
- rcu_read_unlock();
- ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ ERR("UST consumer get channel key %" PRIu64
+ " not found for del channel",
+ key);
break;
}
lttng_poll_del(&events, chan->wait_fd);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
health_code_update();
- /* Destroy streams that might have been left in the stream list. */
+ /* Destroy streams that might have been left
+ * in the stream list. */
clean_channel_stream_list(chan);
break;
default:
}
/*
- * Release our own refcount. Force channel deletion even if
- * streams were not initialized.
+ * Release our own refcount. Force channel deletion
+ * even if streams were not initialized.
*/
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
- rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
/*
- * Remove the pipe from the poll set and continue the loop
- * since their might be data to consume.
+ * Remove the pipe from the poll set and continue
+ * the loop since there might be data to consume.
*/
- lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ lttng_poll_del(&events,
+ ctx->consumer_channel_pipe[0]);
continue;
default:
ERR("Unknown action");
lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
continue;
} else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for sock %d",
+ revents,
+ pollfd);
goto end;
}
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
node = lttng_ht_iter_get_node_u64(&iter);
LTTNG_ASSERT(node);
- chan = caa_container_of(node, struct lttng_consumer_channel,
- wait_fd_node);
+ chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
consumer_close_channel_streams(chan);
/* Release our own refcount */
- if (!uatomic_sub_return(&chan->refcount, 1)
- && !uatomic_read(&chan->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&chan->refcount, 1) &&
+ !uatomic_read(&chan->nb_init_stream_left)) {
consumer_del_channel(chan);
}
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the channel looked up */
- rcu_read_unlock();
}
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
- struct pollfd *sockpoll, int client_socket)
+ struct pollfd *sockpoll,
+ int client_socket)
{
int ret;
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- while (1) {
+ while (true) {
health_code_update();
health_poll_entry();
}
if (CMM_LOAD_SHARED(consumer_quit)) {
DBG("consumer_thread_receive_fds received quit from signal");
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
goto end;
}
DBG("Received command on sock");
*/
notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
+ notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
notify_health_quit_pipe(health_quit_pipe);
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static int post_consume(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuffer,
- struct lttng_consumer_local_data *ctx)
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
{
size_t i;
int ret = 0;
- const size_t count = lttng_dynamic_array_get_count(
- &stream->read_subbuffer_ops.post_consume_cbs);
+ const size_t count =
+ lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
for (i = 0; i < count; i++) {
const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
- &stream->read_subbuffer_ops.post_consume_cbs,
- i);
+ &stream->read_subbuffer_ops.post_consume_cbs, i);
ret = op(stream, subbuffer, ctx);
if (ret) {
}
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller)
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller)
{
ssize_t ret, written_bytes = 0;
int rotation_ret;
*/
if (stream->rotate_ready) {
DBG("Rotate stream before consuming data");
- ret = lttng_consumer_rotate_stream(ctx, stream);
+ ret = lttng_consumer_rotate_stream(stream);
if (ret < 0) {
ERR("Stream rotation error before consuming data");
goto end;
}
}
- get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(
- stream, &subbuffer);
+ get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
switch (get_next_status) {
case GET_NEXT_SUBBUFFER_STATUS_OK:
break;
abort();
}
- ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
- stream, &subbuffer);
+ ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
if (ret) {
goto error_put_subbuf;
}
- written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
- ctx, stream, &subbuffer);
+ written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
if (written_bytes <= 0) {
ERR("Error consuming subbuffer: (%zd)", written_bytes);
ret = (int) written_bytes;
*/
rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
if (rotation_ret == 1) {
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ rotation_ret = lttng_consumer_rotate_stream(stream);
if (rotation_ret < 0) {
ret = rotation_ret;
ERR("Stream rotation error after consuming data");
/*
* Allocate and set consumer data hash tables.
*/
-int lttng_consumer_init(void)
+int lttng_consumer_init()
{
the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.channel_ht) {
goto error;
}
- the_consumer_data.channels_by_session_id_ht =
- lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.channels_by_session_id_ht) {
goto error;
}
goto error;
}
- the_consumer_data.stream_per_chan_id_ht =
- lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.stream_per_chan_id_ht) {
goto error;
}
* The caller MUST acquire a RCU read side lock before calling it.
*/
void consumer_add_relayd_socket(uint64_t net_seq_idx,
- int sock_type,
- struct lttng_consumer_local_data *ctx,
- int sock,
- struct pollfd *consumer_sockpoll,
- uint64_t sessiond_id,
- uint64_t relayd_session_id,
- uint32_t relayd_version_major,
- uint32_t relayd_version_minor,
- enum lttcomm_sock_proto relayd_socket_protocol)
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
+ struct pollfd *consumer_sockpoll,
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(sock >= 0);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- fd = -1; /* Just in case it gets set with an invalid value. */
+ fd = -1; /* Just in case it gets set with an invalid value. */
/*
* Failing to receive FDs might indicate a major problem such as
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
ret = lttcomm_populate_sock_from_open_socket(
- &relayd->control_sock.sock, fd,
- relayd_socket_protocol);
+ &relayd->control_sock.sock, fd, relayd_socket_protocol);
/* Assign version values. */
relayd->control_sock.major = relayd_version_major;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
ret = lttcomm_populate_sock_from_open_socket(
- &relayd->data_sock.sock, fd,
- relayd_socket_protocol);
+ &relayd->data_sock.sock, fd, relayd_socket_protocol);
/* Assign version values. */
relayd->data_sock.major = relayd_version_major;
relayd->data_sock.minor = relayd_version_minor;
}
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
- sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
- relayd->net_seq_idx, fd);
+ sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+ relayd->net_seq_idx,
+ fd);
/*
* We gave the ownership of the fd to the relayd structure. Set the
* fd to -1 so we don't call close() on it in the error path below.
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
ASSERT_RCU_READ_LOCKED();
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
- relayd, node.node) {
+ cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
}
}
- return NULL;
+ return nullptr;
found:
return relayd;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&the_consumer_data.lock);
switch (the_consumer_data.type) {
ht = the_consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
pthread_mutex_lock(&stream->lock);
/*
/* Send init command for data pending. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
+ ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
if (ret < 0) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Communication error thus the relayd so no data pending. */
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
- stream->relayd_stream_id);
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->next_net_seq_num - 1);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
if (ret == 1) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
} else if (ret < 0) {
- ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
/* Send end command for data pending. */
- ret = relayd_end_data_pending(&relayd->control_sock,
- relayd->relayd_session_id, &is_data_inflight);
+ ret = relayd_end_data_pending(
+ &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 0;
data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 1;
}
*
* Return the sendmsg() return value.
*/
-int consumer_send_status_channel(int sock,
- struct lttng_consumer_channel *channel)
+int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
{
struct lttcomm_consumer_status_channel msg;
}
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
- unsigned long produced_pos, uint64_t nb_packets_per_stream,
- uint64_t max_sb_size)
+ unsigned long produced_pos,
+ uint64_t nb_packets_per_stream,
+ uint64_t max_sb_size)
{
unsigned long start_pos;
if (!nb_packets_per_stream) {
- return consumed_pos; /* Grab everything */
+ return consumed_pos; /* Grab everything */
}
start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
start_pos -= max_sb_size * nb_packets_per_stream;
if ((long) (start_pos - consumed_pos) < 0) {
- return consumed_pos; /* Grab everything */
+ return consumed_pos; /* Grab everything */
}
return start_pos;
}
/* Stream lock must be held by the caller. */
static int sample_stream_positions(struct lttng_consumer_stream *stream,
- unsigned long *produced, unsigned long *consumed)
+ unsigned long *produced,
+ unsigned long *consumed)
{
int ret;
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
- uint64_t key, uint64_t relayd_id, uint32_t metadata,
- struct lttng_consumer_local_data *ctx)
+ uint64_t key,
+ uint64_t relayd_id)
{
int ret;
struct lttng_consumer_stream *stream;
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
const bool is_local_trace = relayd_id == -1ULL;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
bool rotating_to_new_chunk = true;
/* Array of `struct lttng_consumer_stream *` */
struct lttng_dynamic_pointer_array streams_packet_to_open;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
lttng_dynamic_array_init(&stream_rotation_positions,
- sizeof(struct relayd_stream_rotation_position), NULL);
- lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
+ sizeof(struct relayd_stream_rotation_position),
+ nullptr);
+ lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
- chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
- &next_chunk_id);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end_unlock_channel;
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
unsigned long produced_pos = 0, consumed_pos = 0;
health_code_update();
* for this stream during this trace
* chunk's lifetime.
*/
- ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ ret = sample_stream_positions(
+ stream, &produced_pos, &consumed_pos);
if (ret) {
goto end_unlock_stream;
}
uint64_t trace_chunk_id;
chunk_status = lttng_trace_chunk_get_name(
- stream->trace_chunk,
- &trace_chunk_name,
- NULL);
+ stream->trace_chunk,
+ &trace_chunk_name,
+ nullptr);
if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
trace_chunk_name = "none";
}
* never anonymous.
*/
chunk_status = lttng_trace_chunk_get_id(
- stream->trace_chunk,
- &trace_chunk_id);
+ stream->trace_chunk, &trace_chunk_id);
LTTNG_ASSERT(chunk_status ==
- LTTNG_TRACE_CHUNK_STATUS_OK);
+ LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("Unable to open packet for stream during trace chunk's lifetime. "
- "Flushing an empty packet to prevent an empty file from being created: "
- "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
- stream->key, trace_chunk_name, trace_chunk_id);
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64
+ ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key,
+ trace_chunk_name,
+ trace_chunk_id);
}
}
}
ret = consumer_stream_flush_buffer(stream, flush_active);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
- stream->key);
+ stream->key);
goto end_unlock_stream;
}
}
goto end_unlock_stream;
}
if (!ret) {
- ret = lttng_consumer_get_produced_snapshot(stream,
- &produced_pos);
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Failed to sample produced position during channel rotation");
goto end_unlock_stream;
}
- ret = lttng_consumer_get_consumed_snapshot(stream,
- &consumed_pos);
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Failed to sample consumed position during channel rotation");
goto end_unlock_stream;
produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
if (consumed_pos == produced_pos) {
DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
- stream->key, produced_pos, consumed_pos);
+ stream->key,
+ produced_pos,
+ consumed_pos);
stream->rotate_ready = true;
} else {
DBG("Different consumed and produced positions "
- "for stream %" PRIu64 " produced = %lu consumed = %lu",
- stream->key, produced_pos, consumed_pos);
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key,
+ produced_pos,
+ consumed_pos);
}
/*
* The rotation position is based on the packet_seq_num of the
* not implement packet sequence number.
*/
ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
- stream->key);
+ stream->key);
ret = -1;
goto end_unlock_stream;
}
stream->rotate_position = stream->last_sequence_number + 1 +
- ((produced_pos - consumed_pos) / stream->max_sb_size);
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
- stream->key, stream->rotate_position);
+ stream->key,
+ stream->rotate_position);
if (!is_local_trace) {
/*
.rotate_at_seq_num = stream->rotate_position,
};
- ret = lttng_dynamic_array_add_element(
- &stream_rotation_positions,
- &position);
+ ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
+ &position);
if (ret) {
ERR("Failed to allocate stream rotation position");
goto end_unlock_stream;
* is performed in a stream that has no active trace
* chunk.
*/
- ret = lttng_dynamic_pointer_array_add_pointer(
- &streams_packet_to_open, stream);
+ ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
+ stream);
if (ret) {
PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
pthread_mutex_unlock(&stream->lock);
}
- stream = NULL;
+ stream = nullptr;
if (!is_local_trace) {
relayd = consumer_find_relayd(relayd_id);
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
- rotating_to_new_chunk ? &next_chunk_id : NULL,
- (const struct relayd_stream_rotation_position *)
- stream_rotation_positions.buffer
- .data);
+ ret = relayd_rotate_streams(&relayd->control_sock,
+ stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : nullptr,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
- relayd->net_seq_idx);
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end_unlock_channel;
}
}
for (stream_idx = 0;
- stream_idx < lttng_dynamic_pointer_array_get_count(
- &streams_packet_to_open);
- stream_idx++) {
+ stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
+ stream_idx++) {
enum consumer_stream_open_packet_status status;
stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
- &streams_packet_to_open, stream_idx);
+ &streams_packet_to_open, stream_idx);
pthread_mutex_lock(&stream->lock);
status = consumer_stream_open_packet(stream);
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet after a rotation: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
/*
*/
DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/* Logged by callee. */
end_unlock_channel:
pthread_mutex_unlock(&channel->lock);
end:
- rcu_read_unlock();
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
return ret;
}
-static
-int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
{
int ret = 0;
unsigned long consumed_pos_before, consumed_pos_after;
return ret;
}
-static
-int consumer_clear_stream(struct lttng_consumer_stream *stream)
+static int consumer_clear_stream(struct lttng_consumer_stream *stream)
{
int ret;
- ret = consumer_stream_flush_buffer(stream, 1);
+ ret = consumer_stream_flush_buffer(stream, true);
if (ret < 0) {
- ERR("Failed to flush stream %" PRIu64 " during channel clear",
- stream->key);
+ ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
ret = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
ret = consumer_clear_buffer(stream);
if (ret < 0) {
- ERR("Failed to clear stream %" PRIu64 " during channel clear",
- stream->key);
+ ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
ret = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
return ret;
}
-static
-int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
int ret;
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
pthread_mutex_lock(&stream->lock);
ret = consumer_clear_stream(stream);
pthread_mutex_unlock(&stream->lock);
}
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return 0;
error_unlock:
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return ret;
}
*/
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
- DBG("Check is rotate ready for stream %" PRIu64
- " ready %u rotate_position %" PRIu64
- " last_sequence_number %" PRIu64,
- stream->key, stream->rotate_ready,
- stream->rotate_position, stream->last_sequence_number);
+ DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
+ " last_sequence_number %" PRIu64,
+ stream->key,
+ stream->rotate_ready,
+ stream->rotate_position,
+ stream->last_sequence_number);
if (stream->rotate_ready) {
return 1;
}
*/
if (stream->sequence_number_unavailable) {
ERR("Internal error: rotation used on stream %" PRIu64
- " with unavailable sequence number",
- stream->key);
+ " with unavailable sequence number",
+ stream->key);
return -1;
}
- if (stream->rotate_position == -1ULL ||
- stream->last_sequence_number == -1ULL) {
+ if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
return 0;
}
* packet of the current chunk, hence the "rotate_position - 1".
*/
- DBG("Check is rotate ready for stream %" PRIu64
- " last_sequence_number %" PRIu64
- " rotate_position %" PRIu64,
- stream->key, stream->last_sequence_number,
- stream->rotate_position);
+ DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
+ " rotate_position %" PRIu64,
+ stream->key,
+ stream->last_sequence_number,
+ stream->rotate_position);
if (stream->last_sequence_number >= stream->rotate_position - 1) {
return 1;
}
*/
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
{
- DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
- stream->key);
+ DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
stream->rotate_position = -1ULL;
stream->rotate_ready = false;
}
/*
* Perform the rotation a local stream file.
*/
-static
-int rotate_local_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+static int rotate_local_stream(struct lttng_consumer_stream *stream)
{
int ret = 0;
DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
- stream->key,
- stream->chan->key);
+ stream->key,
+ stream->chan->key);
stream->tracefile_size_current = 0;
stream->tracefile_count_current = 0;
ret = close(stream->out_fd);
if (ret) {
PERROR("Failed to close stream out_fd of channel \"%s\"",
- stream->chan->name);
+ stream->chan->name);
}
stream->out_fd = -1;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
+ stream->index_file = nullptr;
}
if (!stream->trace_chunk) {
*
* Return 0 on success, a negative number of error.
*/
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
int ret;
* parent channel, becomes part of no chunk and can't output
* anything until a new trace chunk is created.
*/
- stream->trace_chunk = NULL;
- } else if (stream->chan->trace_chunk &&
- !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ stream->trace_chunk = nullptr;
+ } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
ret = -1;
goto error;
}
if (stream->net_seq_idx == (uint64_t) -1ULL) {
- ret = rotate_local_stream(ctx, stream);
+ ret = rotate_local_stream(stream);
if (ret < 0) {
ERR("Failed to rotate stream, ret = %i", ret);
goto error;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
- uint64_t key, struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
int ret;
struct lttng_consumer_stream *stream;
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
}
DBG("Consumer rotate ready stream %" PRIu64, stream->key);
- ret = lttng_consumer_rotate_stream(ctx, stream);
+ ret = lttng_consumer_rotate_stream(stream);
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
ret = 0;
end:
- rcu_read_unlock();
return ret;
}
-enum lttcomm_return_code lttng_consumer_init_command(
- struct lttng_consumer_local_data *ctx,
- const lttng_uuid sessiond_uuid)
+enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
+ const lttng_uuid& sessiond_uuid)
{
enum lttcomm_return_code ret;
char uuid_str[LTTNG_UUID_STR_LEN];
}
ctx->sessiond_uuid.is_set = true;
- memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ctx->sessiond_uuid.value = sessiond_uuid;
ret = LTTCOMM_CONSUMERD_SUCCESS;
lttng_uuid_to_str(sessiond_uuid, uuid_str);
DBG("Received session daemon UUID: %s", uuid_str);
return ret;
}
-enum lttcomm_return_code lttng_consumer_create_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id,
- time_t chunk_creation_timestamp,
- const char *chunk_override_name,
- const struct lttng_credentials *credentials,
- struct lttng_directory_handle *chunk_directory_handle)
+enum lttcomm_return_code
+lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
{
int ret;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+ struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
enum lttng_trace_chunk_status chunk_status;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
char creation_timestamp_buffer[ISO8601_STR_LEN];
if (relayd_id) {
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
/* Local protocol error. */
LTTNG_ASSERT(chunk_creation_timestamp);
ret = time_to_iso8601_str(chunk_creation_timestamp,
- creation_timestamp_buffer,
- sizeof(creation_timestamp_buffer));
- creation_timestamp_str = !ret ? creation_timestamp_buffer :
- "(formatting error)";
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
DBG("Consumer create trace chunk command: relay_id = %s"
- ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
- ", chunk_override_name = %s"
- ", chunk_creation_timestamp = %s",
- relayd_id_str, session_id, chunk_id,
- chunk_override_name ? : "(none)",
- creation_timestamp_str);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str,
+ session_id,
+ chunk_id,
+ chunk_override_name ?: "(none)",
+ creation_timestamp_str);
/*
* The trace chunk registry, as used by the consumer daemon, implicitly
* the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
* and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
*/
- created_chunk = lttng_trace_chunk_create(chunk_id,
- chunk_creation_timestamp, NULL);
+ created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
if (!created_chunk) {
ERR("Failed to create trace chunk");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
}
if (chunk_override_name) {
- chunk_status = lttng_trace_chunk_override_name(created_chunk,
- chunk_override_name);
+ chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto error;
}
if (chunk_directory_handle) {
- chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
- credentials);
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk credentials");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
* The consumer daemon has no ownership of the chunk output
* directory.
*/
- chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
- chunk_directory_handle);
- chunk_directory_handle = NULL;
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
+ chunk_directory_handle = nullptr;
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk's directory handle");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
}
published_chunk = lttng_trace_chunk_registry_publish_chunk(
- the_consumer_data.chunk_registry, session_id,
- created_chunk);
+ the_consumer_data.chunk_registry, session_id, created_chunk);
lttng_trace_chunk_put(created_chunk);
- created_chunk = NULL;
+ created_chunk = nullptr;
if (!published_chunk) {
ERR("Failed to publish trace chunk");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto error;
}
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry_duplicate(
the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(
- &session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+ lttng_ht_seed),
the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id, &iter.iter, channel,
- channels_by_session_id_ht_node.node) {
- ret = lttng_consumer_channel_set_trace_chunk(channel,
- published_chunk);
- if (ret) {
- /*
- * Roll-back the creation of this chunk.
- *
- * This is important since the session daemon will
- * assume that the creation of this chunk failed and
- * will never ask for it to be closed, resulting
- * in a leak and an inconsistent state for some
- * channels.
- */
- enum lttcomm_return_code close_ret;
- char path[LTTNG_PATH_MAX];
+ &session_id,
+ &iter.iter,
+ channel,
+ channels_by_session_id_ht_node.node)
+ {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret =
+ lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
- DBG("Failed to set new trace chunk on existing channels, rolling back");
- close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id, chunk_id,
- chunk_creation_timestamp, NULL,
- path);
- if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
- session_id, chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
}
-
- ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
- break;
}
}
relayd = consumer_find_relayd(*relayd_id);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_create_trace_chunk(
- &relayd->control_sock, published_chunk);
+ ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
char path[LTTNG_PATH_MAX];
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- NULL, path);
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
}
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
}
}
error_unlock:
- rcu_read_unlock();
error:
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
return ret_code;
}
-enum lttcomm_return_code lttng_consumer_close_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command,
- char *path)
+enum lttcomm_return_code
+lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
{
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_trace_chunk *chunk;
int ret;
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
}
}
if (close_command) {
- close_command_name = lttng_trace_chunk_command_type_get_name(
- *close_command);
+ close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
}
DBG("Consumer close trace chunk command: relayd_id = %s"
- ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
- ", close command = %s",
- relayd_id_str, session_id, chunk_id,
- close_command_name);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
+ relayd_id_str,
+ session_id,
+ chunk_id,
+ close_command_name);
chunk = lttng_trace_chunk_registry_find_chunk(
- the_consumer_data.chunk_registry, session_id, chunk_id);
+ the_consumer_data.chunk_registry, session_id, chunk_id);
if (!chunk) {
- ERR("Failed to find chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id, chunk_id);
+ ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
goto end;
}
- chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
- chunk_close_timestamp);
+ chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
}
if (close_command) {
- chunk_status = lttng_trace_chunk_set_close_command(
- chunk, *close_command);
+ chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
- channel, node.node) {
- int ret;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ int ret;
- /*
- * Only change the channel's chunk to NULL if it still
- * references the chunk being closed. The channel may
- * reference a newer channel in the case of a session
- * rotation. When a session rotation occurs, the "next"
- * chunk is created before the "current" chunk is closed.
- */
- if (channel->trace_chunk != chunk) {
- continue;
- }
- ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
- if (ret) {
/*
- * Attempt to close the chunk on as many channels as
- * possible.
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
*/
- ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
}
}
-
if (relayd_id) {
int ret;
struct consumer_relayd_sock_pair *relayd;
relayd = consumer_find_relayd(*relayd_id);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_close_trace_chunk(
- &relayd->control_sock, chunk,
- path);
+ ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
- ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
- *relayd_id);
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
}
if (!relayd || ret) {
}
}
error_unlock:
- rcu_read_unlock();
end:
/*
* Release the reference returned by the "find" operation and
return ret_code;
}
-enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id)
+enum lttcomm_return_code
+lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
{
int ret;
enum lttcomm_return_code ret_code;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
const bool is_local_trace = !relayd_id;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
+ lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
}
DBG("Consumer trace chunk exists command: relayd_id = %s"
- ", chunk_id = %" PRIu64, relayd_id_str,
- chunk_id);
+ ", chunk_id = %" PRIu64,
+ relayd_id_str,
+ chunk_id);
ret = lttng_trace_chunk_registry_chunk_exists(
- the_consumer_data.chunk_registry, session_id, chunk_id,
- &chunk_exists_local);
+ the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
if (ret) {
/* Internal error. */
ERR("Failed to query the existence of a trace chunk");
ret_code = LTTCOMM_CONSUMERD_FATAL;
goto end;
}
- DBG("Trace chunk %s locally",
- chunk_exists_local ? "exists" : "does not exist");
+ DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
if (chunk_exists_local) {
ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
goto end;
goto end;
}
- rcu_read_lock();
relayd = consumer_find_relayd(*relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, *relayd_id);
}
DBG("Looking up existence of trace chunk on relay daemon");
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
- &chunk_exists_remote);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Failed to look-up the existence of trace chunk on relay daemon");
goto end_rcu_unlock;
}
- ret_code = chunk_exists_remote ?
- LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
- LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
- DBG("Trace chunk %s on relay daemon",
- chunk_exists_remote ? "exists" : "does not exist");
+ ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret_code;
}
-static
-int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
+static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key,
- &iter.iter, stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
/*
* Protect against teardown with mutex.
*/
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return LTTCOMM_CONSUMERD_SUCCESS;
error_unlock:
pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
return ret;
}
return ret;
}
-enum lttcomm_return_code lttng_consumer_open_channel_packets(
- struct lttng_consumer_channel *channel)
+enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
struct lttng_consumer_stream *stream;
enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
goto end;
}
- rcu_read_lock();
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
- enum consumer_stream_open_packet_status status;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ enum consumer_stream_open_packet_status status;
- pthread_mutex_lock(&stream->lock);
- if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
- }
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
- status = consumer_stream_open_packet(stream);
- switch (status) {
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
- DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
- /*
- * Only unexpected internal errors can lead to this
- * failing. Report an unknown error.
- */
- ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64
- ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key, channel->key,
- channel->name, channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
- default:
- abort();
- }
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
- next:
- pthread_mutex_unlock(&stream->lock);
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
}
-
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret;