Implement consumer health check thread
[lttng-tools.git] / src / common / consumer.c
index 6abd8b1e86de34c71d330ac68726846dd971da3b..6f3a02d2619eeef095684fc927399fca7a9404dd 100644 (file)
@@ -94,6 +94,18 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
        (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       int ret;
+
+       do {
+               ret = write(pipe[1], "4", 1);
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0 || ret != 1) {
+               PERROR("write consumer health quit");
+       }
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
@@ -2196,6 +2208,8 @@ void *consumer_thread_metadata_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
 
+       health_code_update();
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2221,6 +2235,8 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
                        err = 0;        /* All is OK */
@@ -2229,7 +2245,9 @@ void *consumer_thread_metadata_poll(void *data)
 
 restart:
                DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2243,6 +2261,8 @@ restart:
 
                /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2312,6 +2332,8 @@ restart:
 
                                        /* We just flushed the stream now read it. */
                                        do {
+                                               health_code_update();
+
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
@@ -2334,6 +2356,8 @@ restart:
                                assert(stream->wait_fd == pollfd);
 
                                do {
+                                       health_code_update();
+
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /*
                                         * We don't check the return value here since if we get
@@ -2394,6 +2418,8 @@ void *consumer_thread_data_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
 
+       health_code_update();
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2407,6 +2433,8 @@ void *consumer_thread_data_poll(void *data)
        }
 
        while (1) {
+               health_code_update();
+
                high_prio = 0;
                num_hup = 0;
 
@@ -2459,7 +2487,9 @@ void *consumer_thread_data_poll(void *data)
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
+               health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 1, -1);
+               health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2509,6 +2539,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2537,6 +2569,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2557,6 +2591,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2721,6 +2757,8 @@ void *consumer_thread_channel_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
 
+       health_code_update();
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2745,6 +2783,8 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
                        err = 0;        /* All is OK */
@@ -2753,7 +2793,9 @@ void *consumer_thread_channel_poll(void *data)
 
 restart:
                DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2767,6 +2809,8 @@ restart:
 
                /* From here, the event is a channel wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2833,6 +2877,8 @@ restart:
                                                        /* Delete streams that might have been left in the stream list. */
                                                        cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
                                                                        send_node) {
+                                                               health_code_update();
+
                                                                cds_list_del(&stream->send_node);
                                                                lttng_ustconsumer_del_stream(stream);
                                                                uatomic_sub(&stream->chan->refcount, 1);
@@ -2967,6 +3013,8 @@ void *consumer_thread_sessiond_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
 
+       health_code_update();
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -3027,7 +3075,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
        while (1) {
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_code_update();
+
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        goto end;
                }
                DBG("Incoming command on sock");
@@ -3080,6 +3133,8 @@ end:
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
This page took 0.026305 seconds and 4 git commands to generate.