projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Health check test: add relayd check support
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index dffcaff9855a61b265bba6b3eb376a990188a76e..e7c57f32e2457595b31d20709837c5b7727741be 100644
(file)
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -57,6 +57,7 @@
#include "utils.h"
#include "lttng-relayd.h"
#include "live.h"
#include "utils.h"
#include "lttng-relayd.h"
#include "live.h"
+#include "health-relayd.h"
/* command line options */
char *opt_output_path;
/* command line options */
char *opt_output_path;
@@ -114,6 +115,9 @@ struct lttng_ht *viewer_streams_ht;
/* Global hash table that stores relay index object. */
struct lttng_ht *indexes_ht;
/* Global hash table that stores relay index object. */
struct lttng_ht *indexes_ht;
+/* Relayd health monitoring */
+static struct health_app *health_relayd;
+
/*
* usage function on stderr
*/
/*
* usage function on stderr
*/
@@ -513,6 +517,10 @@ void *relay_thread_listener(void *data)
DBG("[thread] Relay listener started");
DBG("[thread] Relay listener started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+
+ health_code_update();
+
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
@@ -544,10 +552,14 @@ void *relay_thread_listener(void *data)
}
while (1) {
}
while (1) {
+ health_code_update();
+
DBG("Listener accepting connections");
restart:
DBG("Listener accepting connections");
restart:
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
@@ -562,6 +574,8 @@ restart:
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
@@ -656,8 +670,10 @@ error_sock_relay:
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
- DBG("Thread exited with error");
+ health_error();
+ ERR("Health error occurred in %s", __func__);
}
}
+ health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
stop_threads();
return NULL;
DBG("Relay listener thread cleanup complete");
stop_threads();
return NULL;
@@ -669,17 +685,25 @@ error_sock_control:
static
void *relay_thread_dispatcher(void *data)
{
static
void *relay_thread_dispatcher(void *data)
{
- int ret;
+ int ret, err = -1;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Relay dispatcher started");
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Relay dispatcher started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+
+ health_code_update();
+
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
+ health_code_update();
+
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
@@ -708,10 +732,20 @@ void *relay_thread_dispatcher(void *data)
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
+ health_poll_entry();
futex_nto1_wait(&relay_cmd_queue.futex);
futex_nto1_wait(&relay_cmd_queue.futex);
+ health_poll_exit();
}
}
+ /* Normal exit, no error */
+ err = 0;
+
error:
error:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
return NULL;
DBG("Dispatch thread dying");
stop_threads();
return NULL;
@@ -2144,6 +2178,10 @@ void *relay_thread_worker(void *data)
rcu_register_thread();
rcu_register_thread();
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+
+ health_code_update();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
@@ -2170,9 +2208,13 @@ restart:
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ health_code_update();
+
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
@@ -2195,6 +2237,8 @@ restart:
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
@@ -2297,6 +2341,9 @@ restart:
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
if (last_seen_data_fd == pollfd) {
idx = i;
break;
if (last_seen_data_fd == pollfd) {
idx = i;
break;
@@ -2310,6 +2357,8 @@ restart:
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
@@ -2359,6 +2408,9 @@ restart:
last_seen_data_fd = -1;
}
last_seen_data_fd = -1;
}
+ /* Normal exit, no error */
+ ret = 0;
+
exit:
error:
lttng_poll_clean(&events);
exit:
error:
lttng_poll_clean(&events);
@@ -2366,6 +2418,8 @@ error:
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+ health_code_update();
+
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
@@ -2387,8 +2441,13 @@ relay_connections_ht_error:
}
DBG("Worker thread cleanup complete");
free(data_buffer);
}
DBG("Worker thread cleanup complete");
free(data_buffer);
- stop_threads();
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
rcu_unregister_thread();
rcu_unregister_thread();
+ stop_threads();
return NULL;
}
return NULL;
}
@@ -2503,6 +2562,13 @@ int main(int argc, char **argv)
goto exit_relay_ctx_viewer_streams;
}
goto exit_relay_ctx_viewer_streams;
}
+ /* Initialize thread health monitoring */
+ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+ if (!health_relayd) {
+ PERROR("health_app_create error");
+ goto exit_health_app_create;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
@@ -2527,31 +2593,39 @@ int main(int argc, char **argv)
goto exit_listener;
}
goto exit_listener;
}
- ret = live_start_threads(live_uri, relay_ctx);
+ ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
if (ret != 0) {
ERR("Starting live viewer threads");
if (ret != 0) {
ERR("Starting live viewer threads");
+ goto exit_live;
}
}
-exit_listener:
+ live_stop_threads();
+
+exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_worker:
+exit_listener:
ret = pthread_join(worker_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
ret = pthread_join(worker_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_dispatcher:
+exit_worker:
ret = pthread_join(dispatcher_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
ret = pthread_join(dispatcher_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
+
+exit_dispatcher:
+ health_app_destroy(health_relayd);
+
+exit_health_app_create:
lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
@@ -2564,7 +2638,6 @@ exit_relay_ctx_sessions:
free(relay_ctx);
exit:
free(relay_ctx);
exit:
- live_stop_threads();
cleanup();
if (!ret) {
exit(EXIT_SUCCESS);
cleanup();
if (!ret) {
exit(EXIT_SUCCESS);
This page took
0.028123 seconds
and
4
git commands to generate.