Clean-up: modernize pretty_xml.cpp
[lttng-tools.git] / src / common / consumer / consumer-timer.cpp
index 4e308383d22e5a1c22e7f9442c5b9a9dc87d1020..2ae7f0f73d1b2fdaffe59ed6b21b3814b78e8c22 100644 (file)
@@ -7,25 +7,24 @@
  */
 
 #define _LGPL_SOURCE
-#include <inttypes.h>
-#include <signal.h>
-
-#include <bin/lttng-consumerd/health-consumerd.hpp>
 #include <common/common.hpp>
 #include <common/compat/endian.hpp>
-#include <common/kernel-ctl/kernel-ctl.hpp>
-#include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/consumer/consumer-stream.hpp>
-#include <common/consumer/consumer-timer.hpp>
 #include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/urcu.hpp>
 #include <common/ust-consumer/ust-consumer.hpp>
 
-typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
-typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
-               unsigned long *consumed);
-typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
-               unsigned long *produced);
-typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <inttypes.h>
+#include <signal.h>
+
+using sample_positions_cb = int (*)(struct lttng_consumer_stream *);
+using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
+using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
+using flush_index_cb = int (*)(struct lttng_consumer_stream *);
 
 static struct timer_signal_data timer_signal = {
        .tid = 0,
@@ -76,8 +75,7 @@ static int the_channel_monitor_pipe = -1;
  * while consumer_timer_switch_stop() is called. It would result in
  * deadlocks.
  */
-static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
-               siginfo_t *si)
+static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -98,7 +96,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                 * - metadata_socket_lock
                 *   - Calling lttng_ustconsumer_recv_metadata():
                 *     - channel->metadata_cache->lock
-                *     - Calling consumer_metadata_cache_flushed():
+                *     - Calling consumer_wait_metadata_cache_flushed():
                 *       - channel->timer_lock
                 *         - channel->metadata_cache->lock
                 *
@@ -107,7 +105,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
                 * they are held while consumer_timer_switch_stop() is
                 * called.
                 */
-               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+               ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
@@ -119,8 +117,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
        }
 }
 
-static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
-               uint64_t stream_id)
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id)
 {
        int ret;
        struct ctf_packet_index index;
@@ -175,8 +172,7 @@ end:
        return ret;
 }
 
-static int check_stream(struct lttng_consumer_stream *stream,
-               flush_index_cb flush_index)
+static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index)
 {
        int ret;
 
@@ -195,14 +191,14 @@ static int check_stream(struct lttng_consumer_stream *stream,
                ret = pthread_mutex_trylock(&stream->lock);
                switch (ret) {
                case 0:
-                       break;  /* We have the lock. */
+                       break; /* We have the lock. */
                case EBUSY:
                        pthread_mutex_lock(&stream->metadata_timer_lock);
                        if (stream->waiting_on_metadata) {
                                ret = 0;
                                stream->missed_metadata_flush = true;
                                pthread_mutex_unlock(&stream->metadata_timer_lock);
-                               goto end;       /* Bail out. */
+                               goto end; /* Bail out. */
                        }
                        pthread_mutex_unlock(&stream->metadata_timer_lock);
                        /* Try again. */
@@ -267,18 +263,16 @@ end:
 /*
  * Execute action on a live timer
  */
-static void live_timer(struct lttng_consumer_local_data *ctx,
-               siginfo_t *si)
+static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
-       const flush_index_cb flush_index =
-                       ctx->type == LTTNG_CONSUMER_KERNEL ?
-                                       consumer_flush_kernel_index :
-                                       consumer_flush_ust_index;
+       const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ?
+               consumer_flush_kernel_index :
+               consumer_flush_ust_index;
 
        channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
        LTTNG_ASSERT(channel);
@@ -289,26 +283,29 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
 
        DBG("Live timer for channel %" PRIu64, channel->key);
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&channel->key, lttng_ht_seed),
-                       ht->match_fct, &channel->key, &iter.iter,
-                       stream, node_channel_id.node) {
-               ret = check_stream(stream, flush_index);
-               if (ret < 0) {
-                       goto error_unlock;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                                                 ht->hash_fct(&channel->key, lttng_ht_seed),
+                                                 ht->match_fct,
+                                                 &channel->key,
+                                                 &iter.iter,
+                                                 stream,
+                                                 node_channel_id.node)
+               {
+                       ret = check_stream(stream, flush_index);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
                }
        }
-
 error_unlock:
-       rcu_read_unlock();
 
 error:
        return;
 }
 
-static
-void consumer_timer_signal_thread_qs(unsigned int signr)
+static void consumer_timer_signal_thread_qs(unsigned int signr)
 {
        sigset_t pending_set;
        int ret;
@@ -365,10 +362,10 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
  * Returns a negative value on error, 0 if a timer was created, and
  * a positive value if no timer was created (not an error).
  */
-static
-int consumer_channel_timer_start(timer_t *timer_id,
-               struct lttng_consumer_channel *channel,
-               unsigned int timer_interval_us, int signal)
+static int consumer_channel_timer_start(timer_t *timer_id,
+                                       struct lttng_consumer_channel *channel,
+                                       unsigned int timer_interval_us,
+                                       int signal)
 {
        int ret = 0, delete_ret;
        struct sigevent sev = {};
@@ -397,7 +394,7 @@ int consumer_channel_timer_start(timer_t *timer_id,
        its.it_interval.tv_sec = its.it_value.tv_sec;
        its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       ret = timer_settime(*timer_id, 0, &its, NULL);
+       ret = timer_settime(*timer_id, 0, &its, nullptr);
        if (ret == -1) {
                PERROR("timer_settime");
                goto error_destroy_timer;
@@ -412,8 +409,7 @@ error_destroy_timer:
        goto end;
 }
 
-static
-int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+static int consumer_channel_timer_stop(timer_t *timer_id, int signal)
 {
        int ret = 0;
 
@@ -424,7 +420,7 @@ int consumer_channel_timer_stop(timer_t *timer_id, int signal)
        }
 
        consumer_timer_signal_thread_qs(signal);
-       *timer_id = 0;
+       *timer_id = nullptr;
 end:
        return ret;
 }
@@ -433,15 +429,17 @@ end:
  * Set the channel's switch timer.
  */
 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
-               unsigned int switch_timer_interval_us)
+                                unsigned int switch_timer_interval_us)
 {
        int ret;
 
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(channel->key);
 
-       ret = consumer_channel_timer_start(&channel->switch_timer, channel,
-                       switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+       ret = consumer_channel_timer_start(&channel->switch_timer,
+                                          channel,
+                                          switch_timer_interval_us,
+                                          LTTNG_CONSUMER_SIG_SWITCH);
 
        channel->switch_timer_enabled = !!(ret == 0);
 }
@@ -455,8 +453,7 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 
        LTTNG_ASSERT(channel);
 
-       ret = consumer_channel_timer_stop(&channel->switch_timer,
-                       LTTNG_CONSUMER_SIG_SWITCH);
+       ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH);
        if (ret == -1) {
                ERR("Failed to stop switch timer");
        }
@@ -468,15 +465,15 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
  * Set the channel's live timer.
  */
 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
-               unsigned int live_timer_interval_us)
+                              unsigned int live_timer_interval_us)
 {
        int ret;
 
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(channel->key);
 
-       ret = consumer_channel_timer_start(&channel->live_timer, channel,
-                       live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
+       ret = consumer_channel_timer_start(
+               &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
 
        channel->live_timer_enabled = !!(ret == 0);
 }
@@ -490,8 +487,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
 
        LTTNG_ASSERT(channel);
 
-       ret = consumer_channel_timer_stop(&channel->live_timer,
-                       LTTNG_CONSUMER_SIG_LIVE);
+       ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE);
        if (ret == -1) {
                ERR("Failed to stop live timer");
        }
@@ -506,7 +502,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
  * a positive value if no timer was created (not an error).
  */
 int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
-               unsigned int monitor_timer_interval_us)
+                                unsigned int monitor_timer_interval_us)
 {
        int ret;
 
@@ -514,8 +510,10 @@ int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(channel->key);
        LTTNG_ASSERT(!channel->monitor_timer_enabled);
 
-       ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
-                       monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+       ret = consumer_channel_timer_start(&channel->monitor_timer,
+                                          channel,
+                                          monitor_timer_interval_us,
+                                          LTTNG_CONSUMER_SIG_MONITOR);
        channel->monitor_timer_enabled = !!(ret == 0);
        return ret;
 }
@@ -530,10 +528,9 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(channel->monitor_timer_enabled);
 
-       ret = consumer_channel_timer_stop(&channel->monitor_timer,
-                       LTTNG_CONSUMER_SIG_MONITOR);
+       ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR);
        if (ret == -1) {
-               ERR("Failed to stop live timer");
+               ERR("Failed to stop monitor timer");
                goto end;
        }
 
@@ -546,14 +543,14 @@ end:
  * Block the RT signals for the entire process. It must be called from the
  * consumer main before creating the threads
  */
-int consumer_signal_init(void)
+int consumer_signal_init()
 {
        int ret;
        sigset_t mask;
 
        /* Block signal for entire process, so only our thread processes it. */
        setmask(&mask);
-       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr);
        if (ret) {
                errno = ret;
                PERROR("pthread_sigmask");
@@ -562,11 +559,13 @@ int consumer_signal_init(void)
        return 0;
 }
 
-static
-int sample_channel_positions(struct lttng_consumer_channel *channel,
-               uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
-               sample_positions_cb sample, get_consumed_cb get_consumed,
-               get_produced_cb get_produced)
+static int sample_channel_positions(struct lttng_consumer_channel *channel,
+                                   uint64_t *_highest_use,
+                                   uint64_t *_lowest_use,
+                                   uint64_t *_total_consumed,
+                                   sample_positions_cb sample,
+                                   get_consumed_cb get_consumed,
+                                   get_produced_cb get_produced)
 {
        int ret = 0;
        struct lttng_ht_iter iter;
@@ -577,12 +576,16 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
 
        *_total_consumed = 0;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&channel->key, lttng_ht_seed),
-                       ht->match_fct, &channel->key,
-                       &iter.iter, stream, node_channel_id.node) {
+                                         ht->hash_fct(&channel->key, lttng_ht_seed),
+                                         ht->match_fct,
+                                         &channel->key,
+                                         &iter.iter,
+                                         stream,
+                                         node_channel_id.node)
+       {
                unsigned long produced, consumed, usage;
 
                empty_channel = false;
@@ -594,7 +597,8 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
 
                ret = sample(stream);
                if (ret) {
-                       ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+                       ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
+                           ret);
                        pthread_mutex_unlock(&stream->lock);
                        goto end;
                }
@@ -630,7 +634,6 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
        *_highest_use = high;
        *_lowest_use = low;
 end:
-       rcu_read_unlock();
        if (empty_channel) {
                ret = -1;
        }
@@ -641,13 +644,13 @@ end:
 void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel)
 {
        int ret;
-       int channel_monitor_pipe =
-                       consumer_timer_thread_get_channel_monitor_pipe();
+       int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe();
        struct lttcomm_consumer_channel_monitor_msg msg = {
                .key = channel->key,
+               .session_id = channel->session_id,
                .lowest = 0,
                .highest = 0,
-               .total_consumed = 0,
+               .consumed_since_last_sample = 0,
        };
        sample_positions_cb sample;
        get_consumed_cb get_consumed;
@@ -676,14 +679,15 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
                abort();
        }
 
-       ret = sample_channel_positions(channel, &highest, &lowest,
-                       &total_consumed, sample, get_consumed, get_produced);
+       ret = sample_channel_positions(
+               channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced);
        if (ret) {
                return;
        }
+
        msg.highest = highest;
        msg.lowest = lowest;
-       msg.total_consumed = total_consumed;
+       msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
 
        /*
         * Writes performed here are assumed to be atomic which is only
@@ -698,18 +702,21 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
                if (errno == EAGAIN) {
                        /* Not an error, the sample is merely dropped. */
                        DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
-                                       channel->key);
+                           channel->key);
                } else {
                        PERROR("write to the channel monitor pipe");
                }
        } else {
                DBG("Sent channel monitoring sample for channel key %" PRIu64
-                               ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
-                               channel->key, msg.highest, msg.lowest);
+                   ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
+                   channel->key,
+                   msg.highest,
+                   msg.lowest);
+               channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
        }
 }
 
-int consumer_timer_thread_get_channel_monitor_pipe(void)
+int consumer_timer_thread_get_channel_monitor_pipe()
 {
        return uatomic_read(&the_channel_monitor_pipe);
 }
@@ -754,7 +761,7 @@ void *consumer_timer_thread(void *data)
        setmask(&mask);
        CMM_STORE_SHARED(timer_signal.tid, pthread_self());
 
-       while (1) {
+       while (true) {
                health_code_update();
 
                health_poll_entry();
@@ -799,5 +806,5 @@ error_testpoint:
 end:
        health_unregister(health_consumerd);
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
This page took 0.029507 seconds and 4 git commands to generate.