#include "consumer.h"
#include "consumer-stream.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
*/
void *consumer_thread_metadata_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
while (1) {
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
}
}
+ /* All is OK */
+ err = 0;
error:
end:
DBG("Metadata poll thread exiting");
end_poll:
destroy_stream_ht(metadata_ht);
end_ht:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i;
+ int num_rdy, num_hup, high_prio, ret, i, err = -1;
struct pollfd *pollfd = NULL;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
/* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
}
}
}
+ /* All is OK */
+ err = 0;
end:
DBG("polling thread exiting");
free(pollfd);
destroy_data_stream_ht(data_ht);
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_channel_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_channel *chan = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
while (1) {
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
}
}
+ /* All is OK */
+ err = 0;
end:
lttng_poll_clean(&events);
end_poll:
destroy_channel_ht(channel_ht);
end_ht:
DBG("Channel poll thread exiting");
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock = -1, client_socket, ret;
+ int sock = -1, client_socket, ret, err = -1;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
}
if (consumer_quit) {
DBG("consumer_thread_receive_fds received quit from signal");
+ err = 0; /* All is OK */
goto end;
}
DBG("received command on sock");
}
+ /* All is OK */
+ err = 0;
+
end:
DBG("Consumer thread sessiond poll exiting");
}
}
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}