#include <common/common.hpp>
#include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
#include <common/consumer/consumer-metadata-cache.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-timer.hpp>
#include <common/consumer/consumer.hpp>
#include <common/index/index.hpp>
#include <common/optional.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
+#include <common/urcu.hpp>
#include <common/utils.hpp>
#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
int ret = 0;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(ctx);
if (ctx->on_recv_channel != nullptr) {
struct lttng_consumer_stream *stream = nullptr;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(ctx);
stream = consumer_stream_create(channel,
pthread_mutex_t *current_stream_lock = nullptr;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(ctx);
/*
struct lttng_ust_ctl_consumer_channel *ust_channel;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(attr);
LTTNG_ASSERT(ust_chanp);
LTTNG_ASSERT(channel->buffer_credentials.is_set);
int *relayd_error)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct lttng_consumer_stream *stream;
uint64_t net_seq_idx = -1ULL;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(sock >= 0);
DBG("UST consumer sending channel %s to sessiond", channel->name);
if (channel->relayd_id != (uint64_t) -1ULL) {
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream :
+ lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Try to send the stream to the relayd if one is available. */
}
/* The channel was sent successfully to the sessiond at this point. */
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Send stream to session daemon. */
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(attr);
/*
struct lttng_consumer_local_data *ctx)
{
int ret = 0;
- struct lttng_consumer_stream *stream, *stmp;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(ctx);
/* Send streams to the corresponding thread. */
- cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Sending the stream to the thread. */
{
int ret = 0;
struct lttng_consumer_channel *channel;
- struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
- struct lttng_ht_iter iter;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
DBG("UST consumer flush channel key %" PRIu64, chan_key);
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
goto error;
}
- ht = the_consumer_data.stream_per_chan_id_ht;
-
/* For each stream of the channel id, flush it. */
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
pthread_mutex_lock(&stream->lock);
*/
sample_and_send_channel_buffer_stats(channel);
error:
- rcu_read_unlock();
return ret;
}
*/
static int clear_quiescent_channel(uint64_t chan_key)
{
- int ret = 0;
- struct lttng_consumer_channel *channel;
- struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
- struct lttng_ht_iter iter;
-
DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
- rcu_read_lock();
- channel = consumer_find_channel(chan_key);
+ const lttng::urcu::read_lock_guard read_lock;
+ auto channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
- ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
- goto error;
+ return LTTNG_ERR_UST_CHAN_NOT_FOUND;
}
- ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
/* For each stream of the channel id, clear quiescent state. */
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
stream->quiescent = false;
- pthread_mutex_unlock(&stream->lock);
}
-error:
- rcu_read_unlock();
- return ret;
+
+ return 0;
}
/*
*/
consumer_stream_destroy(metadata->metadata_stream, nullptr);
metadata->metadata_stream = nullptr;
+ metadata->metadata_pushed_wait_queue.wake_all();
+
send_streams_error:
error_no_stream:
end:
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(!metadata_channel->monitor);
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
if (ret < 0) {
goto error;
}
*/
consumer_stream_destroy(metadata_stream, nullptr);
metadata_channel->metadata_stream = nullptr;
+ metadata_channel->metadata_pushed_wait_queue.wake_all();
error:
- rcu_read_unlock();
return ret;
}
int ret;
unsigned use_relayd = 0;
unsigned long consumed_pos, produced_pos;
- struct lttng_consumer_stream *stream;
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
if (relayd_id != (uint64_t) -1ULL) {
use_relayd = 1;
LTTNG_ASSERT(!channel->monitor);
DBG("UST consumer snapshot channel %" PRIu64, key);
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Lock stream because we are about to change its state. */
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
LTTNG_ASSERT(channel->trace_chunk);
if (!lttng_trace_chunk_get(channel->trace_chunk)) {
/*
* holds a reference to the trace chunk.
*/
ERR("Failed to acquire reference to channel's trace chunk");
- ret = -1;
- goto error_unlock;
+ return -1;
}
LTTNG_ASSERT(!stream->trace_chunk);
stream->trace_chunk = channel->trace_chunk;
stream->net_seq_idx = relayd_id;
+ /* Close stream output when were are done. */
+ const auto close_stream_output = lttng::make_scope_exit(
+ [stream]() noexcept { consumer_stream_close_output(stream); });
+
if (use_relayd) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
- goto error_close_stream;
+ return ret;
}
} else {
ret = consumer_stream_create_output_files(stream, false);
if (ret < 0) {
- goto error_close_stream;
+ return ret;
}
+
DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
}
", channel name = '%s'",
channel->key,
channel->name);
- goto error_unlock;
+ return ret;
}
}
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking UST snapshot");
- goto error_close_stream;
+ return ret;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced UST snapshot position");
- goto error_close_stream;
+ return ret;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd UST snapshot position");
- goto error_close_stream;
+ return ret;
}
/*
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("lttng_ust_ctl_get_subbuf snapshot");
- goto error_close_stream;
+ return ret;
}
+
DBG("UST consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
stream->chan->lost_packets++;
continue;
}
+ /* Put the subbuffer once we are done. */
+ const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+ if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
+ ERR("Snapshot lttng_ust_ctl_put_subbuf");
+ }
+ });
+
ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
if (ret < 0) {
ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
- goto error_put_subbuf;
+ return ret;
}
ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
if (ret < 0) {
ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
- goto error_put_subbuf;
+ return ret;
}
ret = get_current_subbuf_addr(stream, &subbuf_addr);
if (ret) {
- goto error_put_subbuf;
+ return ret;
}
subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
stream, &subbuf_view, padded_len - len);
if (use_relayd) {
if (read_len != len) {
- ret = -EPERM;
- goto error_put_subbuf;
+ return -EPERM;
}
} else {
if (read_len != padded_len) {
- ret = -EPERM;
- goto error_put_subbuf;
+ return -EPERM;
}
}
- ret = lttng_ust_ctl_put_subbuf(stream->ustream);
- if (ret < 0) {
- ERR("Snapshot lttng_ust_ctl_put_subbuf");
- goto error_close_stream;
- }
consumed_pos += stream->max_sb_size;
}
/* Simply close the stream so we can use it on the next snapshot. */
consumer_stream_close_output(stream);
- pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return 0;
-
-error_put_subbuf:
- if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
- ERR("Snapshot lttng_ust_ctl_put_subbuf");
- }
-error_close_stream:
- consumer_stream_close_output(stream);
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
- return ret;
}
static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
uint64_t len,
uint64_t version,
struct lttng_consumer_channel *channel,
- int timer,
+ bool invoked_by_timer,
int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
-
- health_code_update();
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
health_code_update();
/* relayd needs RCU read-side lock */
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
- uint32_t major = msg.u.relayd_sock.major;
- uint32_t minor = msg.u.relayd_sock.minor;
- enum lttcomm_sock_proto protocol =
+ const uint32_t major = msg.u.relayd_sock.major;
+ const uint32_t minor = msg.u.relayd_sock.minor;
+ const lttcomm_sock_proto protocol =
(enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
/* Session daemon status message are handled in the following call. */
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
{
- uint64_t index = msg.u.destroy_relayd.net_seq_idx;
+ const uint64_t index = msg.u.destroy_relayd.net_seq_idx;
struct consumer_relayd_sock_pair *relayd;
DBG("UST consumer destroying relayd %" PRIu64, index);
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
- rcu_read_unlock();
return -ENOSYS;
}
case LTTNG_CONSUMER_DATA_PENDING:
{
int is_data_pending;
ssize_t ret_send;
- uint64_t id = msg.u.data_pending.session_id;
+ const uint64_t id = msg.u.data_pending.session_id;
DBG("UST consumer data pending command for id %" PRIu64, id);
case LTTNG_CONSUMER_GET_CHANNEL:
{
int ret, relayd_err = 0;
- uint64_t key = msg.u.get_channel.key;
+ const uint64_t key = msg.u.get_channel.key;
struct lttng_consumer_channel *found_channel;
found_channel = consumer_find_channel(key);
}
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
- uint64_t key = msg.u.destroy_channel.key;
+ const uint64_t key = msg.u.destroy_channel.key;
/*
* Only called if streams have not been sent to stream
case LTTNG_CONSUMER_PUSH_METADATA:
{
int ret;
- uint64_t len = msg.u.push_metadata.len;
- uint64_t key = msg.u.push_metadata.key;
- uint64_t offset = msg.u.push_metadata.target_offset;
- uint64_t version = msg.u.push_metadata.version;
+ const uint64_t len = msg.u.push_metadata.len;
+ const uint64_t key = msg.u.push_metadata.key;
+ const uint64_t offset = msg.u.push_metadata.target_offset;
+ const uint64_t version = msg.u.push_metadata.version;
struct lttng_consumer_channel *found_channel;
DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
health_code_update();
ret = lttng_ustconsumer_recv_metadata(
- sock, key, offset, len, version, found_channel, 0, 1);
+ sock, key, offset, len, version, found_channel, false, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
{
struct lttng_consumer_channel *found_channel;
- uint64_t key = msg.u.snapshot_channel.key;
+ const uint64_t key = msg.u.snapshot_channel.key;
int ret_send;
found_channel = consumer_find_channel(key);
{
int ret = 0;
uint64_t discarded_events;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- uint64_t id = msg.u.discarded_events.session_id;
- uint64_t key = msg.u.discarded_events.channel_key;
+ const auto id = msg.u.discarded_events.session_id;
+ const auto key = msg.u.discarded_events.channel_key;
DBG("UST consumer discarded events command for session id %" PRIu64, id);
- rcu_read_lock();
- pthread_mutex_lock(&the_consumer_data.lock);
-
- ht = the_consumer_data.stream_list_ht;
-
- /*
- * We only need a reference to the channel, but they are not
- * directly indexed, so we just use the first matching stream
- * to extract the information we need, we default to 0 if not
- * found (no events are dropped if the channel is not yet in
- * use).
- */
- discarded_events = 0;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
{
- if (stream->chan->key == key) {
- discarded_events = stream->chan->discarded_events;
- break;
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+ const auto ht = the_consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no events are dropped if the channel is not yet in
+ * use).
+ */
+ discarded_events = 0;
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht,
+ &id,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct)) {
+ if (stream->chan->key == key) {
+ discarded_events = stream->chan->discarded_events;
+ break;
+ }
}
}
- pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer discarded events command for session id %" PRIu64
", channel key %" PRIu64,
{
int ret;
uint64_t lost_packets;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- uint64_t id = msg.u.lost_packets.session_id;
- uint64_t key = msg.u.lost_packets.channel_key;
+ const auto id = msg.u.lost_packets.session_id;
+ const auto key = msg.u.lost_packets.channel_key;
DBG("UST consumer lost packets command for session id %" PRIu64, id);
- rcu_read_lock();
- pthread_mutex_lock(&the_consumer_data.lock);
-
- ht = the_consumer_data.stream_list_ht;
-
- /*
- * We only need a reference to the channel, but they are not
- * directly indexed, so we just use the first matching stream
- * to extract the information we need, we default to 0 if not
- * found (no packets lost if the channel is not yet in use).
- */
- lost_packets = 0;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
{
- if (stream->chan->key == key) {
- lost_packets = stream->chan->lost_packets;
- break;
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+ const auto ht = the_consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no packets lost if the channel is not yet in use).
+ */
+ lost_packets = 0;
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht,
+ &id,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct)) {
+ if (stream->chan->key == key) {
+ lost_packets = stream->chan->lost_packets;
+ break;
+ }
}
}
- pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer lost packets command for session id %" PRIu64
", channel key %" PRIu64,
case LTTNG_CONSUMER_ROTATE_CHANNEL:
{
struct lttng_consumer_channel *found_channel;
- uint64_t key = msg.u.rotate_channel.key;
+ const uint64_t key = msg.u.rotate_channel.key;
int ret_send_status;
found_channel = consumer_find_channel(key);
case LTTNG_CONSUMER_CLEAR_CHANNEL:
{
struct lttng_consumer_channel *found_channel;
- uint64_t key = msg.u.clear_channel.key;
+ const uint64_t key = msg.u.clear_channel.key;
int ret_send_status;
found_channel = consumer_find_channel(key);
goto end;
end:
- rcu_read_unlock();
health_code_update();
return ret_func;
}
ret = write_len;
goto end;
}
+
stream->ust_metadata_pushed += write_len;
+ stream->chan->metadata_pushed_wait_queue.wake_all();
LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
ret = write_len;
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
*/
void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
LTTNG_ASSERT(metadata_ht);
LTTNG_ASSERT(metadata_ht->ht);
DBG("UST consumer closing all metadata streams");
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
lttng_ustconsumer_close_metadata(stream->chan);
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
}
- rcu_read_unlock();
}
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *channel,
- int timer,
+ bool invoked_by_timer,
int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
- enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ const lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
uint64_t len, key, offset, version;
int ret;
LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(!channel->is_deleted);
LTTNG_ASSERT(channel->metadata_cache);
memset(&request, 0, sizeof(request));
health_code_update();
- ret = lttng_ustconsumer_recv_metadata(
- ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+ ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+ key,
+ offset,
+ len,
+ version,
+ channel,
+ invoked_by_timer,
+ wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive