X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-timer.c;h=09cf3629331d344e54c63a6d3b2a467c527cb648;hp=b0a434284a87d888f631b3fc8f0d435fc4c1d888;hb=e9404c27e7cc9d841785e6c4292c1add19fbc1cc;hpb=40d253abbf635c6cf0b938298d56747cf67c11db diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index b0a434284..09cf36293 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,12 @@ #include #include +typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream); +typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, + unsigned long *consumed); +typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, + unsigned long *produced); + static struct timer_signal_data timer_signal = { .tid = 0, .setup_done = 0, @@ -61,8 +68,14 @@ static void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset live"); } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR); + if (ret) { + PERROR("sigaddset monitor"); + } } +static int channel_monitor_pipe = -1; + /* * Execute action on a timer switch. * @@ -71,7 +84,7 @@ static void setmask(sigset_t *mask) * deadlocks. */ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -304,7 +317,7 @@ end: * Execute action on a live timer */ static void live_timer(struct lttng_consumer_local_data *ctx, - int sig, siginfo_t *si, void *uc) + int sig, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -411,44 +424,95 @@ void consumer_timer_signal_thread_qs(unsigned int signr) } /* - * Set the timer for periodical metadata flush. + * Start a timer channel timer which will fire at a given interval + * (timer_interval_us)and fire a given signal (signal). + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval) +static +int consumer_channel_timer_start(timer_t *timer_id, + struct lttng_consumer_channel *channel, + unsigned int timer_interval_us, int signal) { - int ret; + int ret = 0, delete_ret; struct sigevent sev; struct itimerspec its; assert(channel); assert(channel->key); - if (switch_timer_interval == 0) { - return; + if (timer_interval_us == 0) { + /* No creation needed; not an error. */ + ret = 1; + goto end; } sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH; + sev.sigev_signo = signal; sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->switch_timer); + ret = timer_create(CLOCKID, &sev, timer_id); if (ret == -1) { PERROR("timer_create"); + goto end; } - channel->switch_timer_enabled = 1; - its.it_value.tv_sec = switch_timer_interval / 1000000; - its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000; + its.it_value.tv_sec = timer_interval_us / 1000000; + its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; - ret = timer_settime(channel->switch_timer, 0, &its, NULL); + ret = timer_settime(*timer_id, 0, &its, NULL); if (ret == -1) { PERROR("timer_settime"); + goto error_destroy_timer; + } +end: + return ret; +error_destroy_timer: + delete_ret = timer_delete(*timer_id); + if (delete_ret == -1) { + PERROR("timer_delete"); + } + goto end; +} + +static +int consumer_channel_timer_stop(timer_t *timer_id, int signal) +{ + int ret = 0; + + ret = timer_delete(*timer_id); + if (ret == -1) { + PERROR("timer_delete"); + goto end; } + + consumer_timer_signal_thread_qs(signal); + *timer_id = 0; +end: + return ret; +} + +/* + * Set the channel's switch timer. + */ +void consumer_timer_switch_start(struct lttng_consumer_channel *channel, + unsigned int switch_timer_interval_us) +{ + int ret; + + assert(channel); + assert(channel->key); + + ret = consumer_channel_timer_start(&channel->switch_timer, channel, + switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); + + channel->switch_timer_enabled = !!(ret == 0); } /* - * Stop and delete timer. + * Stop and delete the channel's switch timer. */ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) { @@ -456,72 +520,91 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) assert(channel); - ret = timer_delete(channel->switch_timer); + ret = consumer_channel_timer_stop(&channel->switch_timer, + LTTNG_CONSUMER_SIG_SWITCH); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop switch timer"); } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH); - - channel->switch_timer = 0; channel->switch_timer_enabled = 0; } /* - * Set the timer for the live mode. + * Set the channel's live timer. */ void consumer_timer_live_start(struct lttng_consumer_channel *channel, - int live_timer_interval) + unsigned int live_timer_interval_us) { int ret; - struct sigevent sev; - struct itimerspec its; assert(channel); assert(channel->key); - if (live_timer_interval <= 0) { - return; - } + ret = consumer_channel_timer_start(&channel->live_timer, channel, + live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); - sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; - sev.sigev_value.sival_ptr = channel; - ret = timer_create(CLOCKID, &sev, &channel->live_timer); - if (ret == -1) { - PERROR("timer_create"); - } - channel->live_timer_enabled = 1; + channel->live_timer_enabled = !!(ret == 0); +} - its.it_value.tv_sec = live_timer_interval / 1000000; - its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000; - its.it_interval.tv_sec = its.it_value.tv_sec; - its.it_interval.tv_nsec = its.it_value.tv_nsec; +/* + * Stop and delete the channel's live timer. + */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); - ret = timer_settime(channel->live_timer, 0, &its, NULL); + ret = consumer_channel_timer_stop(&channel->live_timer, + LTTNG_CONSUMER_SIG_LIVE); if (ret == -1) { - PERROR("timer_settime"); + ERR("Failed to stop live timer"); } + + channel->live_timer_enabled = 0; } /* - * Stop and delete timer. + * Set the channel's monitoring timer. + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). */ -void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, + unsigned int monitor_timer_interval_us) { int ret; assert(channel); + assert(channel->key); + assert(!channel->monitor_timer_enabled); - ret = timer_delete(channel->live_timer); + ret = consumer_channel_timer_start(&channel->monitor_timer, channel, + monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); + channel->monitor_timer_enabled = !!(ret == 0); + return ret; +} + +/* + * Stop and delete the channel's monitoring timer. + */ +int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + assert(channel->monitor_timer_enabled); + + ret = consumer_channel_timer_stop(&channel->monitor_timer, + LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { - PERROR("timer_delete"); + ERR("Failed to stop live timer"); + goto end; } - consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); - - channel->live_timer = 0; - channel->live_timer_enabled = 0; + channel->monitor_timer_enabled = 0; +end: + return ret; } /* @@ -544,9 +627,165 @@ int consumer_signal_init(void) return 0; } +static +int sample_channel_positions(struct lttng_consumer_channel *channel, + uint64_t *_highest_use, uint64_t *_lowest_use, + sample_positions_cb sample, get_consumed_cb get_consumed, + get_produced_cb get_produced) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool empty_channel = true; + uint64_t high = 0, low = UINT64_MAX; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + unsigned long produced, consumed, usage; + + empty_channel = false; + + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + + ret = sample(stream); + if (ret) { + ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = get_consumed(stream, &consumed); + if (ret) { + ERR("Failed to get buffer consumed position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + ret = get_produced(stream, &produced); + if (ret) { + ERR("Failed to get buffer produced position in monitor timer"); + pthread_mutex_unlock(&stream->lock); + goto end; + } + + usage = produced - consumed; + high = (usage > high) ? usage : high; + low = (usage < low) ? usage : low; + next: + pthread_mutex_unlock(&stream->lock); + } + + *_highest_use = high; + *_lowest_use = low; +end: + rcu_read_unlock(); + if (empty_channel) { + ret = -1; + } + return ret; +} + +/* + * Execute action on a monitor timer. + */ +static +void monitor_timer(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + int ret; + int channel_monitor_pipe = + consumer_timer_thread_get_channel_monitor_pipe(); + struct lttcomm_consumer_channel_monitor_msg msg = { + .key = channel->key, + }; + sample_positions_cb sample; + get_consumed_cb get_consumed; + get_produced_cb get_produced; + + assert(channel); + pthread_mutex_lock(&consumer_data.lock); + + if (channel_monitor_pipe < 0) { + goto end; + } + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + sample = lttng_kconsumer_sample_snapshot_positions; + get_consumed = lttng_kconsumer_get_consumed_snapshot; + get_produced = lttng_kconsumer_get_produced_snapshot; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + sample = lttng_ustconsumer_sample_snapshot_positions; + get_consumed = lttng_ustconsumer_get_consumed_snapshot; + get_produced = lttng_ustconsumer_get_produced_snapshot; + break; + default: + abort(); + } + + ret = sample_channel_positions(channel, &msg.highest, &msg.lowest, + sample, get_consumed, get_produced); + if (ret) { + goto end; + } + + /* + * Writes performed here are assumed to be atomic which is only + * guaranteed for sizes < than PIPE_BUF. + */ + assert(sizeof(msg) <= PIPE_BUF); + + do { + ret = write(channel_monitor_pipe, &msg, sizeof(msg)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + if (errno == EAGAIN) { + /* Not an error, the sample is merely dropped. */ + DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64, + channel->key); + } else { + PERROR("write to the channel monitor pipe"); + } + } else { + DBG("Sent channel monitoring sample for channel key %" PRIu64 + ", (highest = %" PRIu64 ", lowest = %"PRIu64")", + channel->key, msg.highest, msg.lowest); + } +end: + pthread_mutex_unlock(&consumer_data.lock); +} + +int consumer_timer_thread_get_channel_monitor_pipe(void) +{ + return uatomic_read(&channel_monitor_pipe); +} + +int consumer_timer_thread_set_channel_monitor_pipe(int fd) +{ + int ret; + + ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd); + if (ret != -1) { + ret = -1; + goto end; + } + ret = 0; +end: + return ret; +} + /* * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, - * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. + * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and + * LTTNG_CONSUMER_SIG_MONITOR. */ void *consumer_timer_thread(void *data) { @@ -575,20 +814,31 @@ void *consumer_timer_thread(void *data) health_poll_entry(); signr = sigwaitinfo(&mask, &info); health_poll_exit(); + + /* + * NOTE: cascading conditions are used instead of a switch case + * since the use of SIGRTMIN in the definition of the signals' + * values prevents the reduction to an integer constant. + */ if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); } continue; } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) { - metadata_switch_timer(ctx, info.si_signo, &info, NULL); + metadata_switch_timer(ctx, info.si_signo, &info); } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { - live_timer(ctx, info.si_signo, &info, NULL); + live_timer(ctx, info.si_signo, &info); + } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) { + struct lttng_consumer_channel *channel; + + channel = info.si_value.sival_ptr; + monitor_timer(ctx, channel); } else { ERR("Unexpected signal %d\n", info.si_signo); }