#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
+#include <type_traits>
#include <unistd.h>
lttng_consumer_global_data the_consumer_data;
*/
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream, *stmp;
-
LTTNG_ASSERT(channel);
/* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
- /*
- * Once a stream is added to this list, the buffers were created so we
- * have a guarantee that this call will succeed. Setting the monitor
- * mode to 0 so we don't lock nor try to delete the stream from the
- * global hash table.
- */
- stream->monitor = 0;
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
consumer_stream_destroy(stream, nullptr);
}
}
return nullptr;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
}
{
struct lttng_consumer_stream *stream;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
}
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
channel = lttng::utils::container_of(node, <tng_consumer_channel::node);
}
{
struct lttng_consumer_channel *channel;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
ERR("Unknown consumer_data type");
abort();
}
- free(channel);
+
+ delete channel;
}
/*
if (channel->is_published) {
int ret;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
*/
static void cleanup_relayd_ht()
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd;
-
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
- consumer_destroy_relayd(relayd);
- }
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
+ consumer_destroy_relayd(relayd);
}
lttng_ht_destroy(the_consumer_data.relayd_ht);
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- lttng::urcu::read_lock_guard read_lock;
-
/* Let's begin with metadata */
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ stream->chan->metadata_pushed_wait_queue.wake_all();
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
/* Follow up by the data streams */
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to data stream %d", stream->wait_fd);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
*/
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
- int ret = 0;
+ const int ret = 0;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
goto end;
}
}
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
}
- channel = zmalloc<lttng_consumer_channel>();
- if (channel == nullptr) {
- PERROR("malloc struct lttng_consumer_channel");
+ try {
+ channel = new lttng_consumer_channel;
+ } catch (const std::bad_alloc& e) {
+ ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+ channel = nullptr;
goto end;
}
break;
default:
abort();
- free(channel);
+ delete channel;
channel = nullptr;
goto end;
}
CDS_INIT_LIST_HEAD(&channel->streams.head);
if (trace_chunk) {
- int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+ const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
if (ret) {
goto error;
}
*/
steal_channel_key(channel->key);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
int *nb_inactive_fd)
{
int i = 0;
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(ht);
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Only active streams with an active end point can be added to the
- * poll set and local stream storage of the thread.
- *
- * There is a potential race here for endpoint_status to be updated
- * just after the check. However, this is OK since the stream(s) will
- * be deleted once the thread is notified that the end point state has
- * changed where this function will be called back again.
- *
- * We track the number of inactive FDs because they still need to be
- * closed by the polling thread after a wakeup on the data_pipe or
- * metadata_pipe.
- */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
- (*nb_inactive_fd)++;
- continue;
- }
-
- (*pollfd)[i].fd = stream->wait_fd;
- (*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = stream;
- i++;
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
+ */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
+ continue;
}
+
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
}
/*
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+ enum lttcomm_return_code error_code)
{
if (ctx->consumer_error_socket > 0) {
+ const std::int32_t comm_code = std::int32_t(error_code);
+
+ static_assert(
+ sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+ "Fixed-size communication type too small to accomodate lttcomm_return_code");
return lttcomm_send_unix_sock(
- ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+ ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
}
return 0;
*/
void lttng_consumer_cleanup()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- consumer_del_channel(channel);
- }
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
+ consumer_del_channel(channel);
}
lttng_ht_destroy(the_consumer_data.channel_ht);
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int outfd = stream->out_fd;
+ const int outfd = stream->out_fd;
/*
* This does a blocking write-and-wait on any page that belongs to the
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
}
lttng_ht_destroy(ht);
*/
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
}
lttng_ht_destroy(ht);
size_t write_len;
/* RCU lock for the relayd pointer */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
+ const int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = nullptr;
}
/* RCU lock for the relayd pointer */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
/* Handle stream on the relayd if the output is on the network */
if (relayd && stream->metadata_flag) {
- size_t metadata_payload_size =
+ const size_t metadata_payload_size =
sizeof(struct lttcomm_relayd_metadata_payload);
/* Update counter to fit the spliced data */
/* Go for channel deletion! */
free_channel = true;
}
- stream->chan = nullptr;
/*
* Nullify the stream reference so it is not used after deletion. The
* pointer value.
*/
channel->metadata_stream = nullptr;
+ channel->metadata_pushed_wait_queue.wake_all();
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
* after this point.
*/
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
lttng_ht_lookup(ht, &stream->key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(!node);
/*
*/
static void validate_endpoint_status_data_stream()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged data stream");
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /* Delete it right now */
- consumer_del_stream(stream, data_ht);
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
}
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
}
}
*/
static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged metadata stream");
LTTNG_ASSERT(pollset);
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /*
- * Remove from pollset so the metadata thread can continue without
- * blocking on a deleted stream.
- */
- lttng_poll_del(pollset, stream->wait_fd);
-
- /* Delete it right now */
- consumer_del_metadata_stream(stream, metadata_ht);
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
}
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
+
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
}
}
continue;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
- stream = caa_container_of(node, struct lttng_consumer_stream, node);
+ stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
*/
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
static void destroy_channel_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
- int ret;
-
if (ht == nullptr) {
return;
}
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
- LTTNG_ASSERT(ret != 0);
- }
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::wait_fd_node),
+ <tng_consumer_channel::wait_fd_node>(*ht->ht)) {
+ const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
+ LTTNG_ASSERT(ret != 0);
}
lttng_ht_destroy(ht);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
/* Add channel to the global poll events list */
* GET_CHANNEL failed.
*/
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
ERR("UST consumer get channel key %" PRIu64
continue;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(channel_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
- chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
+ chan = lttng::utils::container_of(node,
+ <tng_consumer_channel::wait_fd_node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
*/
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd = nullptr;
-
- ASSERT_RCU_READ_LOCKED();
-
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
*/
if (relayd->sessiond_session_id == id) {
/* Found the relayd. There can be only one per id. */
- goto found;
+ return relayd;
}
}
return nullptr;
-
-found:
- return relayd;
}
/*
int consumer_data_pending(uint64_t id)
{
int ret;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
+ const auto ht = the_consumer_data.stream_list_ht;
struct consumer_relayd_sock_pair *relayd = nullptr;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
- lttng::urcu::read_lock_guard read_lock;
- pthread_mutex_lock(&the_consumer_data.lock);
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
abort();
}
- /* Ease our life a bit */
- ht = the_consumer_data.stream_list_ht;
-
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
- pthread_mutex_lock(&stream->lock);
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
/*
* A removed node from the hash table indicates that the stream has
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
goto data_pending;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
relayd = find_relayd_by_session_id(id);
if (relayd) {
unsigned int is_data_inflight = 0;
+ const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
/* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
if (ret < 0) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Communication error thus the relayd so no data pending. */
goto data_not_pending;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(
+ *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
}
if (ret == 1) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
} else if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
}
/* Send end command for data pending. */
ret = relayd_end_data_pending(
&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
data_not_pending:
/* Data is available to be read by a viewer. */
- pthread_mutex_unlock(&the_consumer_data.lock);
return 0;
data_pending:
/* Data is still being extracted from buffers. */
- pthread_mutex_unlock(&the_consumer_data.lock);
return 1;
}
uint64_t relayd_id)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
struct lttng_dynamic_array stream_rotation_positions;
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
bool rotating_to_new_chunk = true;
/* Array of `struct lttng_consumer_stream *` */
struct lttng_dynamic_pointer_array streams_packet_to_open;
- size_t stream_idx;
ASSERT_RCU_READ_LOCKED();
nullptr);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
- pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
unsigned long produced_pos = 0, consumed_pos = 0;
health_code_update();
/*
* Lock stream because we are about to change its state.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (stream->trace_chunk == stream->chan->trace_chunk) {
rotating_to_new_chunk = false;
ret = sample_stream_positions(
stream, &produced_pos, &consumed_pos);
if (ret) {
- goto end_unlock_stream;
+ goto end;
}
/*
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);
- goto end_unlock_stream;
+ goto end;
}
}
ret = lttng_consumer_take_snapshot(stream);
if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
ERR("Failed to sample snapshot position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
if (!ret) {
ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Failed to sample produced position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Failed to sample consumed position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
}
/*
ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
stream->key);
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
stream->rotate_position = stream->last_sequence_number + 1 +
((produced_pos - consumed_pos) / stream->max_sb_size);
&position);
if (ret) {
ERR("Failed to allocate stream rotation position");
- goto end_unlock_stream;
+ goto end;
}
stream_count++;
}
if (ret) {
PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
- stream = nullptr;
if (!is_local_trace) {
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, relayd_id);
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- goto end_unlock_channel;
+ goto end;
}
}
- for (stream_idx = 0;
+ for (std::size_t stream_idx = 0;
stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
stream_idx++) {
enum consumer_stream_open_packet_status status;
-
- stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+ auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
&streams_packet_to_open, stream_idx);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
status = consumer_stream_open_packet(stream);
- pthread_mutex_unlock(&stream->lock);
switch (status) {
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet after a rotation: stream id = %" PRIu64
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/* Logged by callee. */
ret = -1;
- goto end_unlock_channel;
+ goto end;
default:
abort();
}
}
- pthread_mutex_unlock(&channel->lock);
ret = 0;
- goto end;
-
-end_unlock_stream:
- pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
- pthread_mutex_unlock(&channel->lock);
end:
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
- int ret;
- struct lttng_consumer_stream *stream;
+ const lttng::urcu::read_lock_guard read_lock;
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
- lttng::urcu::read_lock_guard read_lock;
- pthread_mutex_lock(&channel->lock);
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
- pthread_mutex_lock(&stream->lock);
- ret = consumer_clear_stream(stream);
+
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
+ const auto ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- pthread_mutex_unlock(&stream->lock);
}
- pthread_mutex_unlock(&channel->lock);
- return 0;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&channel->lock);
- return ret;
+ return 0;
}
/*
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
ASSERT_RCU_READ_LOCKED();
- lttng::urcu::read_lock_guard read_lock;
-
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
- pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (!stream->rotate_ready) {
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
continue;
}
- DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
ret = lttng_consumer_rotate_stream(stream);
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
goto end;
}
char creation_timestamp_buffer[ISO8601_STR_LEN];
const char *relayd_id_str = "(none)";
const char *creation_timestamp_str;
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
if (relayd_id) {
/* Only used for logging purposes. */
goto error;
}
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
- lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id,
- &iter.iter,
- channel,
- channels_by_session_id_ht_node.node)
- {
- ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
- if (ret) {
- /*
- * Roll-back the creation of this chunk.
- *
- * This is important since the session daemon will
- * assume that the creation of this chunk failed and
- * will never ask for it to be closed, resulting
- * in a leak and an inconsistent state for some
- * channels.
- */
- enum lttcomm_return_code close_ret;
- char path[LTTNG_PATH_MAX];
-
- DBG("Failed to set new trace chunk on existing channels, rolling back");
- close_ret =
- lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- nullptr,
- path);
- if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
- }
+ for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_channel,
+ decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+ <tng_consumer_channel::channels_by_session_id_ht_node,
+ std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+ &session_id,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct)) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
- ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
- break;
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
}
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
}
}
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
const char *close_command_name = "none";
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
enum lttng_trace_chunk_status chunk_status;
if (relayd_id) {
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (
- the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- int ret;
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
+ int ret;
+ /*
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
+ */
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+ if (ret) {
/*
- * Only change the channel's chunk to NULL if it still
- * references the chunk being closed. The channel may
- * reference a newer channel in the case of a session
- * rotation. When a session rotation occurs, the "next"
- * chunk is created before the "current" chunk is closed.
+ * Attempt to close the chunk on as many channels as
+ * possible.
*/
- if (channel->trace_chunk != chunk) {
- continue;
- }
- ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
- if (ret) {
- /*
- * Attempt to close the chunk on as many channels as
- * possible.
- */
- ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
- }
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
}
}
+
if (relayd_id) {
int ret;
struct consumer_relayd_sock_pair *relayd;
const bool is_local_trace = !relayd_id;
struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
int ret;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
+ continue;
}
+
ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- next:
- pthread_mutex_unlock(&stream->lock);
}
- return LTTCOMM_CONSUMERD_SUCCESS;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- return ret;
+ return LTTCOMM_CONSUMERD_SUCCESS;
}
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream;
enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
if (channel->metadata_stream) {
ERR("Open channel packets command attempted on a metadata channel");
- ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
- goto end;
+ return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
}
- {
- lttng::urcu::read_lock_guard read_lock;
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
- enum consumer_stream_open_packet_status status;
-
- pthread_mutex_lock(&stream->lock);
- if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
- }
+ const lttng::urcu::read_lock_guard read_lock;
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
+ enum consumer_stream_open_packet_status status;
- status = consumer_stream_open_packet(stream);
- switch (status) {
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
- DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
- /*
- * Only unexpected internal errors can lead to this
- * failing. Report an unknown error.
- */
- ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64 ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key,
- channel->key,
- channel->name,
- channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
- default:
- abort();
- }
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ continue;
+ }
- next:
- pthread_mutex_unlock(&stream->lock);
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
+ return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ default:
+ abort();
}
}
-end_rcu_unlock:
-end:
- return ret;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- goto end_rcu_unlock;
+ return ret;
}
void lttng_consumer_sigbus_handle(void *addr)