X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=33aad88ee3e5d0d274790dd4edef675ffeeb06db;hb=51a9e1c7f7fd48e2b53e258aee269a69cb8b59d3;hp=dffcaff9855a61b265bba6b3eb376a990188a76e;hpb=7d2f74525fbda4dcc744f33ea26c911545b5df13;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index dffcaff98..33aad88ee 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -57,6 +57,7 @@ #include "utils.h" #include "lttng-relayd.h" #include "live.h" +#include "health-relayd.h" /* command line options */ char *opt_output_path; @@ -67,6 +68,8 @@ static struct lttng_uri *live_uri; const char *progname; +const char *tracing_group_name = DEFAULT_TRACING_GROUP; + /* * Quit pipe for all threads. This permits a single cancellation point * for all threads when receiving an event on the pipe. @@ -85,6 +88,7 @@ static int dispatch_thread_exit; static pthread_t listener_thread; static pthread_t dispatcher_thread; static pthread_t worker_thread; +static pthread_t health_thread; static uint64_t last_relay_stream_id; static uint64_t last_relay_session_id; @@ -114,6 +118,9 @@ struct lttng_ht *viewer_streams_ht; /* Global hash table that stores relay index object. */ struct lttng_ht *indexes_ht; +/* Relayd health monitoring */ +struct health_app *health_relayd; + /* * usage function on stderr */ @@ -127,6 +134,7 @@ void usage(void) fprintf(stderr, " -D, --data-port URL Data port listening.\n"); fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); } static @@ -140,6 +148,7 @@ int parse_args(int argc, char **argv) { "control-port", 1, 0, 'C', }, { "data-port", 1, 0, 'D', }, { "daemonize", 0, 0, 'd', }, + { "group", 1, 0, 'g', }, { "help", 0, 0, 'h', }, { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, @@ -148,7 +157,7 @@ int parse_args(int argc, char **argv) while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhv" "C:D:o:", + c = getopt_long(argc, argv, "dhv" "C:D:o:g:", long_options, &option_index); if (c == -1) { break; @@ -184,6 +193,9 @@ int parse_args(int argc, char **argv) case 'd': opt_daemon = 1; break; + case 'g': + tracing_group_name = optarg; + break; case 'h': usage(); exit(EXIT_FAILURE); @@ -293,6 +305,18 @@ int notify_thread_pipe(int wpipe) return ret; } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write relay health quit"); + } +} + /* * Stop all threads by closing the thread quit pipe. */ @@ -308,6 +332,8 @@ void stop_threads(void) ERR("write error on thread quit pipe"); } + notify_health_quit_pipe(health_quit_pipe); + /* Dispatch thread */ CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&relay_cmd_queue.futex); @@ -513,6 +539,10 @@ void *relay_thread_listener(void *data) DBG("[thread] Relay listener started"); + health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER); + + health_code_update(); + control_sock = relay_init_sock(control_uri); if (!control_sock) { goto error_sock_control; @@ -544,10 +574,14 @@ void *relay_thread_listener(void *data) } while (1) { + health_code_update(); + DBG("Listener accepting connections"); restart: + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -562,6 +596,8 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { + health_code_update(); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -656,8 +692,10 @@ error_sock_relay: lttcomm_destroy_sock(control_sock); error_sock_control: if (err) { - DBG("Thread exited with error"); + health_error(); + ERR("Health error occurred in %s", __func__); } + health_unregister(health_relayd); DBG("Relay listener thread cleanup complete"); stop_threads(); return NULL; @@ -669,17 +707,25 @@ error_sock_control: static void *relay_thread_dispatcher(void *data) { - int ret; + int ret, err = -1; struct cds_wfq_node *node; struct relay_command *relay_cmd = NULL; DBG("[thread] Relay dispatcher started"); + health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + + health_code_update(); + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + health_code_update(); + /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_cmd_queue.futex); do { + health_code_update(); + /* Dequeue commands */ node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue); if (node == NULL) { @@ -708,10 +754,20 @@ void *relay_thread_dispatcher(void *data) } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ + health_poll_entry(); futex_nto1_wait(&relay_cmd_queue.futex); + health_poll_exit(); } + /* Normal exit, no error */ + err = 0; + error: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_relayd); DBG("Dispatch thread dying"); stop_threads(); return NULL; @@ -2144,6 +2200,10 @@ void *relay_thread_worker(void *data) rcu_register_thread(); + health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + + health_code_update(); + /* table of connections indexed on socket */ relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (!relay_connections_ht) { @@ -2170,9 +2230,13 @@ restart: while (1) { int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1; + health_code_update(); + /* Infinite blocking call, waiting for transmission */ DBG3("Relayd worker thread polling..."); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -2195,6 +2259,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2297,6 +2363,9 @@ restart: if (last_seen_data_fd >= 0) { for (i = 0; i < nb_fd; i++) { int pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + if (last_seen_data_fd == pollfd) { idx = i; break; @@ -2310,6 +2379,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_cmd_pipe[0]) { continue; @@ -2359,6 +2430,9 @@ restart: last_seen_data_fd = -1; } + /* Normal exit, no error */ + ret = 0; + exit: error: lttng_poll_clean(&events); @@ -2366,6 +2440,8 @@ error: /* empty the hash table and free the memory */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) { + health_code_update(); + node = lttng_ht_iter_get_node_ulong(&iter); if (node) { relay_connection = caa_container_of(node, @@ -2387,8 +2463,13 @@ relay_connections_ht_error: } DBG("Worker thread cleanup complete"); free(data_buffer); - stop_threads(); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_relayd); rcu_unregister_thread(); + stop_threads(); return NULL; } @@ -2503,6 +2584,26 @@ int main(int argc, char **argv) goto exit_relay_ctx_viewer_streams; } + /* Initialize thread health monitoring */ + health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); + if (!health_relayd) { + PERROR("health_app_create error"); + goto exit_health_app_create; + } + + ret = utils_create_pipe(health_quit_pipe); + if (ret < 0) { + goto error_health_pipe; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto health_error; + } + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); @@ -2527,31 +2628,52 @@ int main(int argc, char **argv) goto exit_listener; } - ret = live_start_threads(live_uri, relay_ctx); + ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe); if (ret != 0) { ERR("Starting live viewer threads"); + goto exit_live; } -exit_listener: +exit_live: ret = pthread_join(listener_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } -exit_worker: +exit_listener: ret = pthread_join(worker_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } -exit_dispatcher: +exit_worker: ret = pthread_join(dispatcher_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } + +exit_dispatcher: + ret = pthread_join(health_thread, &status); + if (ret != 0) { + PERROR("pthread_join health thread"); + goto error; /* join error, exit without cleanup */ + } + + /* + * Stop live threads only after joining other threads. + */ + live_stop_threads(); + +health_error: + utils_close_pipe(health_quit_pipe); + +error_health_pipe: + health_app_destroy(health_relayd); + +exit_health_app_create: lttng_ht_destroy(viewer_streams_ht); exit_relay_ctx_viewer_streams: @@ -2564,7 +2686,6 @@ exit_relay_ctx_sessions: free(relay_ctx); exit: - live_stop_threads(); cleanup(); if (!ret) { exit(EXIT_SUCCESS);