#include <common/common.hpp>
#include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
#include <common/consumer/consumer-metadata-cache.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-timer.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
+#include <common/urcu.hpp>
#include <common/utils.hpp>
#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
- if (ctx->on_recv_channel != NULL) {
+ if (ctx->on_recv_channel != nullptr) {
ret = ctx->on_recv_channel(channel);
if (ret == 0) {
ret = consumer_add_channel(channel, ctx);
int *_alloc_ret)
{
int alloc_ret;
- struct lttng_consumer_stream *stream = NULL;
+ struct lttng_consumer_stream *stream = nullptr;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
&alloc_ret,
channel->type,
channel->monitor);
- if (stream == NULL) {
+ if (stream == nullptr) {
switch (alloc_ret) {
case -ENOENT:
/*
stream->globally_visible = 1;
cds_list_del_init(&stream->send_node);
- ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a
+ pointer. */
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
int ret, cpu = 0;
struct lttng_ust_ctl_consumer_stream *ustream;
struct lttng_consumer_stream *stream;
- pthread_mutex_t *current_stream_lock = NULL;
+ pthread_mutex_t *current_stream_lock = nullptr;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
}
}
pthread_mutex_unlock(&stream->lock);
- current_stream_lock = NULL;
+ current_stream_lock = nullptr;
}
return 0;
}
/* Tell sessiond there is no more stream. */
- ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL);
+ ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr);
if (ret < 0) {
goto error;
}
DBG("UST consumer flush channel key %" PRIu64, chan_key);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
*/
sample_and_send_channel_buffer_stats(channel);
error:
- rcu_read_unlock();
return ret;
}
DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
pthread_mutex_unlock(&stream->lock);
}
error:
- rcu_read_unlock();
return ret;
}
* the stream is still in the local stream list of the channel. This call
* will make sure to clean that list.
*/
- consumer_stream_destroy(metadata->metadata_stream, NULL);
- metadata->metadata_stream = NULL;
+ consumer_stream_destroy(metadata->metadata_stream, nullptr);
+ metadata->metadata_stream = nullptr;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(!metadata_channel->monitor);
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
if (ret < 0) {
goto error;
}
* Clean up the stream completely because the next snapshot will use a
* new metadata stream.
*/
- consumer_stream_destroy(metadata_stream, NULL);
- metadata_channel->metadata_stream = NULL;
+ consumer_stream_destroy(metadata_stream, nullptr);
+ metadata_channel->metadata_stream = nullptr;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(ctx);
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
if (relayd_id != (uint64_t) -1ULL) {
use_relayd = 1;
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return 0;
error_put_subbuf:
consumer_stream_close_output(stream);
error_unlock:
pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
return ret;
}
uint64_t len,
uint64_t version,
struct lttng_consumer_channel *channel,
- int timer,
+ bool invoked_by_timer,
int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
* channel is under a snapshot session type. No need to update
* the stream position in that scenario.
*/
- if (channel->metadata_stream != NULL) {
+ if (channel->metadata_stream != nullptr) {
pthread_mutex_lock(&channel->metadata_stream->lock);
metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
pthread_mutex_unlock(&channel->metadata_stream->lock);
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
- health_code_update();
-
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
int ret_func;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttcomm_consumer_msg msg;
- struct lttng_consumer_channel *channel = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
health_code_update();
health_code_update();
/* relayd needs RCU read-side lock */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(index);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
DBG("Unable to find relayd %" PRIu64, index);
ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
- rcu_read_unlock();
return -ENOSYS;
}
case LTTNG_CONSUMER_DATA_PENDING:
channel = consumer_allocate_channel(
msg.u.ask_channel.key,
msg.u.ask_channel.session_id,
- msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+ msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr,
msg.u.ask_channel.pathname,
msg.u.ask_channel.name,
msg.u.ask_channel.relayd_id,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(
- sock, key, offset, len, version, found_channel, 0, 1);
+ sock, key, offset, len, version, found_channel, false, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
uint64_t key = msg.u.discarded_events.channel_key;
DBG("UST consumer discarded events command for session id %" PRIu64, id);
- rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer discarded events command for session id %" PRIu64
", channel key %" PRIu64,
uint64_t key = msg.u.lost_packets.channel_key;
DBG("UST consumer lost packets command for session id %" PRIu64, id);
- rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer lost packets command for session id %" PRIu64
", channel key %" PRIu64,
const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
msg.u.create_trace_chunk.override_name :
- NULL;
- struct lttng_directory_handle *chunk_directory_handle = NULL;
+ nullptr;
+ struct lttng_directory_handle *chunk_directory_handle = nullptr;
/*
* The session daemon will only provide a chunk directory file
}
ret_code = lttng_consumer_create_trace_chunk(
- !is_local_trace ? &relayd_id : NULL,
+ !is_local_trace ? &relayd_id : nullptr,
msg.u.create_trace_chunk.session_id,
msg.u.create_trace_chunk.chunk_id,
(time_t) msg.u.create_trace_chunk.creation_timestamp,
chunk_override_name,
- msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
+ msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr,
chunk_directory_handle);
lttng_directory_handle_put(chunk_directory_handle);
goto end_msg_sessiond;
int ret;
ret_code = lttng_consumer_close_trace_chunk(
- msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr,
msg.u.close_trace_chunk.session_id,
msg.u.close_trace_chunk.chunk_id,
(time_t) msg.u.close_trace_chunk.close_timestamp,
- msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
+ msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr,
closed_trace_chunk_path);
reply.ret_code = ret_code;
reply.path_length = strlen(closed_trace_chunk_path) + 1;
const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
ret_code = lttng_consumer_trace_chunk_exists(
- msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr,
msg.u.trace_chunk_exists.session_id,
msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
{
int ret_send_status;
- ret_send_status = consumer_send_status_channel(sock, NULL);
+ ret_send_status = consumer_send_status_channel(sock, nullptr);
if (ret_send_status < 0) {
/* Stop everything if session daemon can not be notified. */
goto error_fatal;
goto end;
end:
- rcu_read_unlock();
health_code_update();
return ret_func;
}
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
ret = write_len;
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
subbuffer->buffer.buffer =
lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
- LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL);
+ LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr);
end:
return ret;
}
DBG("UST consumer closing all metadata streams");
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- health_code_update();
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ health_code_update();
- pthread_mutex_lock(&stream->chan->lock);
- lttng_ustconsumer_close_metadata(stream->chan);
- pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ lttng_ustconsumer_close_metadata(stream->chan);
+ pthread_mutex_unlock(&stream->chan->lock);
+ }
}
- rcu_read_unlock();
}
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *channel,
- int timer,
+ bool invoked_by_timer,
int wait)
{
struct lttcomm_metadata_request_msg request;
health_code_update();
- ret = lttng_ustconsumer_recv_metadata(
- ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+ ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+ key,
+ offset,
+ len,
+ version,
+ channel,
+ invoked_by_timer,
+ wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive