consumerd: refactor: combine duplicated check_*_functions
[lttng-tools.git] / src / common / consumer / consumer-timer.c
index 931f7471fc6fe974982ff1cb687750ff68f80a7c..c190d3b627d0e8bc2ffc079bcc778456d3d22da4 100644 (file)
@@ -1,19 +1,9 @@
 /*
- * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
- *                      David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #define _LGPL_SOURCE
 #include <common/consumer/consumer-testpoint.h>
 #include <common/ust-consumer/ust-consumer.h>
 
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+               unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
+
 static struct timer_signal_data timer_signal = {
        .tid = 0,
        .setup_done = 0,
@@ -61,8 +58,18 @@ static void setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigaddset live");
        }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+       if (ret) {
+               PERROR("sigaddset monitor");
+       }
+       ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+       if (ret) {
+               PERROR("sigaddset exit");
+       }
 }
 
+static int channel_monitor_pipe = -1;
+
 /*
  * Execute action on a timer switch.
  *
@@ -71,7 +78,7 @@ static void setmask(sigset_t *mask)
  * deadlocks.
  */
 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
@@ -148,7 +155,7 @@ int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
-               if (errno != EAGAIN && errno != ENODATA) {
+               if (ret != -EAGAIN && ret != -ENODATA) {
                        PERROR("live timer kernel snapshot");
                        ret = -1;
                        goto end;
@@ -169,7 +176,8 @@ end:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+               flush_index_cb flush_index)
 {
        int ret;
 
@@ -208,7 +216,7 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream)
                }
                break;
        }
-       ret = consumer_flush_kernel_index(stream);
+       ret = flush_index(stream);
        pthread_mutex_unlock(&stream->lock);
 end:
        return ret;
@@ -253,64 +261,21 @@ end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-       assert(stream->ustream);
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        *
-        * Doing a trylock and checking if waiting on metadata if
-        * trylock fails. Bail out of the stream is indeed waiting for
-        * metadata to be pushed. Busy wait on trylock otherwise.
-        */
-       for (;;) {
-               ret = pthread_mutex_trylock(&stream->lock);
-               switch (ret) {
-               case 0:
-                       break;  /* We have the lock. */
-               case EBUSY:
-                       pthread_mutex_lock(&stream->metadata_timer_lock);
-                       if (stream->waiting_on_metadata) {
-                               ret = 0;
-                               stream->missed_metadata_flush = true;
-                               pthread_mutex_unlock(&stream->metadata_timer_lock);
-                               goto end;       /* Bail out. */
-                       }
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-                       /* Try again. */
-                       caa_cpu_relax();
-                       continue;
-               default:
-                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       ret = consumer_flush_ust_index(stream);
-       pthread_mutex_unlock(&stream->lock);
-end:
-       return ret;
-}
-
 /*
  * Execute action on a live timer
  */
 static void live_timer(struct lttng_consumer_local_data *ctx,
-               int sig, siginfo_t *si, void *uc)
+               siginfo_t *si)
 {
        int ret;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
        struct lttng_ht_iter iter;
+       const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       const flush_index_cb flush_index =
+                       ctx->type == LTTNG_CONSUMER_KERNEL ?
+                                       consumer_flush_kernel_index :
+                                       consumer_flush_ust_index;
 
        channel = si->si_value.sival_ptr;
        assert(channel);
@@ -318,38 +283,18 @@ static void live_timer(struct lttng_consumer_local_data *ctx,
        if (channel->switch_timer_error) {
                goto error;
        }
-       ht = consumer_data.stream_per_chan_id_ht;
 
        DBG("Live timer for channel %" PRIu64, channel->key);
 
        rcu_read_lock();
-       switch (ctx->type) {
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_ust_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER_KERNEL:
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&channel->key, lttng_ht_seed),
-                               ht->match_fct, &channel->key, &iter.iter,
-                               stream, node_channel_id.node) {
-                       ret = check_kernel_stream(stream);
-                       if (ret < 0) {
-                               goto error_unlock;
-                       }
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key, &iter.iter,
+                       stream, node_channel_id.node) {
+               ret = check_stream(stream, flush_index);
+               if (ret < 0) {
+                       goto error_unlock;
                }
-               break;
-       case LTTNG_CONSUMER_UNKNOWN:
-               assert(0);
-               break;
        }
 
 error_unlock:
@@ -381,7 +326,7 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
                if (ret == -1) {
                        PERROR("sigpending");
                }
-               if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
+               if (!sigismember(&pending_set, signr)) {
                        break;
                }
                caa_cpu_relax();
@@ -411,44 +356,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr)
 }
 
 /*
- * Set the timer for periodical metadata flush.
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
-               unsigned int switch_timer_interval)
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+               struct lttng_consumer_channel *channel,
+               unsigned int timer_interval_us, int signal)
 {
-       int ret;
+       int ret = 0, delete_ret;
        struct sigevent sev;
        struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (switch_timer_interval == 0) {
-               return;
+       if (timer_interval_us == 0) {
+               /* No creation needed; not an error. */
+               ret = 1;
+               goto end;
        }
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
+       sev.sigev_signo = signal;
        sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
