#include <common/dynamic-array.hpp>
#include <common/index/ctf-index.hpp>
#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/relayd/relayd.hpp>
#include <common/time.hpp>
#include <common/trace-chunk-registry.hpp>
#include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <common/utils.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
LTTNG_ASSERT(pipe);
- (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+ pointer. */
}
static void notify_health_quit_pipe(int *pipe)
return nullptr;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
}
- rcu_read_unlock();
-
return stream;
}
{
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
*/
stream->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
/*
{
struct lttng_consumer_channel *channel;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
*/
channel->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
if (channel->is_published) {
int ret;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
LTTNG_ASSERT(!ret);
- rcu_read_unlock();
}
channel->is_deleted = true;
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
- consumer_destroy_relayd(relayd);
+ cds_lfht_for_each_entry (
+ the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ consumer_destroy_relayd(relayd);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.relayd_ht);
}
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Let's begin with metadata */
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
DBG("Delete flag set to data stream %d", stream->wait_fd);
}
}
- rcu_read_unlock();
}
/*
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
the_consumer_data.stream_count++;
the_consumer_data.need_update = 1;
- rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
stream->net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
- rcu_read_unlock();
}
/*
*/
steal_channel_key(channel->key);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
- rcu_read_unlock();
channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Only active streams with an active end point can be added to the
- * poll set and local stream storage of the thread.
- *
- * There is a potential race here for endpoint_status to be updated
- * just after the check. However, this is OK since the stream(s) will
- * be deleted once the thread is notified that the end point state has
- * changed where this function will be called back again.
- *
- * We track the number of inactive FDs because they still need to be
- * closed by the polling thread after a wakeup on the data_pipe or
- * metadata_pipe.
- */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
- (*nb_inactive_fd)++;
- continue;
+
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
+ */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
+ continue;
+ }
+
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
}
- /*
- * This clobbers way too much the debug output. Uncomment that if you
- * need it for debugging purposes.
- */
- (*pollfd)[i].fd = stream->wait_fd;
- (*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = stream;
- i++;
}
- rcu_read_unlock();
/*
* Insert the consumer_data_pipe at the end of the array and don't
struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- consumer_del_channel(channel);
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ consumer_del_channel(channel);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.channel_ht);
lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
int outfd = stream->out_fd;
/*
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd,
- orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(
- outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
size_t write_len;
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return ret;
}
}
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return written;
}
* after this point.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
*/
lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
- rcu_read_unlock();
-
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
DBG("Consumer delete flagged data stream");
- rcu_read_lock();
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
}
- /* Delete it right now */
- consumer_del_stream(stream, data_ht);
}
- rcu_read_unlock();
}
/*
LTTNG_ASSERT(pollset);
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /*
- * Remove from pollset so the metadata thread can continue without
- * blocking on a deleted stream.
- */
- lttng_poll_del(pollset, stream->wait_fd);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
- /* Delete it right now */
- consumer_del_metadata_stream(stream, metadata_ht);
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
+ }
}
- rcu_read_unlock();
}
/*
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream,
- sizeof(stream));
- if (pipe_len < sizeof(stream)) {
+ sizeof(stream)); /* NOLINT sizeof
+ used on a
+ pointer. */
+ if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+ pointer. */
if (pipe_len < 0) {
PERROR("read metadata stream");
}
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
consumer_del_metadata_stream(stream, metadata_ht);
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the stream looked up */
- rcu_read_unlock();
}
}
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i, err = -1;
+ int num_rdy, high_prio, ret, i, err = -1;
struct pollfd *pollfd = nullptr;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
health_code_update();
high_prio = 0;
- num_hup = 0;
/*
* the fds set has been updated, we need to update our
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- pipe_readlen = lttng_pipe_read(
- ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
- if (pipe_readlen < sizeof(new_stream)) {
+ pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+ &new_stream,
+ sizeof(new_stream)); /* NOLINT sizeof used on
+ a pointer. */
+ if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+ */
PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
}
if (local_stream[i] != nullptr) {
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
}
static void destroy_channel_ht(struct lttng_ht *ht)
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
- LTTNG_ASSERT(ret != 0);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ LTTNG_ASSERT(ret != 0);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
switch (action) {
case CONSUMER_CHANNEL_ADD:
+ {
DBG("Adding channel %d to poll set", chan->wait_fd);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
- rcu_read_unlock();
/* Add channel to the global poll events list */
// FIXME: Empty flag on a pipe pollset, this might
// hang on FreeBSD.
lttng_poll_add(&events, chan->wait_fd, 0);
break;
+ }
case CONSUMER_CHANNEL_DEL:
{
/*
* GET_CHANNEL failed.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
- rcu_read_unlock();
ERR("UST consumer get channel key %" PRIu64
" not found for del channel",
key);
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
- rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
}
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the channel looked up */
- rcu_read_unlock();
}
}
DBG("Consumer data pending command on session id %" PRIu64, id);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&the_consumer_data.lock);
switch (the_consumer_data.type) {
data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 0;
data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 1;
}
nullptr);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
end_unlock_channel:
pthread_mutex_unlock(&channel->lock);
end:
- rcu_read_unlock();
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
return ret;
int ret;
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
pthread_mutex_unlock(&stream->lock);
}
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return 0;
error_unlock:
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return ret;
}
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
ret = 0;
end:
- rcu_read_unlock();
return ret;
}
goto error;
}
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id,
- &iter.iter,
- channel,
- channels_by_session_id_ht_node.node)
{
- ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
- if (ret) {
- /*
- * Roll-back the creation of this chunk.
- *
- * This is important since the session daemon will
- * assume that the creation of this chunk failed and
- * will never ask for it to be closed, resulting
- * in a leak and an inconsistent state for some
- * channels.
- */
- enum lttcomm_return_code close_ret;
- char path[LTTNG_PATH_MAX];
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry_duplicate(
+ the_consumer_data.channels_by_session_id_ht->ht,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+ lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id,
+ &iter.iter,
+ channel,
+ channels_by_session_id_ht_node.node)
+ {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret =
+ lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
- DBG("Failed to set new trace chunk on existing channels, rolling back");
- close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- nullptr,
- path);
- if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
}
-
- ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
- break;
}
}
}
}
error_unlock:
- rcu_read_unlock();
error:
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- int ret;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ int ret;
- /*
- * Only change the channel's chunk to NULL if it still
- * references the chunk being closed. The channel may
- * reference a newer channel in the case of a session
- * rotation. When a session rotation occurs, the "next"
- * chunk is created before the "current" chunk is closed.
- */
- if (channel->trace_chunk != chunk) {
- continue;
- }
- ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
- if (ret) {
/*
- * Attempt to close the chunk on as many channels as
- * possible.
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
*/
- ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
}
}
-
if (relayd_id) {
int ret;
struct consumer_relayd_sock_pair *relayd;
}
}
error_unlock:
- rcu_read_unlock();
end:
/*
* Release the reference returned by the "find" operation and
const bool is_local_trace = !relayd_id;
struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
+ lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
goto end;
}
- rcu_read_lock();
relayd = consumer_find_relayd(*relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, *relayd_id);
DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret_code;
}
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return LTTCOMM_CONSUMERD_SUCCESS;
error_unlock:
pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
return ret;
}
goto end;
}
- rcu_read_lock();
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
- enum consumer_stream_open_packet_status status;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ enum consumer_stream_open_packet_status status;
- pthread_mutex_lock(&stream->lock);
- if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
- }
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
- status = consumer_stream_open_packet(stream);
- switch (status) {
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
- DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
- /*
- * Only unexpected internal errors can lead to this
- * failing. Report an unknown error.
- */
- ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64 ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key,
- channel->key,
- channel->name,
- channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
- default:
- abort();
- }
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
- next:
- pthread_mutex_unlock(&stream->lock);
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
}
-
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret;