+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
+ sample_positions_cb sample, get_consumed_cb get_consumed,
+ get_produced_cb get_produced)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool empty_channel = true;
+ uint64_t high = 0, low = UINT64_MAX;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+
+ *_total_consumed = 0;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key,
+ &iter.iter, stream, node_channel_id.node) {
+ unsigned long produced, consumed, usage;
+
+ empty_channel = false;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ ret = sample(stream);
+ if (ret) {
+ ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_consumed(stream, &consumed);
+ if (ret) {
+ ERR("Failed to get buffer consumed position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_produced(stream, &produced);
+ if (ret) {
+ ERR("Failed to get buffer produced position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+
+ usage = produced - consumed;
+ high = (usage > high) ? usage : high;
+ low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ *_highest_use = high;
+ *_lowest_use = low;
+end:
+ rcu_read_unlock();
+ if (empty_channel) {
+ ret = -1;
+ }
+ return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ int channel_monitor_pipe =
+ consumer_timer_thread_get_channel_monitor_pipe();
+ struct lttcomm_consumer_channel_monitor_msg msg = {
+ .key = channel->key,
+ };
+ sample_positions_cb sample;
+ get_consumed_cb get_consumed;
+ get_produced_cb get_produced;
+ uint64_t lowest = 0, highest = 0, total_consumed = 0;
+
+ assert(channel);
+
+ if (channel_monitor_pipe < 0) {
+ return;
+ }
+
+ switch (the_consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ sample = lttng_kconsumer_sample_snapshot_positions;
+ get_consumed = lttng_kconsumer_get_consumed_snapshot;
+ get_produced = lttng_kconsumer_get_produced_snapshot;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ sample = lttng_ustconsumer_sample_snapshot_positions;
+ get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+ get_produced = lttng_ustconsumer_get_produced_snapshot;
+ break;
+ default:
+ abort();
+ }
+
+ ret = sample_channel_positions(channel, &highest, &lowest,
+ &total_consumed, sample, get_consumed, get_produced);
+ if (ret) {
+ return;
+ }
+ msg.highest = highest;
+ msg.lowest = lowest;
+ msg.total_consumed = total_consumed;
+
+ /*
+ * Writes performed here are assumed to be atomic which is only
+ * guaranteed for sizes < than PIPE_BUF.
+ */
+ assert(sizeof(msg) <= PIPE_BUF);
+
+ do {
+ ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ if (errno == EAGAIN) {
+ /* Not an error, the sample is merely dropped. */
+ DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+ channel->key);
+ } else {
+ PERROR("write to the channel monitor pipe");
+ }
+ } else {
+ DBG("Sent channel monitoring sample for channel key %" PRIu64
+ ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+ channel->key, msg.highest, msg.lowest);
+ }
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+ return uatomic_read(&the_channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+ int ret;
+
+ ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
+ if (ret != -1) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+end:
+ return ret;
+}
+