+       ret = timer_create(CLOCKID, &sev, timer_id);
        if (ret == -1) {
                PERROR("timer_create");
+               goto end;
        }
-       channel->switch_timer_enabled = 1;
 
-       its.it_value.tv_sec = switch_timer_interval / 1000000;
-       its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
+       its.it_value.tv_sec = timer_interval_us / 1000000;
+       its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
        its.it_interval.tv_sec = its.it_value.tv_sec;
        its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       ret = timer_settime(channel->switch_timer, 0, &its, NULL);
+       ret = timer_settime(*timer_id, 0, &its, NULL);
        if (ret == -1) {
                PERROR("timer_settime");
+               goto error_destroy_timer;
        }
+end:
+       return ret;
+error_destroy_timer:
+       delete_ret = timer_delete(*timer_id);
+       if (delete_ret == -1) {
+               PERROR("timer_delete");
+       }
+       goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+       int ret = 0;
+
+       ret = timer_delete(*timer_id);
+       if (ret == -1) {
+               PERROR("timer_delete");
+               goto end;
+       }
+
+       consumer_timer_signal_thread_qs(signal);
+       *timer_id = 0;
+end:
+       return ret;
+}
+
+/*
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+               unsigned int switch_timer_interval_us)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->key);
+
+       ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+                       switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+       channel->switch_timer_enabled = !!(ret == 0);
 }
 
 /*
- * Stop and delete timer.
+ * Stop and delete the channel's switch timer.
  */
 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 {
@@ -456,72 +452,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
 
        assert(channel);
 
-       ret = timer_delete(channel->switch_timer);
+       ret = consumer_channel_timer_stop(&channel->switch_timer,
+                       LTTNG_CONSUMER_SIG_SWITCH);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop switch timer");
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
-
-       channel->switch_timer = 0;
        channel->switch_timer_enabled = 0;
 }
 
 /*
- * Set the timer for the live mode.
+ * Set the channel's live timer.
  */
 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
-               int live_timer_interval)
+               unsigned int live_timer_interval_us)
 {
        int ret;
-       struct sigevent sev;
-       struct itimerspec its;
 
        assert(channel);
        assert(channel->key);
 
-       if (live_timer_interval <= 0) {
-               return;
-       }
+       ret = consumer_channel_timer_start(&channel->live_timer, channel,
+                       live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
 
-       sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
-       sev.sigev_value.sival_ptr = channel;
-       ret = timer_create(CLOCKID, &sev, &channel->live_timer);
-       if (ret == -1) {
-               PERROR("timer_create");
-       }
-       channel->live_timer_enabled = 1;
+       channel->live_timer_enabled = !!(ret == 0);
+}
 
-       its.it_value.tv_sec = live_timer_interval / 1000000;
-       its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
-       its.it_interval.tv_sec = its.it_value.tv_sec;
-       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       assert(channel);
 
-       ret = timer_settime(channel->live_timer, 0, &its, NULL);
+       ret = consumer_channel_timer_stop(&channel->live_timer,
+                       LTTNG_CONSUMER_SIG_LIVE);
        if (ret == -1) {
-               PERROR("timer_settime");
+               ERR("Failed to stop live timer");
        }
+
+       channel->live_timer_enabled = 0;
 }
 
 /*
- * Stop and delete timer.
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
  */
