From: Jérémie Galarneau Date: Wed, 29 Apr 2020 04:03:43 +0000 (-0400) Subject: consumerd: refactor: combine duplicated check_*_functions X-Git-Tag: v2.13.0-rc1~639 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=fad4b619a1f7491acaf9b8fd2396c115b5bef56f;hp=41dc93d9df7c6bba1e1fb765d726700c9eafb144 consumerd: refactor: combine duplicated check_*_functions The check_ust_stream and check_kernel_stream functions are identical except for the call to the domain-specific call to consumer_flush_*_index. A "flush_index" callback is passed to check_stream in order to share the rest of that code. Signed-off-by: Jérémie Galarneau Change-Id: Iafdb64192322c0106a555b67f54290dadc4f0579 --- diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index 003e5adef..c190d3b62 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -26,6 +26,7 @@ typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, unsigned long *consumed); typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, unsigned long *produced); +typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream); static struct timer_signal_data timer_signal = { .tid = 0, @@ -175,7 +176,8 @@ end: return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +static int check_stream(struct lttng_consumer_stream *stream, + flush_index_cb flush_index) { int ret; @@ -214,7 +216,7 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream) } break; } - ret = consumer_flush_kernel_index(stream); + ret = flush_index(stream); pthread_mutex_unlock(&stream->lock); end: return ret; @@ -259,53 +261,6 @@ end: return ret; } -static int check_ust_stream(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - assert(stream->ustream); - /* - * While holding the stream mutex, try to take a snapshot, if it - * succeeds, it means that data is ready to be sent, just let the data - * thread handle that. Otherwise, if the snapshot returns EAGAIN, it - * means that there is no data to read after the flush, so we can - * safely send the empty index. - * - * Doing a trylock and checking if waiting on metadata if - * trylock fails. Bail out of the stream is indeed waiting for - * metadata to be pushed. Busy wait on trylock otherwise. - */ - for (;;) { - ret = pthread_mutex_trylock(&stream->lock); - switch (ret) { - case 0: - break; /* We have the lock. */ - case EBUSY: - pthread_mutex_lock(&stream->metadata_timer_lock); - if (stream->waiting_on_metadata) { - ret = 0; - stream->missed_metadata_flush = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - goto end; /* Bail out. */ - } - pthread_mutex_unlock(&stream->metadata_timer_lock); - /* Try again. */ - caa_cpu_relax(); - continue; - default: - ERR("Unexpected pthread_mutex_trylock error %d", ret); - ret = -1; - goto end; - } - break; - } - ret = consumer_flush_ust_index(stream); - pthread_mutex_unlock(&stream->lock); -end: - return ret; -} - /* * Execute action on a live timer */ @@ -315,8 +270,12 @@ static void live_timer(struct lttng_consumer_local_data *ctx, int ret; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; - struct lttng_ht *ht; struct lttng_ht_iter iter; + const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + const flush_index_cb flush_index = + ctx->type == LTTNG_CONSUMER_KERNEL ? + consumer_flush_kernel_index : + consumer_flush_ust_index; channel = si->si_value.sival_ptr; assert(channel); @@ -324,38 +283,18 @@ static void live_timer(struct lttng_consumer_local_data *ctx, if (channel->switch_timer_error) { goto error; } - ht = consumer_data.stream_per_chan_id_ht; DBG("Live timer for channel %" PRIu64, channel->key); rcu_read_lock(); - switch (ctx->type) { - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - ret = check_ust_stream(stream); - if (ret < 0) { - goto error_unlock; - } - } - break; - case LTTNG_CONSUMER_KERNEL: - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - ret = check_kernel_stream(stream); - if (ret < 0) { - goto error_unlock; - } + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_stream(stream, flush_index); + if (ret < 0) { + goto error_unlock; } - break; - case LTTNG_CONSUMER_UNKNOWN: - assert(0); - break; } error_unlock: