From f385ae0a8f5c34cc33ca57cdb412393f90f9c0a8 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 16 Sep 2013 09:47:16 -0500 Subject: [PATCH] relayd: add health instrumentation to threads Signed-off-by: Mathieu Desnoyers --- src/bin/lttng-relayd/main.c | 54 ++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index bf7be3e25..e7c57f32e 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -519,6 +519,8 @@ void *relay_thread_listener(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER); + health_code_update(); + control_sock = relay_init_sock(control_uri); if (!control_sock) { goto error_sock_control; @@ -550,10 +552,14 @@ void *relay_thread_listener(void *data) } while (1) { + health_code_update(); + DBG("Listener accepting connections"); restart: + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -568,6 +574,8 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { + health_code_update(); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -662,7 +670,8 @@ error_sock_relay: lttcomm_destroy_sock(control_sock); error_sock_control: if (err) { - DBG("Thread exited with error"); + health_error(); + ERR("Health error occurred in %s", __func__); } health_unregister(health_relayd); DBG("Relay listener thread cleanup complete"); @@ -676,7 +685,7 @@ error_sock_control: static void *relay_thread_dispatcher(void *data) { - int ret; + int ret, err = -1; struct cds_wfq_node *node; struct relay_command *relay_cmd = NULL; @@ -684,11 +693,17 @@ void *relay_thread_dispatcher(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + health_code_update(); + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + health_code_update(); + /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_cmd_queue.futex); do { + health_code_update(); + /* Dequeue commands */ node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue); if (node == NULL) { @@ -717,10 +732,19 @@ void *relay_thread_dispatcher(void *data) } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ + health_poll_entry(); futex_nto1_wait(&relay_cmd_queue.futex); + health_poll_exit(); } + /* Normal exit, no error */ + err = 0; + error: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } health_unregister(health_relayd); DBG("Dispatch thread dying"); stop_threads(); @@ -2156,6 +2180,8 @@ void *relay_thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + health_code_update(); + /* table of connections indexed on socket */ relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (!relay_connections_ht) { @@ -2182,9 +2208,13 @@ restart: while (1) { int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1; + health_code_update(); + /* Infinite blocking call, waiting for transmission */ DBG3("Relayd worker thread polling..."); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -2207,6 +2237,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2309,6 +2341,9 @@ restart: if (last_seen_data_fd >= 0) { for (i = 0; i < nb_fd; i++) { int pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + if (last_seen_data_fd == pollfd) { idx = i; break; @@ -2322,6 +2357,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_cmd_pipe[0]) { continue; @@ -2371,6 +2408,9 @@ restart: last_seen_data_fd = -1; } + /* Normal exit, no error */ + ret = 0; + exit: error: lttng_poll_clean(&events); @@ -2378,6 +2418,8 @@ error: /* empty the hash table and free the memory */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) { + health_code_update(); + node = lttng_ht_iter_get_node_ulong(&iter); if (node) { relay_connection = caa_container_of(node, @@ -2397,11 +2439,15 @@ relay_connections_ht_error: if (err) { DBG("Thread exited with error"); } - health_unregister(health_relayd); DBG("Worker thread cleanup complete"); free(data_buffer); - stop_threads(); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_relayd); rcu_unregister_thread(); + stop_threads(); return NULL; } -- 2.34.1