projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Health check: implement health check query in sessiond and consumerd
[lttng-tools.git]
/
src
/
common
/
consumer.c
diff --git
a/src/common/consumer.c
b/src/common/consumer.c
index 6abd8b1e86de34c71d330ac68726846dd971da3b..6f3a02d2619eeef095684fc927399fca7a9404dd 100644
(file)
--- a/
src/common/consumer.c
+++ b/
src/common/consumer.c
@@
-94,6
+94,18
@@
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
}
(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
}
+static void notify_health_quit_pipe(int *pipe)
+{
+ int ret;
+
+ do {
+ ret = write(pipe[1], "4", 1);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != 1) {
+ PERROR("write consumer health quit");
+ }
+}
+
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
uint64_t key,
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
uint64_t key,
@@
-2196,6
+2208,8
@@
void *consumer_thread_metadata_poll(void *data)
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+ health_code_update();
+
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
@@
-2221,6
+2235,8
@@
void *consumer_thread_metadata_poll(void *data)
DBG("Metadata main loop started");
while (1) {
DBG("Metadata main loop started");
while (1) {
+ health_code_update();
+
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
@@
-2229,7
+2245,9
@@
void *consumer_thread_metadata_poll(void *data)
restart:
DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
restart:
DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Metadata event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
DBG("Metadata event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
@@
-2243,6
+2261,8
@@
restart:
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
@@
-2312,6
+2332,8
@@
restart:
/* We just flushed the stream now read it. */
do {
/* We just flushed the stream now read it. */
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
@@
-2334,6
+2356,8
@@
restart:
assert(stream->wait_fd == pollfd);
do {
assert(stream->wait_fd == pollfd);
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
@@
-2394,6
+2418,8
@@
void *consumer_thread_data_poll(void *data)
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+ health_code_update();
+
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
@@
-2407,6
+2433,8
@@
void *consumer_thread_data_poll(void *data)
}
while (1) {
}
while (1) {
+ health_code_update();
+
high_prio = 0;
num_hup = 0;
high_prio = 0;
num_hup = 0;
@@
-2459,7
+2487,9
@@
void *consumer_thread_data_poll(void *data)
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
+ health_poll_entry();
num_rdy = poll(pollfd, nb_fd + 1, -1);
num_rdy = poll(pollfd, nb_fd + 1, -1);
+ health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
@@
-2509,6
+2539,8
@@
void *consumer_thread_data_poll(void *data)
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
if (local_stream[i] == NULL) {
continue;
}
@@
-2537,6
+2569,8
@@
void *consumer_thread_data_poll(void *data)
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
if (local_stream[i] == NULL) {
continue;
}
@@
-2557,6
+2591,8
@@
void *consumer_thread_data_poll(void *data)
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
if (local_stream[i] == NULL) {
continue;
}
@@
-2721,6
+2757,8
@@
void *consumer_thread_channel_poll(void *data)
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+ health_code_update();
+
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
@@
-2745,6
+2783,8
@@
void *consumer_thread_channel_poll(void *data)
DBG("Channel main loop started");
while (1) {
DBG("Channel main loop started");
while (1) {
+ health_code_update();
+
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
@@
-2753,7
+2793,9
@@
void *consumer_thread_channel_poll(void *data)
restart:
DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
restart:
DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Channel event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
DBG("Channel event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
@@
-2767,6
+2809,8
@@
restart:
/* From here, the event is a channel wait fd */
for (i = 0; i < nb_fd; i++) {
/* From here, the event is a channel wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
@@
-2833,6
+2877,8
@@
restart:
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
send_node) {
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
send_node) {
+ health_code_update();
+
cds_list_del(&stream->send_node);
lttng_ustconsumer_del_stream(stream);
uatomic_sub(&stream->chan->refcount, 1);
cds_list_del(&stream->send_node);
lttng_ustconsumer_del_stream(stream);
uatomic_sub(&stream->chan->refcount, 1);
@@
-2967,6
+3013,8
@@
void *consumer_thread_sessiond_poll(void *data)
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+ health_code_update();
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@
-3027,7
+3075,12
@@
void *consumer_thread_sessiond_poll(void *data)
consumer_sockpoll[1].events = POLLIN | POLLPRI;
while (1) {
consumer_sockpoll[1].events = POLLIN | POLLPRI;
while (1) {
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_code_update();
+
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto end;
}
DBG("Incoming command on sock");
goto end;
}
DBG("Incoming command on sock");
@@
-3080,6
+3133,8
@@
end:
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
+ notify_health_quit_pipe(health_quit_pipe);
+
/* Cleaning up possibly open sockets. */
if (sock >= 0) {
ret = close(sock);
/* Cleaning up possibly open sockets. */
if (sock >= 0) {
ret = close(sock);
This page took
0.03091 seconds
and
4
git commands to generate.