relayd: add health check support for live threads
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index dc41caad031dade163e0353c008c5371edbec7b3..6727a547de65b2a94706a76b5f2089b5b33956a5 100644 (file)
@@ -57,6 +57,7 @@
 #include "utils.h"
 #include "lttng-relayd.h"
 #include "live.h"
+#include "health-relayd.h"
 
 /* command line options */
 char *opt_output_path;
@@ -114,6 +115,9 @@ struct lttng_ht *viewer_streams_ht;
 /* Global hash table that stores relay index object. */
 struct lttng_ht *indexes_ht;
 
+/* Relayd health monitoring */
+struct health_app *health_relayd;
+
 /*
  * usage function on stderr
  */
@@ -513,6 +517,10 @@ void *relay_thread_listener(void *data)
 
        DBG("[thread] Relay listener started");
 
+       health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+
+       health_code_update();
+
        control_sock = relay_init_sock(control_uri);
        if (!control_sock) {
                goto error_sock_control;
@@ -544,10 +552,14 @@ void *relay_thread_listener(void *data)
        }
 
        while (1) {
+               health_code_update();
+
                DBG("Listener accepting connections");
 
 restart:
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -562,6 +574,8 @@ restart:
 
                DBG("Relay new connection received");
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        /* Fetch once the poll data */
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
@@ -656,8 +670,10 @@ error_sock_relay:
        lttcomm_destroy_sock(control_sock);
 error_sock_control:
        if (err) {
-               DBG("Thread exited with error");
+               health_error();
+               ERR("Health error occurred in %s", __func__);
        }
+       health_unregister(health_relayd);
        DBG("Relay listener thread cleanup complete");
        stop_threads();
        return NULL;
@@ -669,17 +685,25 @@ error_sock_control:
 static
 void *relay_thread_dispatcher(void *data)
 {
-       int ret;
+       int ret, err = -1;
        struct cds_wfq_node *node;
        struct relay_command *relay_cmd = NULL;
 
        DBG("[thread] Relay dispatcher started");
 
+       health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+
+       health_code_update();
+
        while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+               health_code_update();
+
                /* Atomically prepare the queue futex */
                futex_nto1_prepare(&relay_cmd_queue.futex);
 
                do {
+                       health_code_update();
+
                        /* Dequeue commands */
                        node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
                        if (node == NULL) {
@@ -708,10 +732,20 @@ void *relay_thread_dispatcher(void *data)
                } while (node != NULL);
 
                /* Futex wait on queue. Blocking call on futex() */
+               health_poll_entry();
                futex_nto1_wait(&relay_cmd_queue.futex);
+               health_poll_exit();
        }
 
+       /* Normal exit, no error */
+       err = 0;
+
 error:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_relayd);
        DBG("Dispatch thread dying");
        stop_threads();
        return NULL;
@@ -2144,6 +2178,10 @@ void *relay_thread_worker(void *data)
 
        rcu_register_thread();
 
+       health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+
+       health_code_update();
+
        /* table of connections indexed on socket */
        relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        if (!relay_connections_ht) {
@@ -2170,9 +2208,13 @@ restart:
        while (1) {
                int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
 
+               health_code_update();
+
                /* Infinite blocking call, waiting for transmission */
                DBG3("Relayd worker thread polling...");
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -2195,6 +2237,8 @@ restart:
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
                        int pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update();
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -2297,6 +2341,9 @@ restart:
                if (last_seen_data_fd >= 0) {
                        for (i = 0; i < nb_fd; i++) {
                                int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                               health_code_update();
+
                                if (last_seen_data_fd == pollfd) {
                                        idx = i;
                                        break;
@@ -2310,6 +2357,8 @@ restart:
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
                        int pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update();
+
                        /* Skip the command pipe. It's handled in the first loop. */
                        if (pollfd == relay_cmd_pipe[0]) {
                                continue;
@@ -2359,6 +2408,9 @@ restart:
                last_seen_data_fd = -1;
        }
 
+       /* Normal exit, no error */
+       ret = 0;
+
 exit:
 error:
        lttng_poll_clean(&events);
@@ -2366,6 +2418,8 @@ error:
        /* empty the hash table and free the memory */
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+               health_code_update();
+
                node = lttng_ht_iter_get_node_ulong(&iter);
                if (node) {
                        relay_connection = caa_container_of(node,
@@ -2387,8 +2441,13 @@ relay_connections_ht_error:
        }
        DBG("Worker thread cleanup complete");
        free(data_buffer);
-       stop_threads();
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_relayd);
        rcu_unregister_thread();
+       stop_threads();
        return NULL;
 }
 
@@ -2503,6 +2562,13 @@ int main(int argc, char **argv)
                goto exit_relay_ctx_viewer_streams;
        }
 
+       /* Initialize thread health monitoring */
+       health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+       if (!health_relayd) {
+               PERROR("health_app_create error");
+               goto exit_health_app_create;
+       }
+
        /* Setup the dispatcher thread */
        ret = pthread_create(&dispatcher_thread, NULL,
                        relay_thread_dispatcher, (void *) NULL);
@@ -2530,30 +2596,36 @@ int main(int argc, char **argv)
        ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
        if (ret != 0) {
                ERR("Starting live viewer threads");
+               goto exit_live;
        }
 
-exit_listener:
+       live_stop_threads();
+
+exit_live:
        ret = pthread_join(listener_thread, &status);
        if (ret != 0) {
                PERROR("pthread_join");
                goto error;     /* join error, exit without cleanup */
        }
 
-exit_worker:
+exit_listener:
        ret = pthread_join(worker_thread, &status);
        if (ret != 0) {
                PERROR("pthread_join");
                goto error;     /* join error, exit without cleanup */
        }
 
-exit_dispatcher:
+exit_worker:
        ret = pthread_join(dispatcher_thread, &status);
        if (ret != 0) {
                PERROR("pthread_join");
                goto error;     /* join error, exit without cleanup */
        }
 
-       live_stop_threads();
+exit_dispatcher:
+       health_app_destroy(health_relayd);
+
+exit_health_app_create:
        lttng_ht_destroy(viewer_streams_ht);
 
 exit_relay_ctx_viewer_streams:
This page took 0.030652 seconds and 4 git commands to generate.