X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-timer.cpp;h=2ae7f0f73d1b2fdaffe59ed6b21b3814b78e8c22;hb=HEAD;hp=7f88a9712825de9151c4d082f6951ad11e81db1a;hpb=cd9adb8b829564212158943a0d279bb35322ab30;p=lttng-tools.git diff --git a/src/common/consumer/consumer-timer.cpp b/src/common/consumer/consumer-timer.cpp index 7f88a9712..f79a8061b 100644 --- a/src/common/consumer/consumer-timer.cpp +++ b/src/common/consumer/consumer-timer.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -81,6 +82,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo channel = (lttng_consumer_channel *) si->si_value.sival_ptr; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); if (channel->switch_timer_error) { return; @@ -95,7 +97,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo * - metadata_socket_lock * - Calling lttng_ustconsumer_recv_metadata(): * - channel->metadata_cache->lock - * - Calling consumer_metadata_cache_flushed(): + * - Calling consumer_wait_metadata_cache_flushed(): * - channel->timer_lock * - channel->metadata_cache->lock * @@ -104,7 +106,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo * they are held while consumer_timer_switch_stop() is * called. */ - ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1); + ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1); if (ret < 0) { channel->switch_timer_error = 1; } @@ -265,43 +267,34 @@ end: static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si) { int ret; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ? consumer_flush_kernel_index : consumer_flush_ust_index; - channel = (lttng_consumer_channel *) si->si_value.sival_ptr; + auto *channel = (lttng_consumer_channel *) si->si_value.sival_ptr; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); if (channel->switch_timer_error) { - goto error; + return; } DBG("Live timer for channel %" PRIu64, channel->key); - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { ret = check_stream(stream, flush_index); if (ret < 0) { - goto error_unlock; + return; } } - -error_unlock: - rcu_read_unlock(); - -error: - return; } static void consumer_timer_signal_thread_qs(unsigned int signr) @@ -372,6 +365,7 @@ static int consumer_channel_timer_start(timer_t *timer_id, LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); + LTTNG_ASSERT(!channel->is_deleted); if (timer_interval_us == 0) { /* No creation needed; not an error. */ @@ -434,6 +428,7 @@ void consumer_timer_switch_start(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); + LTTNG_ASSERT(!channel->is_deleted); ret = consumer_channel_timer_start(&channel->switch_timer, channel, @@ -470,6 +465,7 @@ void consumer_timer_live_start(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); + LTTNG_ASSERT(!channel->is_deleted); ret = consumer_channel_timer_start( &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); @@ -507,6 +503,7 @@ int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(!channel->monitor_timer_enabled); ret = consumer_channel_timer_start(&channel->monitor_timer, @@ -529,7 +526,7 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { - ERR("Failed to stop live timer"); + ERR("Failed to stop monitor timer"); goto end; } @@ -567,24 +564,20 @@ static int sample_channel_positions(struct lttng_consumer_channel *channel, get_produced_cb get_produced) { int ret = 0; - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; bool empty_channel = true; uint64_t high = 0, low = UINT64_MAX; struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; *_total_consumed = 0; - rcu_read_lock(); - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { unsigned long produced, consumed, usage; empty_channel = false; @@ -633,10 +626,10 @@ static int sample_channel_positions(struct lttng_consumer_channel *channel, *_highest_use = high; *_lowest_use = low; end: - rcu_read_unlock(); if (empty_channel) { ret = -1; } + return ret; } @@ -644,7 +637,7 @@ end: void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel) { int ret; - int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe(); + const int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe(); struct lttcomm_consumer_channel_monitor_msg msg = { .key = channel->key, .session_id = channel->session_id, @@ -687,7 +680,8 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel msg.highest = highest; msg.lowest = lowest; - msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent; + msg.consumed_since_last_sample = + total_consumed - channel->consumed_size_as_of_last_sample_sent; /* * Writes performed here are assumed to be atomic which is only @@ -712,7 +706,7 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel channel->key, msg.highest, msg.lowest); - channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample; + channel->consumed_size_as_of_last_sample_sent = total_consumed; } }