-void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+               unsigned int monitor_timer_interval_us)
+{
+       int ret;
+
+       assert(channel);
+       assert(channel->key);
+       assert(!channel->monitor_timer_enabled);
+
+       ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+                       monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+       channel->monitor_timer_enabled = !!(ret == 0);
+       return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
 {
        int ret;
 
        assert(channel);
+       assert(channel->monitor_timer_enabled);
 
-       ret = timer_delete(channel->live_timer);
+       ret = consumer_channel_timer_stop(&channel->monitor_timer,
+                       LTTNG_CONSUMER_SIG_MONITOR);
        if (ret == -1) {
-               PERROR("timer_delete");
+               ERR("Failed to stop live timer");
+               goto end;
        }
 
-       consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
-       channel->live_timer = 0;
-       channel->live_timer_enabled = 0;
+       channel->monitor_timer_enabled = 0;
+end:
+       return ret;
 }
 
 /*
@@ -544,9 +559,176 @@ int consumer_signal_init(void)
        return 0;
 }
 
+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+               uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
+               sample_positions_cb sample, get_consumed_cb get_consumed,
+               get_produced_cb get_produced)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool empty_channel = true;
+       uint64_t high = 0, low = UINT64_MAX;
+       struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+       *_total_consumed = 0;
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               unsigned long produced, consumed, usage;
+
+               empty_channel = false;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               ret = sample(stream);
+               if (ret) {
+                       ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = get_consumed(stream, &consumed);
+               if (ret) {
+                       ERR("Failed to get buffer consumed position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+               ret = get_produced(stream, &produced);
+               if (ret) {
+                       ERR("Failed to get buffer produced position in monitor timer");
+                       pthread_mutex_unlock(&stream->lock);
+                       goto end;
+               }
+
+               usage = produced - consumed;
+               high = (usage > high) ? usage : high;
+               low = (usage < low) ? usage : low;
+
+               /*
+                * We don't use consumed here for 2 reasons:
+                *  - output_written takes into account the padding written in the
+                *    tracefiles when we stop the session;
+                *  - the consumed position is not the accurate representation of what
+                *    was extracted from a buffer in overwrite mode.
+                */
+               *_total_consumed += stream->output_written;
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       *_highest_use = high;
+       *_lowest_use = low;
+end:
+       rcu_read_unlock();
+       if (empty_channel) {
+               ret = -1;
+       }
+       return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       int channel_monitor_pipe =
+                       consumer_timer_thread_get_channel_monitor_pipe();
+       struct lttcomm_consumer_channel_monitor_msg msg = {
+               .key = channel->key,
+       };
+       sample_positions_cb sample;
+       get_consumed_cb get_consumed;
+       get_produced_cb get_produced;
+       uint64_t lowest = 0, highest = 0, total_consumed = 0;
+
+       assert(channel);
+
+       if (channel_monitor_pipe < 0) {
+               return;
+       }
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               sample = lttng_kconsumer_sample_snapshot_positions;
+               get_consumed = lttng_kconsumer_get_consumed_snapshot;
+               get_produced = lttng_kconsumer_get_produced_snapshot;
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               sample = lttng_ustconsumer_sample_snapshot_positions;
+               get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+               get_produced = lttng_ustconsumer_get_produced_snapshot;
+               break;
+       default:
+               abort();
+       }
+
+       ret = sample_channel_positions(channel, &highest, &lowest,
+                       &total_consumed, sample, get_consumed, get_produced);
+       if (ret) {
+               return;
+       }
+       msg.highest = highest;
+       msg.lowest = lowest;
+       msg.total_consumed = total_consumed;
+
+       /*
+        * Writes performed here are assumed to be atomic which is only
+        * guaranteed for sizes < than PIPE_BUF.
+        */
+       assert(sizeof(msg) <= PIPE_BUF);
+
+       do {
+               ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+       } while (ret == -1 && errno == EINTR);
+       if (ret == -1) {
+               if (errno == EAGAIN) {
+                       /* Not an error, the sample is merely dropped. */
+                       DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+                                       channel->key);
+               } else {
+                       PERROR("write to the channel monitor pipe");
+               }
+       } else {
+               DBG("Sent channel monitoring sample for channel key %" PRIu64
+                               ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+                               channel->key, msg.highest, msg.lowest);
+       }
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+       return uatomic_read(&channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+       int ret;
+
+       ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+       if (ret != -1) {
+               ret = -1;
+               goto end;
+       }
+       ret = 0;
+end:
+       return ret;
+}
+
 /*
  * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
  */
 void *consumer_timer_thread(void *data)
 {
@@ -575,20 +757,34 @@ void *consumer_timer_thread(void *data)
                health_poll_entry();
                signr = sigwaitinfo(&mask, &info);
                health_poll_exit();
+
+               /*
+                * NOTE: cascading conditions are used instead of a switch case
+                * since the use of SIGRTMIN in the definition of the signals'
+                * values prevents the reduction to an integer constant.
+                */
                if (signr == -1) {
                        if (errno != EINTR) {
                                PERROR("sigwaitinfo");
                        }
                        continue;
                } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
-                       metadata_switch_timer(ctx, info.si_signo, &info, NULL);
+                       metadata_switch_timer(ctx, &info);
                } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
                        cmm_smp_mb();
                        DBG("Signal timer metadata thread teardown");
                } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
-                       live_timer(ctx, info.si_signo, &info, NULL);
+                       live_timer(ctx, &info);
+               } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+                       struct lttng_consumer_channel *channel;
+
+                       channel = info.si_value.sival_ptr;
+                       monitor_timer(channel);
+               } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+                       assert(CMM_LOAD_SHARED(consumer_quit));
+                       goto end;
                } else {
                        ERR("Unexpected signal %d\n", info.si_signo);
                }
@@ -597,10 +793,8 @@ void *consumer_timer_thread(void *data)
 error_testpoint:
        /* Only reached in testpoint error */
        health_error();
+end:
        health_unregister(health_consumerd);
-
        rcu_unregister_thread();
-
-       /* Never return */
        return NULL;
 }
This page took 0.031011 seconds and 4 git commands to generate.