*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <assert.h>
#include <inttypes.h>
#include <signal.h>
-#include <lttng/ust-ctl.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
+#include <common/compat/endian.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/consumer-stream.h>
#include "consumer-timer.h"
+#include "consumer-testpoint.h"
#include "ust-consumer/ust-consumer.h"
static struct timer_signal_data timer_signal = {
* they are held while consumer_timer_switch_stop() is
* called.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, channel, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
if (ret < 0) {
channel->switch_timer_error = 1;
}
}
}
-static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
+ uint64_t stream_id)
{
int ret;
- struct lttng_packet_index index;
+ struct ctf_packet_index index;
memset(&index, 0, sizeof(index));
+ index.stream_id = htobe64(stream_id);
index.timestamp_end = htobe64(ts);
ret = consumer_stream_write_index(stream, &index);
if (ret < 0) {
return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
{
- uint64_t ts;
+ uint64_t ts, stream_id;
int ret;
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- */
- pthread_mutex_lock(&stream->lock);
ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_unlock;
+ goto end;
}
ret = kernctl_snapshot(stream->wait_fd);
if (ret < 0) {
- if (errno != EAGAIN) {
- ERR("Taking kernel snapshot");
+ if (errno != EAGAIN && errno != ENODATA) {
+ PERROR("live timer kernel snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
+ }
+ ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
+ if (ret < 0) {
+ PERROR("kernctl_get_stream_id");
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
- ret = send_empty_index(stream, ts);
+ ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
-
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
{
- uint64_t ts;
int ret;
- assert(stream);
- assert(stream->ustream);
/*
* While holding the stream mutex, try to take a snapshot, if it
* succeeds, it means that data is ready to be sent, just let the data
* thread handle that. Otherwise, if the snapshot returns EAGAIN, it
* means that there is no data to read after the flush, so we can
* safely send the empty index.
+ *
+ * Doing a trylock and checking if waiting on metadata if
+ * trylock fails. Bail out of the stream is indeed waiting for
+ * metadata to be pushed. Busy wait on trylock otherwise.
*/
- pthread_mutex_lock(&stream->lock);
- ret = ustctl_get_current_timestamp(stream->ustream, &ts);
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_kernel_index(stream);
+ pthread_mutex_unlock(&stream->lock);
+end:
+ return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+ uint64_t ts, stream_id;
+ int ret;
+
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (ret) {
+ goto end;
+ }
+
+ ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
if (ret < 0) {
ERR("Failed to get the current timestamp");
- goto error_unlock;
+ goto end;
}
- ustctl_flush_buffer(stream->ustream, 1);
- ret = ustctl_snapshot(stream->ustream);
+ lttng_ustconsumer_flush_buffer(stream, 1);
+ ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
- if (errno != EAGAIN) {
+ if (ret != -EAGAIN) {
ERR("Taking UST snapshot");
ret = -1;
- goto error_unlock;
+ goto end;
+ }
+ ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
+ if (ret < 0) {
+ PERROR("ustctl_get_stream_id");
+ goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
- ret = send_empty_index(stream, ts);
+ ret = send_empty_index(stream, ts, stream_id);
if (ret < 0) {
- goto error_unlock;
+ goto end;
}
}
ret = 0;
+end:
+ return ret;
+}
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+ assert(stream->ustream);
+ /*
+ * While holding the stream mutex, try to take a snapshot, if it
+ * succeeds, it means that data is ready to be sent, just let the data
+ * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+ * means that there is no data to read after the flush, so we can
+ * safely send the empty index.
+ *
+ * Doing a trylock and checking if waiting on metadata if
+ * trylock fails. Bail out of the stream is indeed waiting for
+ * metadata to be pushed. Busy wait on trylock otherwise.
+ */
+ for (;;) {
+ ret = pthread_mutex_trylock(&stream->lock);
+ switch (ret) {
+ case 0:
+ break; /* We have the lock. */
+ case EBUSY:
+ pthread_mutex_lock(&stream->metadata_timer_lock);
+ if (stream->waiting_on_metadata) {
+ ret = 0;
+ stream->missed_metadata_flush = true;
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ goto end; /* Bail out. */
+ }
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
+ /* Try again. */
+ caa_cpu_relax();
+ continue;
+ default:
+ ERR("Unexpected pthread_mutex_trylock error %d", ret);
+ ret = -1;
+ goto end;
+ }
+ break;
+ }
+ ret = consumer_flush_ust_index(stream);
pthread_mutex_unlock(&stream->lock);
+end:
return ret;
}
assert(channel);
assert(channel->key);
- if (live_timer_interval == 0) {
+ if (live_timer_interval <= 0) {
return;
}
* Block the RT signals for the entire process. It must be called from the
* consumer main before creating the threads
*/
-void consumer_signal_init(void)
+int consumer_signal_init(void)
{
int ret;
sigset_t mask;
if (ret) {
errno = ret;
PERROR("pthread_sigmask");
+ return -1;
}
+ return 0;
}
/*
siginfo_t info;
struct lttng_consumer_local_data *ctx = data;
+ rcu_register_thread();
+
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+
+ if (testpoint(consumerd_thread_metadata_timer)) {
+ goto error_testpoint;
+ }
+
+ health_code_update();
+
/* Only self thread will receive signal mask. */
setmask(&mask);
CMM_STORE_SHARED(timer_signal.tid, pthread_self());
while (1) {
+ health_code_update();
+
+ health_poll_entry();
signr = sigwaitinfo(&mask, &info);
+ health_poll_exit();
if (signr == -1) {
if (errno != EINTR) {
PERROR("sigwaitinfo");
}
}
+error_testpoint:
+ /* Only reached in testpoint error */
+ health_error();
+ health_unregister(health_consumerd);
+
+ rcu_unregister_thread();
+
+ /* Never return */
return NULL;
}