X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-timer.c;h=6d2e6b2b2cbc0ff0183b93f3b67afda3aa846f0b;hb=a0377dfefe40662ba7d68617bce6ff467114136c;hp=60ed94083e543a4d082e6f4e700f1ca57662120a;hpb=873dda4e25e5199d434d322fb7d64697cd9868d3;p=lttng-tools.git diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index 60ed94083..6d2e6b2b2 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -1,27 +1,15 @@ /* - * Copyright (C) 2012 - Julien Desfossez - * David Goulet + * Copyright (C) 2012 Julien Desfossez + * Copyright (C) 2012 David Goulet * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE -#include #include #include -#include #include #include #include @@ -37,6 +25,7 @@ typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, unsigned long *consumed); typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, unsigned long *produced); +typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream); static struct timer_signal_data timer_signal = { .tid = 0, @@ -72,9 +61,13 @@ static void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset monitor"); } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT); + if (ret) { + PERROR("sigaddset exit"); + } } -static int channel_monitor_pipe = -1; +static int the_channel_monitor_pipe = -1; /* * Execute action on a timer switch. @@ -84,13 +77,13 @@ static int channel_monitor_pipe = -1; * deadlocks. */ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si) + siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; channel = si->si_value.sival_ptr; - assert(channel); + LTTNG_ASSERT(channel); if (channel->switch_timer_error) { return; @@ -121,7 +114,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, break; case LTTNG_CONSUMER_KERNEL: case LTTNG_CONSUMER_UNKNOWN: - assert(0); + abort(); break; } } @@ -182,7 +175,8 @@ end: return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +static int check_stream(struct lttng_consumer_stream *stream, + flush_index_cb flush_index) { int ret; @@ -221,7 +215,7 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream) } break; } - ret = consumer_flush_kernel_index(stream); + ret = flush_index(stream); pthread_mutex_unlock(&stream->lock); end: return ret; @@ -242,7 +236,11 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream) ERR("Failed to get the current timestamp"); goto end; } - lttng_ustconsumer_flush_buffer(stream, 1); + ret = lttng_ustconsumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush buffer while flushing index"); + goto end; + } ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { if (ret != -EAGAIN) { @@ -252,7 +250,7 @@ int consumer_flush_ust_index(struct lttng_consumer_stream *stream) } ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); if (ret < 0) { - PERROR("ustctl_get_stream_id"); + PERROR("lttng_ust_ctl_get_stream_id"); goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); @@ -266,103 +264,40 @@ end: return ret; } -static int check_ust_stream(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - assert(stream->ustream); - /* - * While holding the stream mutex, try to take a snapshot, if it - * succeeds, it means that data is ready to be sent, just let the data - * thread handle that. Otherwise, if the snapshot returns EAGAIN, it - * means that there is no data to read after the flush, so we can - * safely send the empty index. - * - * Doing a trylock and checking if waiting on metadata if - * trylock fails. Bail out of the stream is indeed waiting for - * metadata to be pushed. Busy wait on trylock otherwise. - */ - for (;;) { - ret = pthread_mutex_trylock(&stream->lock); - switch (ret) { - case 0: - break; /* We have the lock. */ - case EBUSY: - pthread_mutex_lock(&stream->metadata_timer_lock); - if (stream->waiting_on_metadata) { - ret = 0; - stream->missed_metadata_flush = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - goto end; /* Bail out. */ - } - pthread_mutex_unlock(&stream->metadata_timer_lock); - /* Try again. */ - caa_cpu_relax(); - continue; - default: - ERR("Unexpected pthread_mutex_trylock error %d", ret); - ret = -1; - goto end; - } - break; - } - ret = consumer_flush_ust_index(stream); - pthread_mutex_unlock(&stream->lock); -end: - return ret; -} - /* * Execute action on a live timer */ static void live_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si) + siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; - struct lttng_ht *ht; struct lttng_ht_iter iter; + const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + const flush_index_cb flush_index = + ctx->type == LTTNG_CONSUMER_KERNEL ? + consumer_flush_kernel_index : + consumer_flush_ust_index; channel = si->si_value.sival_ptr; - assert(channel); + LTTNG_ASSERT(channel); if (channel->switch_timer_error) { goto error; } - ht = consumer_data.stream_per_chan_id_ht; DBG("Live timer for channel %" PRIu64, channel->key); rcu_read_lock(); - switch (ctx->type) { - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - ret = check_ust_stream(stream); - if (ret < 0) { - goto error_unlock; - } - } - break; - case LTTNG_CONSUMER_KERNEL: - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - ret = check_kernel_stream(stream); - if (ret < 0) { - goto error_unlock; - } + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_stream(stream, flush_index); + if (ret < 0) { + goto error_unlock; } - break; - case LTTNG_CONSUMER_UNKNOWN: - assert(0); - break; } error_unlock: @@ -394,7 +329,7 @@ void consumer_timer_signal_thread_qs(unsigned int signr) if (ret == -1) { PERROR("sigpending"); } - if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) { + if (!sigismember(&pending_set, signr)) { break; } caa_cpu_relax(); @@ -436,11 +371,11 @@ int consumer_channel_timer_start(timer_t *timer_id, unsigned int timer_interval_us, int signal) { int ret = 0, delete_ret; - struct sigevent sev; + struct sigevent sev = {}; struct itimerspec its; - assert(channel); - assert(channel->key); + LTTNG_ASSERT(channel); + LTTNG_ASSERT(channel->key); if (timer_interval_us == 0) { /* No creation needed; not an error. */ @@ -502,8 +437,8 @@ void consumer_timer_switch_start(struct lttng_consumer_channel *channel, { int ret; - assert(channel); - assert(channel->key); + LTTNG_ASSERT(channel); + LTTNG_ASSERT(channel->key); ret = consumer_channel_timer_start(&channel->switch_timer, channel, switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); @@ -518,7 +453,7 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) { int ret; - assert(channel); + LTTNG_ASSERT(channel); ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH); @@ -537,8 +472,8 @@ void consumer_timer_live_start(struct lttng_consumer_channel *channel, { int ret; - assert(channel); - assert(channel->key); + LTTNG_ASSERT(channel); + LTTNG_ASSERT(channel->key); ret = consumer_channel_timer_start(&channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); @@ -553,7 +488,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel) { int ret; - assert(channel); + LTTNG_ASSERT(channel); ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE); @@ -575,9 +510,9 @@ int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, { int ret; - assert(channel); - assert(channel->key); - assert(!channel->monitor_timer_enabled); + LTTNG_ASSERT(channel); + LTTNG_ASSERT(channel->key); + LTTNG_ASSERT(!channel->monitor_timer_enabled); ret = consumer_channel_timer_start(&channel->monitor_timer, channel, monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); @@ -592,8 +527,8 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) { int ret; - assert(channel); - assert(channel->monitor_timer_enabled); + LTTNG_ASSERT(channel); + LTTNG_ASSERT(channel->monitor_timer_enabled); ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR); @@ -629,16 +564,18 @@ int consumer_signal_init(void) static int sample_channel_positions(struct lttng_consumer_channel *channel, - uint64_t *_highest_use, uint64_t *_lowest_use, + uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed, sample_positions_cb sample, get_consumed_cb get_consumed, get_produced_cb get_produced) { - int ret; + int ret = 0; struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; bool empty_channel = true; uint64_t high = 0, low = UINT64_MAX; - struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + + *_total_consumed = 0; rcu_read_lock(); @@ -677,6 +614,15 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, usage = produced - consumed; high = (usage > high) ? usage : high; low = (usage < low) ? usage : low; + + /* + * We don't use consumed here for 2 reasons: + * - output_written takes into account the padding written in the + * tracefiles when we stop the session; + * - the consumed position is not the accurate representation of what + * was extracted from a buffer in overwrite mode. + */ + *_total_consumed += stream->output_written; next: pthread_mutex_unlock(&stream->lock); } @@ -695,8 +641,7 @@ end: * Execute action on a monitor timer. */ static -void monitor_timer(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel) +void monitor_timer(struct lttng_consumer_channel *channel) { int ret; int channel_monitor_pipe = @@ -707,14 +652,15 @@ void monitor_timer(struct lttng_consumer_local_data *ctx, sample_positions_cb sample; get_consumed_cb get_consumed; get_produced_cb get_produced; + uint64_t lowest = 0, highest = 0, total_consumed = 0; - assert(channel); + LTTNG_ASSERT(channel); if (channel_monitor_pipe < 0) { return; } - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: sample = lttng_kconsumer_sample_snapshot_positions; get_consumed = lttng_kconsumer_get_consumed_snapshot; @@ -730,17 +676,20 @@ void monitor_timer(struct lttng_consumer_local_data *ctx, abort(); } - ret = sample_channel_positions(channel, &msg.highest, &msg.lowest, - sample, get_consumed, get_produced); + ret = sample_channel_positions(channel, &highest, &lowest, + &total_consumed, sample, get_consumed, get_produced); if (ret) { return; } + msg.highest = highest; + msg.lowest = lowest; + msg.total_consumed = total_consumed; /* * Writes performed here are assumed to be atomic which is only * guaranteed for sizes < than PIPE_BUF. */ - assert(sizeof(msg) <= PIPE_BUF); + LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF); do { ret = write(channel_monitor_pipe, &msg, sizeof(msg)); @@ -762,14 +711,14 @@ void monitor_timer(struct lttng_consumer_local_data *ctx, int consumer_timer_thread_get_channel_monitor_pipe(void) { - return uatomic_read(&channel_monitor_pipe); + return uatomic_read(&the_channel_monitor_pipe); } int consumer_timer_thread_set_channel_monitor_pipe(int fd) { int ret; - ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd); + ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd); if (ret != -1) { ret = -1; goto end; @@ -782,7 +731,7 @@ end: /* * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and - * LTTNG_CONSUMER_SIG_MONITOR. + * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT. */ void *consumer_timer_thread(void *data) { @@ -823,19 +772,22 @@ void *consumer_timer_thread(void *data) } continue; } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) { - metadata_switch_timer(ctx, info.si_signo, &info); + metadata_switch_timer(ctx, &info); } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { - live_timer(ctx, info.si_signo, &info); + live_timer(ctx, &info); } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) { struct lttng_consumer_channel *channel; channel = info.si_value.sival_ptr; - monitor_timer(ctx, channel); + monitor_timer(channel); + } else if (signr == LTTNG_CONSUMER_SIG_EXIT) { + LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit)); + goto end; } else { ERR("Unexpected signal %d\n", info.si_signo); } @@ -844,10 +796,8 @@ void *consumer_timer_thread(void *data) error_testpoint: /* Only reached in testpoint error */ health_error(); +end: health_unregister(health_consumerd); - rcu_unregister_thread(); - - /* Never return */ return NULL; }