X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-timer.c;h=646d32342cdfa0a7a7552332dd3f5da2c90cc680;hb=847a5916c26ab9cc0dfc9322cccd2c748c54747e;hp=e2be05e7e731b8355235e3d040365545c5828bbc;hpb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;p=lttng-tools.git diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index e2be05e7e..646d32342 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -17,17 +17,20 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include -#include +#include #include +#include #include #include #include #include "consumer-timer.h" +#include "consumer-testpoint.h" #include "ust-consumer/ust-consumer.h" static struct timer_signal_data timer_signal = { @@ -100,7 +103,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * they are held while consumer_timer_switch_stop() is * called. */ - ret = lttng_ustconsumer_request_metadata(ctx, channel, 1); + ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1); if (ret < 0) { channel->switch_timer_error = 1; } @@ -112,12 +115,14 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } } -static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts) +static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, + uint64_t stream_id) { int ret; - struct lttng_packet_index index; + struct ctf_packet_index index; memset(&index, 0, sizeof(index)); + index.stream_id = htobe64(stream_id); index.timestamp_end = htobe64(ts); ret = consumer_stream_write_index(stream, &index); if (ret < 0) { @@ -130,7 +135,7 @@ error: static int check_kernel_stream(struct lttng_consumer_stream *stream) { - uint64_t ts; + uint64_t ts, stream_id; int ret; /* @@ -153,13 +158,18 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream) } ret = kernctl_snapshot(stream->wait_fd); if (ret < 0) { - if (errno != EAGAIN) { - ERR("Taking kernel snapshot"); + if (errno != EAGAIN && errno != ENODATA) { + PERROR("live timer kernel snapshot"); ret = -1; goto error_unlock; } + ret = kernctl_get_stream_id(stream->wait_fd, &stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto error_unlock; + } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts); + ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { goto error_unlock; } @@ -173,7 +183,7 @@ error_unlock: static int check_ust_stream(struct lttng_consumer_stream *stream) { - uint64_t ts; + uint64_t ts, stream_id; int ret; assert(stream); @@ -186,21 +196,31 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) * safely send the empty index. */ pthread_mutex_lock(&stream->lock); - ret = ustctl_get_current_timestamp(stream->ustream, &ts); + ret = cds_lfht_is_node_deleted(&stream->node.node); + if (ret) { + goto error_unlock; + } + + ret = lttng_ustconsumer_get_current_timestamp(stream, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); goto error_unlock; } - ustctl_flush_buffer(stream->ustream, 1); - ret = ustctl_snapshot(stream->ustream); + lttng_ustconsumer_flush_buffer(stream, 1); + ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { - if (errno != EAGAIN) { + if (ret != -EAGAIN) { ERR("Taking UST snapshot"); ret = -1; goto error_unlock; } + ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); + if (ret < 0) { + PERROR("ustctl_get_stream_id"); + goto error_unlock; + } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts); + ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { goto error_unlock; } @@ -392,7 +412,7 @@ void consumer_timer_live_start(struct lttng_consumer_channel *channel, assert(channel); assert(channel->key); - if (live_timer_interval == 0) { + if (live_timer_interval <= 0) { return; } @@ -440,7 +460,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel) * Block the RT signals for the entire process. It must be called from the * consumer main before creating the threads */ -void consumer_signal_init(void) +int consumer_signal_init(void) { int ret; sigset_t mask; @@ -451,7 +471,9 @@ void consumer_signal_init(void) if (ret) { errno = ret; PERROR("pthread_sigmask"); + return -1; } + return 0; } /* @@ -465,12 +487,24 @@ void *consumer_timer_thread(void *data) siginfo_t info; struct lttng_consumer_local_data *ctx = data; + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER); + + if (testpoint(consumerd_thread_metadata_timer)) { + goto error_testpoint; + } + + health_code_update(); + /* Only self thread will receive signal mask. */ setmask(&mask); CMM_STORE_SHARED(timer_signal.tid, pthread_self()); while (1) { + health_code_update(); + + health_poll_entry(); signr = sigwaitinfo(&mask, &info); + health_poll_exit(); if (signr == -1) { if (errno != EINTR) { PERROR("sigwaitinfo"); @@ -490,5 +524,11 @@ void *consumer_timer_thread(void *data) } } +error_testpoint: + /* Only reached in testpoint error */ + health_error(); + health_unregister(health_consumerd); + + /* Never return */ return NULL; }