#include <common/dynamic-array.hpp>
#include <common/index/ctf-index.hpp>
#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/time.hpp>
#include <common/trace-chunk-registry.hpp>
#include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <common/utils.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
+#include <type_traits>
#include <unistd.h>
lttng_consumer_global_data the_consumer_data;
LTTNG_ASSERT(pipe);
- (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+ pointer. */
}
static void notify_health_quit_pipe(int *pipe)
*/
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream, *stmp;
-
LTTNG_ASSERT(channel);
/* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
- /*
- * Once a stream is added to this list, the buffers were created so we
- * have a guarantee that this call will succeed. Setting the monitor
- * mode to 0 so we don't lock nor try to delete the stream from the
- * global hash table.
- */
- stream->monitor = 0;
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
consumer_stream_destroy(stream, nullptr);
}
}
return nullptr;
}
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
}
- rcu_read_unlock();
-
return stream;
}
{
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
*/
stream->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
/*
}
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
channel = lttng::utils::container_of(node, <tng_consumer_channel::node);
}
{
struct lttng_consumer_channel *channel;
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
*/
channel->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
ERR("Unknown consumer_data type");
abort();
}
- free(channel);
+
+ delete channel;
}
/*
if (channel->is_published) {
int ret;
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
LTTNG_ASSERT(!ret);
- rcu_read_unlock();
}
channel->is_deleted = true;
*/
static void cleanup_relayd_ht()
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd;
-
- rcu_read_lock();
-
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
consumer_destroy_relayd(relayd);
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.relayd_ht);
}
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- rcu_read_lock();
-
/* Let's begin with metadata */
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ stream->chan->metadata_pushed_wait_queue.wake_all();
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
/* Follow up by the data streams */
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to data stream %d", stream->wait_fd);
}
}
- rcu_read_unlock();
}
/*
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
the_consumer_data.stream_count++;
the_consumer_data.need_update = 1;
- rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
*/
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
- int ret = 0;
+ const int ret = 0;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
goto end;
}
}
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
stream->net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
- rcu_read_unlock();
}
/*
}
}
- channel = zmalloc<lttng_consumer_channel>();
- if (channel == nullptr) {
- PERROR("malloc struct lttng_consumer_channel");
+ try {
+ channel = new lttng_consumer_channel;
+ } catch (const std::bad_alloc& e) {
+ ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+ channel = nullptr;
goto end;
}
break;
default:
abort();
- free(channel);
+ delete channel;
channel = nullptr;
goto end;
}
CDS_INIT_LIST_HEAD(&channel->streams.head);
if (trace_chunk) {
- int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+ const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
if (ret) {
goto error;
}
*/
steal_channel_key(channel->key);
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
- rcu_read_unlock();
channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
int *nb_inactive_fd)
{
int i = 0;
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(ht);
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
/*
* Only active streams with an active end point can be added to the
* poll set and local stream storage of the thread.
(*nb_inactive_fd)++;
continue;
}
- /*
- * This clobbers way too much the debug output. Uncomment that if you
- * need it for debugging purposes.
- */
+
(*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
local_stream[i] = stream;
i++;
}
- rcu_read_unlock();
/*
* Insert the consumer_data_pipe at the end of the array and don't
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+ enum lttcomm_return_code error_code)
{
if (ctx->consumer_error_socket > 0) {
+ const std::int32_t comm_code = std::int32_t(error_code);
+
+ static_assert(
+ sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+ "Fixed-size communication type too small to accomodate lttcomm_return_code");
return lttcomm_send_unix_sock(
- ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+ ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
}
return 0;
*/
void lttng_consumer_cleanup()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- rcu_read_lock();
-
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
consumer_del_channel(channel);
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.channel_ht);
lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
- int outfd = stream->out_fd;
+ const int outfd = stream->out_fd;
/*
* This does a blocking write-and-wait on any page that belongs to the
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd,
- orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(
- outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
/*
* Ignore return value since we are currently cleaning up so any error
* can't be handled.
*/
(void) consumer_del_stream(stream, ht);
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
*/
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
/*
* Ignore return value since we are currently cleaning up so any error
* can't be handled.
*/
(void) consumer_del_metadata_stream(stream, ht);
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
size_t write_len;
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return ret;
}
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
+ const int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = nullptr;
}
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
/* Handle stream on the relayd if the output is on the network */
if (relayd && stream->metadata_flag) {
- size_t metadata_payload_size =
+ const size_t metadata_payload_size =
sizeof(struct lttcomm_relayd_metadata_payload);
/* Update counter to fit the spliced data */
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return written;
}
/* Go for channel deletion! */
free_channel = true;
}
- stream->chan = nullptr;
/*
* Nullify the stream reference so it is not used after deletion. The
* pointer value.
*/
channel->metadata_stream = nullptr;
+ channel->metadata_pushed_wait_queue.wake_all();
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
* after this point.
*/
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
lttng_ht_lookup(ht, &stream->key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(!node);
/*
*/
lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
- rcu_read_unlock();
-
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
*/
static void validate_endpoint_status_data_stream()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged data stream");
- rcu_read_lock();
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
/* Validate delete flag of the stream */
if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
/* Delete it right now */
consumer_del_stream(stream, data_ht);
}
- rcu_read_unlock();
}
/*
*/
static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged metadata stream");
LTTNG_ASSERT(pollset);
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
/* Validate delete flag of the stream */
if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
/* Delete it right now */
consumer_del_metadata_stream(stream, metadata_ht);
}
- rcu_read_unlock();
}
/*
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream,
- sizeof(stream));
- if (pipe_len < sizeof(stream)) {
+ sizeof(stream)); /* NOLINT sizeof
+ used on a
+ pointer. */
+ if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+ pointer. */
if (pipe_len < 0) {
PERROR("read metadata stream");
}
continue;
}
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
- stream = caa_container_of(node, struct lttng_consumer_stream, node);
+ stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
consumer_del_metadata_stream(stream, metadata_ht);
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the stream looked up */
- rcu_read_unlock();
}
}
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i, err = -1;
+ int num_rdy, high_prio, ret, i, err = -1;
struct pollfd *pollfd = nullptr;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
health_code_update();
high_prio = 0;
- num_hup = 0;
/*
* the fds set has been updated, we need to update our
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- pipe_readlen = lttng_pipe_read(
- ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
- if (pipe_readlen < sizeof(new_stream)) {
+ pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+ &new_stream,
+ sizeof(new_stream)); /* NOLINT sizeof used on
+ a pointer. */
+ if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+ */
PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
}
if (local_stream[i] != nullptr) {
*/
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
}
static void destroy_channel_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
- int ret;
-
if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::wait_fd_node),
+ <tng_consumer_channel::wait_fd_node>(*ht->ht)) {
+ const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
LTTNG_ASSERT(ret != 0);
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
switch (action) {
case CONSUMER_CHANNEL_ADD:
+ {
DBG("Adding channel %d to poll set", chan->wait_fd);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
- rcu_read_unlock();
/* Add channel to the global poll events list */
// FIXME: Empty flag on a pipe pollset, this might
// hang on FreeBSD.
lttng_poll_add(&events, chan->wait_fd, 0);
break;
+ }
case CONSUMER_CHANNEL_DEL:
{
/*
* GET_CHANNEL failed.
*/
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
- rcu_read_unlock();
ERR("UST consumer get channel key %" PRIu64
" not found for del channel",
key);
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
- rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
continue;
}
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(channel_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
- chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
+ chan = lttng::utils::container_of(node,
+ <tng_consumer_channel::wait_fd_node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
}
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the channel looked up */
- rcu_read_unlock();
}
}
*/
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd = nullptr;
-
- ASSERT_RCU_READ_LOCKED();
-
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
*/
if (relayd->sessiond_session_id == id) {
/* Found the relayd. There can be only one per id. */
- goto found;
+ return relayd;
}
}
return nullptr;
-
-found:
- return relayd;
}
/*
int consumer_data_pending(uint64_t id)
{
int ret;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
+ const auto ht = the_consumer_data.stream_list_ht;
struct consumer_relayd_sock_pair *relayd = nullptr;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
- rcu_read_lock();
- pthread_mutex_lock(&the_consumer_data.lock);
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
abort();
}
- /* Ease our life a bit */
- ht = the_consumer_data.stream_list_ht;
-
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
- pthread_mutex_lock(&stream->lock);
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
/*
* A removed node from the hash table indicates that the stream has
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
goto data_pending;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
relayd = find_relayd_by_session_id(id);
if (relayd) {
unsigned int is_data_inflight = 0;
+ const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
/* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
if (ret < 0) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Communication error thus the relayd so no data pending. */
goto data_not_pending;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(
+ *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
}
if (ret == 1) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
} else if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
}
/* Send end command for data pending. */
ret = relayd_end_data_pending(
&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
data_not_pending:
/* Data is available to be read by a viewer. */
- pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 0;
data_pending:
/* Data is still being extracted from buffers. */
- pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 1;
}
uint64_t relayd_id)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
struct lttng_dynamic_array stream_rotation_positions;
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
bool rotating_to_new_chunk = true;
/* Array of `struct lttng_consumer_stream *` */
struct lttng_dynamic_pointer_array streams_packet_to_open;
- size_t stream_idx;
ASSERT_RCU_READ_LOCKED();
nullptr);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- rcu_read_lock();
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
- pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
unsigned long produced_pos = 0, consumed_pos = 0;
health_code_update();
/*
* Lock stream because we are about to change its state.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (stream->trace_chunk == stream->chan->trace_chunk) {
rotating_to_new_chunk = false;
ret = sample_stream_positions(
stream, &produced_pos, &consumed_pos);
if (ret) {
- goto end_unlock_stream;
+ goto end;
}
/*
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);
- goto end_unlock_stream;
+ goto end;
}
}
ret = lttng_consumer_take_snapshot(stream);
if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
ERR("Failed to sample snapshot position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
if (!ret) {
ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Failed to sample produced position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Failed to sample consumed position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
}
/*
ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
stream->key);
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
stream->rotate_position = stream->last_sequence_number + 1 +
((produced_pos - consumed_pos) / stream->max_sb_size);
&position);
if (ret) {
ERR("Failed to allocate stream rotation position");
- goto end_unlock_stream;
+ goto end;
}
stream_count++;
}
if (ret) {
PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
- stream = nullptr;
if (!is_local_trace) {
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, relayd_id);
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- goto end_unlock_channel;
+ goto end;
}
}
- for (stream_idx = 0;
+ for (std::size_t stream_idx = 0;
stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
stream_idx++) {
enum consumer_stream_open_packet_status status;
-
- stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+ auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
&streams_packet_to_open, stream_idx);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
status = consumer_stream_open_packet(stream);
- pthread_mutex_unlock(&stream->lock);
switch (status) {
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet after a rotation: stream id = %" PRIu64
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/* Logged by callee. */
ret = -1;
- goto end_unlock_channel;
+ goto end;
default:
abort();
}
}
- pthread_mutex_unlock(&channel->lock);
ret = 0;
- goto end;
-
-end_unlock_stream:
- pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
- pthread_mutex_unlock(&channel->lock);
end:
- rcu_read_unlock();
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
return ret;
static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
- int ret;
- struct lttng_consumer_stream *stream;
+ const lttng::urcu::read_lock_guard read_lock;
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
- rcu_read_lock();
- pthread_mutex_lock(&channel->lock);
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
- pthread_mutex_lock(&stream->lock);
- ret = consumer_clear_stream(stream);
+
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
+ const auto ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- pthread_mutex_unlock(&stream->lock);
}
- pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
- return 0;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
- return ret;
+ return 0;
}
/*
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
-
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
- pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (!stream->rotate_ready) {
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
continue;
}
- DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
ret = lttng_consumer_rotate_stream(stream);
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
goto end;
}
ret = 0;
end:
- rcu_read_unlock();
return ret;
}
char creation_timestamp_buffer[ISO8601_STR_LEN];
const char *relayd_id_str = "(none)";
const char *creation_timestamp_str;
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
if (relayd_id) {
/* Only used for logging purposes. */
goto error;
}
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id,
- &iter.iter,
- channel,
- channels_by_session_id_ht_node.node)
- {
+ for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_channel,
+ decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+ <tng_consumer_channel::channels_by_session_id_ht_node,
+ std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+ &session_id,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct)) {
ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
if (ret) {
/*
}
}
error_unlock:
- rcu_read_unlock();
error:
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
const char *close_command_name = "none";
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
enum lttng_trace_chunk_status chunk_status;
if (relayd_id) {
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
int ret;
/*
}
}
error_unlock:
- rcu_read_unlock();
end:
/*
* Release the reference returned by the "find" operation and
const bool is_local_trace = !relayd_id;
struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
+ const lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
goto end;
}
- rcu_read_lock();
relayd = consumer_find_relayd(*relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, *relayd_id);
DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret_code;
}
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
int ret;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
+ continue;
}
+
ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- next:
- pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
- return LTTCOMM_CONSUMERD_SUCCESS;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
- return ret;
+ return LTTCOMM_CONSUMERD_SUCCESS;
}
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
- struct lttng_consumer_stream *stream;
enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
if (channel->metadata_stream) {
ERR("Open channel packets command attempted on a metadata channel");
- ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
- goto end;
+ return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
}
- rcu_read_lock();
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ const lttng::urcu::read_lock_guard read_lock;
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
enum consumer_stream_open_packet_status status;
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
+ continue;
}
status = consumer_stream_open_packet(stream);
channel->key,
channel->name,
channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
+ return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
default:
abort();
}
-
- next:
- pthread_mutex_unlock(&stream->lock);
}
-end_rcu_unlock:
- rcu_read_unlock();
-end:
return ret;
-
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- goto end_rcu_unlock;
}
void lttng_consumer_sigbus_handle(void *addr)