#include <common/consumer/consumer-timer.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/urcu.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
#include <inttypes.h>
#include <signal.h>
-typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
-typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, unsigned long *consumed);
-typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, unsigned long *produced);
-typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
+using sample_positions_cb = int (*)(struct lttng_consumer_stream *);
+using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
+using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
+using flush_index_cb = int (*)(struct lttng_consumer_stream *);
static struct timer_signal_data timer_signal = {
.tid = 0,
* - metadata_socket_lock
* - Calling lttng_ustconsumer_recv_metadata():
* - channel->metadata_cache->lock
- * - Calling consumer_metadata_cache_flushed():
+ * - Calling consumer_wait_metadata_cache_flushed():
* - channel->timer_lock
* - channel->metadata_cache->lock
*
* they are held while consumer_timer_switch_stop() is
* called.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
if (ret < 0) {
channel->switch_timer_error = 1;
}
DBG("Live timer for channel %" PRIu64, channel->key);
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
{
- ret = check_stream(stream, flush_index);
- if (ret < 0) {
- goto error_unlock;
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
+ ret = check_stream(stream, flush_index);
+ if (ret < 0) {
+ goto error_unlock;
+ }
}
}
-
error_unlock:
- rcu_read_unlock();
error:
return;
its.it_interval.tv_sec = its.it_value.tv_sec;
its.it_interval.tv_nsec = its.it_value.tv_nsec;
- ret = timer_settime(*timer_id, 0, &its, NULL);
+ ret = timer_settime(*timer_id, 0, &its, nullptr);
if (ret == -1) {
PERROR("timer_settime");
goto error_destroy_timer;
}
consumer_timer_signal_thread_qs(signal);
- *timer_id = 0;
+ *timer_id = nullptr;
end:
return ret;
}
* Block the RT signals for the entire process. It must be called from the
* consumer main before creating the threads
*/
-int consumer_signal_init(void)
+int consumer_signal_init()
{
int ret;
sigset_t mask;
/* Block signal for entire process, so only our thread processes it. */
setmask(&mask);
- ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr);
if (ret) {
errno = ret;
PERROR("pthread_sigmask");
*_total_consumed = 0;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
*_highest_use = high;
*_lowest_use = low;
end:
- rcu_read_unlock();
if (empty_channel) {
ret = -1;
}
}
}
-int consumer_timer_thread_get_channel_monitor_pipe(void)
+int consumer_timer_thread_get_channel_monitor_pipe()
{
return uatomic_read(&the_channel_monitor_pipe);
}
setmask(&mask);
CMM_STORE_SHARED(timer_signal.tid, pthread_self());
- while (1) {
+ while (true) {
health_code_update();
health_poll_entry();
end:
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}