#include <inttypes.h>
#include <signal.h>
-#include <bin/lttng-sessiond/ust-ctl.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/compat/endian.h>
if (ret) {
PERROR("sigaddset monitor");
}
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
+ if (ret) {
+ PERROR("sigaddset exit");
+ }
}
static int channel_monitor_pipe = -1;
* deadlocks.
*/
static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
* Execute action on a live timer
*/
static void live_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
static
int sample_channel_positions(struct lttng_consumer_channel *channel,
- uint64_t *_highest_use, uint64_t *_lowest_use,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
sample_positions_cb sample, get_consumed_cb get_consumed,
get_produced_cb get_produced)
{
- int ret;
+ int ret = 0;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
bool empty_channel = true;
uint64_t high = 0, low = UINT64_MAX;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ *_total_consumed = 0;
+
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
usage = produced - consumed;
high = (usage > high) ? usage : high;
low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
next:
pthread_mutex_unlock(&stream->lock);
}
* Execute action on a monitor timer.
*/
static
-void monitor_timer(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+void monitor_timer(struct lttng_consumer_channel *channel)
{
int ret;
int channel_monitor_pipe =
sample_positions_cb sample;
get_consumed_cb get_consumed;
get_produced_cb get_produced;
+ uint64_t lowest = 0, highest = 0, total_consumed = 0;
assert(channel);
abort();
}
- ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
- sample, get_consumed, get_produced);
+ ret = sample_channel_positions(channel, &highest, &lowest,
+ &total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
+ msg.highest = highest;
+ msg.lowest = lowest;
+ msg.total_consumed = total_consumed;
/*
* Writes performed here are assumed to be atomic which is only
/*
* This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
* LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
- * LTTNG_CONSUMER_SIG_MONITOR.
+ * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
*/
void *consumer_timer_thread(void *data)
{
}
continue;
} else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
- metadata_switch_timer(ctx, info.si_signo, &info);
+ metadata_switch_timer(ctx, &info);
} else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
cmm_smp_mb();
CMM_STORE_SHARED(timer_signal.qs_done, 1);
cmm_smp_mb();
DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
- live_timer(ctx, info.si_signo, &info);
+ live_timer(ctx, &info);
} else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
struct lttng_consumer_channel *channel;
channel = info.si_value.sival_ptr;
- monitor_timer(ctx, channel);
+ monitor_timer(channel);
+ } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
+ assert(CMM_LOAD_SHARED(consumer_quit));
+ goto end;
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
error_testpoint:
/* Only reached in testpoint error */
health_error();
+end:
health_unregister(health_consumerd);
-
rcu_unregister_thread();
-
- /* Never return */
return NULL;
}