#include <inttypes.h>
#include <signal.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
+#include <common/compat/endian.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/consumer-stream.h>
#include "consumer-timer.h"
+#include "consumer-testpoint.h"
#include "ust-consumer/ust-consumer.h"
-#include "../bin/lttng-consumerd/health-consumerd.h"
static struct timer_signal_data timer_signal = {
.tid = 0,
}
}
-static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
+ uint64_t stream_id)
{
int ret;
- struct lttng_packet_index index;
+ struct ctf_packet_index index;
memset(&index, 0, sizeof(index));
+ index.stream_id = htobe64(stream_id);
index.timestamp_end = htobe64(ts);
ret = consumer_stream_write_index(stream, &index);
if (ret < 0) {
static int check_kernel_stream(struct lttng_consumer_stream *stream)
{
- uint64_t ts;
+ uint64_t ts, stream_id;
int ret;
/*
}
ret = kernctl_snapshot(stream->wait_fd);
if (ret < 0) {
- if (errno != EAGAIN) {
- ERR("Taking kernel snapshot");
+ if (errno != EAGAIN && errno != ENODATA) {
+ PERROR("live timer kernel snapshot");
ret = -1;
goto error_unlock;
}
+ ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
+ if (ret < 0) {
+ PERROR("kernctl_get_stream_id");
+ goto error_unlock;
+ }
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
- ret = send_empty_index(stream, ts);
+ ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
goto error_unlock;
}
static int check_ust_stream(struct lttng_consumer_stream *stream)
{
- uint64_t ts;
+ uint64_t ts, stream_id;
int ret;
assert(stream);
ret = -1;
goto error_unlock;
}
+ ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
+ if (ret < 0) {
+ PERROR("ustctl_get_stream_id");
+ goto error_unlock;
+ }
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
- ret = send_empty_index(stream, ts);
+ ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
goto error_unlock;
}
assert(channel);
assert(channel->key);
- if (live_timer_interval == 0) {
+ if (live_timer_interval <= 0) {
return;
}
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+ if (testpoint(consumerd_thread_metadata_timer)) {
+ goto error_testpoint;
+ }
+
+ health_code_update();
+
/* Only self thread will receive signal mask. */
setmask(&mask);
CMM_STORE_SHARED(timer_signal.tid, pthread_self());
while (1) {
+ health_code_update();
+
+ health_poll_entry();
signr = sigwaitinfo(&mask, &info);
+ health_poll_exit();
if (signr == -1) {
if (errno != EINTR) {
PERROR("sigwaitinfo");
}
}
- /* Currently never reached */
+error_testpoint:
+ /* Only reached in testpoint error */
+ health_error();
health_unregister(health_consumerd);
/* Never return */