/*
- * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
- * David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <inttypes.h>
#include <signal.h>
-#include <bin/lttng-sessiond/ust-ctl.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/compat/endian.h>
unsigned long *consumed);
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
static struct timer_signal_data timer_signal = {
.tid = 0,
return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+ flush_index_cb flush_index)
{
int ret;
}
break;
}
- ret = consumer_flush_kernel_index(stream);
+ ret = flush_index(stream);
pthread_mutex_unlock(&stream->lock);
end:
return ret;
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
- assert(stream->ustream);
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- *
- * Doing a trylock and checking if waiting on metadata if
- * trylock fails. Bail out of the stream is indeed waiting for
- * metadata to be pushed. Busy wait on trylock otherwise.
- */
- for (;;) {
- ret = pthread_mutex_trylock(&stream->lock);
- switch (ret) {
- case 0:
- break; /* We have the lock. */
- case EBUSY:
- pthread_mutex_lock(&stream->metadata_timer_lock);
- if (stream->waiting_on_metadata) {
- ret = 0;
- stream->missed_metadata_flush = true;
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- goto end; /* Bail out. */
- }
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- /* Try again. */
- caa_cpu_relax();
- continue;
- default:
- ERR("Unexpected pthread_mutex_trylock error %d", ret);
- ret = -1;
- goto end;
- }
- break;
- }
- ret = consumer_flush_ust_index(stream);
- pthread_mutex_unlock(&stream->lock);
-end:
- return ret;
-}
-
/*
* Execute action on a live timer
*/
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
struct lttng_ht_iter iter;
+ const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ const flush_index_cb flush_index =
+ ctx->type == LTTNG_CONSUMER_KERNEL ?
+ consumer_flush_kernel_index :
+ consumer_flush_ust_index;
channel = si->si_value.sival_ptr;
assert(channel);
if (channel->switch_timer_error) {
goto error;
}
- ht = consumer_data.stream_per_chan_id_ht;
DBG("Live timer for channel %" PRIu64, channel->key);
rcu_read_lock();
- switch (ctx->type) {
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_ust_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
- }
- break;
- case LTTNG_CONSUMER_KERNEL:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_kernel_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_stream(stream, flush_index);
+ if (ret < 0) {
+ goto error_unlock;
}
- break;
- case LTTNG_CONSUMER_UNKNOWN:
- assert(0);
- break;
}
error_unlock:
unsigned int timer_interval_us, int signal)
{
int ret = 0, delete_ret;
- struct sigevent sev;
+ struct sigevent sev = {};
struct itimerspec its;
assert(channel);
* Execute action on a monitor timer.
*/
static
-void monitor_timer(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+void monitor_timer(struct lttng_consumer_channel *channel)
{
int ret;
int channel_monitor_pipe =
sample_positions_cb sample;
get_consumed_cb get_consumed;
get_produced_cb get_produced;
+ uint64_t lowest = 0, highest = 0, total_consumed = 0;
assert(channel);
abort();
}
- ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
- &msg.total_consumed, sample, get_consumed, get_produced);
+ ret = sample_channel_positions(channel, &highest, &lowest,
+ &total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
+ msg.highest = highest;
+ msg.lowest = lowest;
+ msg.total_consumed = total_consumed;
/*
* Writes performed here are assumed to be atomic which is only
struct lttng_consumer_channel *channel;
channel = info.si_value.sival_ptr;
- monitor_timer(ctx, channel);
+ monitor_timer(channel);
} else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
assert(CMM_LOAD_SHARED(consumer_quit));
goto end;