X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=80c3da57ac63d2ee0fd69b5942e063c7aefa4f16;hb=9b5e086337f67aaf99375fc39d911d167fd8c778;hp=6864ff4eca19c717c79098b906d54d66ec6562c5;hpb=a4baae1b0463bc4ce65c2a458c4a941e7fabc594;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 6864ff4ec..80c3da57a 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -57,15 +57,10 @@ #include "lttng-viewer.h" #include "utils.h" #include "health-relayd.h" +#include "testpoint.h" static struct lttng_uri *live_uri; -/* - * Quit pipe for all threads. This permits a single cancellation point - * for all threads when receiving an event on the pipe. - */ -static int live_thread_quit_pipe[2] = { -1, -1 }; - /* * This pipe is used to inform the worker thread that a command is queued and * ready to be processed. @@ -126,7 +121,7 @@ void stop_threads(void) /* Stopping all threads */ DBG("Terminating all live threads"); - ret = notify_thread_pipe(live_thread_quit_pipe[1]); + ret = notify_thread_pipe(thread_quit_pipe[1]); if (ret < 0) { ERR("write error on thread quit pipe"); } @@ -155,7 +150,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size) } /* Add quit pipe */ - ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN); + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -174,7 +169,7 @@ error: static int check_thread_quit_pipe(int fd, uint32_t events) { - if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) { + if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { return 1; } @@ -245,9 +240,7 @@ void *thread_listener(void *data) goto error_sock_control; } - /* - * Pass 3 as size here for the thread quit pipe, control and data socket. - */ + /* Pass 2 as size here for the thread quit pipe and control sockets. */ ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_create_poll; @@ -259,6 +252,12 @@ void *thread_listener(void *data) goto error_poll_add; } + lttng_relay_notify_ready(); + + if (testpoint(relayd_thread_live_listener)) { + goto error_testpoint; + } + while (1) { health_code_update(); @@ -346,6 +345,7 @@ restart: exit: error: error_poll_add: +error_testpoint: lttng_poll_clean(&events); error_create_poll: if (live_control_sock->fd >= 0) { @@ -381,6 +381,10 @@ void *thread_dispatcher(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER); + if (testpoint(relayd_thread_live_dispatcher)) { + goto error_testpoint; + } + health_code_update(); while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) { @@ -429,6 +433,7 @@ void *thread_dispatcher(void *data) err = 0; error: +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -1120,7 +1125,8 @@ void destroy_viewer_stream(struct relay_viewer_stream *vstream) * we need to remove it because we won't detect a EOF for this * stream. */ - if (ret_ref == 1 && vstream->ctf_trace->metadata_stream) { + if (ret_ref == 1 && vstream->ctf_trace->viewer_metadata_stream) { + delete_viewer_stream(vstream->ctf_trace->viewer_metadata_stream); destroy_viewer_stream(vstream->ctf_trace->viewer_metadata_stream); vstream->ctf_trace->metadata_stream = NULL; DBG("Freeing ctf_trace %" PRIu64, vstream->ctf_trace->id); @@ -1830,6 +1836,10 @@ void *thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER); + if (testpoint(relayd_thread_live_worker)) { + goto error_testpoint; + } + /* table of connections indexed on socket */ relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (!relay_connections_ht) { @@ -1985,6 +1995,7 @@ relay_connections_ht_error: DBG("Viewer worker thread exited with error"); } DBG("Viewer worker thread cleanup complete"); +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2043,7 +2054,7 @@ error: * main */ int live_start_threads(struct lttng_uri *uri, - struct relay_local_data *relay_ctx, int quit_pipe[2]) + struct relay_local_data *relay_ctx) { int ret = 0; void *status; @@ -2052,9 +2063,6 @@ int live_start_threads(struct lttng_uri *uri, assert(uri); live_uri = uri; - live_thread_quit_pipe[0] = quit_pipe[0]; - live_thread_quit_pipe[1] = quit_pipe[1]; - /* Check if daemon is UID = 0 */ is_root = !getuid();