X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=d8105e4058128bdc9a902bb8051fc991971f2b90;hb=601262d65a9ccd90198558639ef2a73cda4230e1;hp=86bad26b3d5c028bcdf2a2f8dffbc4859a8fe2ba;hpb=3fd2739803ea7273c6483060ac042942af06b1d4;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 86bad26b3..d8105e405 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -61,6 +61,7 @@ #include "lttng-relayd.h" #include "live.h" #include "health-relayd.h" +#include "testpoint.h" /* command line options */ char *opt_output_path; @@ -90,7 +91,7 @@ const char * const config_section_name = "relayd"; * Quit pipe for all threads. This permits a single cancellation point * for all threads when receiving an event on the pipe. */ -static int thread_quit_pipe[2] = { -1, -1 }; +int thread_quit_pipe[2] = { -1, -1 }; /* * This pipe is used to inform the worker thread that a command is queued and @@ -616,7 +617,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size) } /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -753,6 +754,10 @@ void *relay_thread_listener(void *data) lttng_relay_notify_ready(); + if (testpoint(relayd_thread_listener)) { + goto error_testpoint; + } + while (1) { health_code_update(); @@ -853,6 +858,7 @@ restart: exit: error: error_poll_add: +error_testpoint: lttng_poll_clean(&events); error_create_poll: if (data_sock->fd >= 0) { @@ -896,6 +902,10 @@ void *relay_thread_dispatcher(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + if (testpoint(relayd_thread_dispatcher)) { + goto error_testpoint; + } + health_code_update(); while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { @@ -942,6 +952,7 @@ void *relay_thread_dispatcher(void *data) err = 0; error: +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -1154,6 +1165,10 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, reply.session_id = htobe64(session->id); switch (cmd->minor) { + case 1: + case 2: + case 3: + break; case 4: /* LTTng sessiond 2.4 */ default: ret = cmd_create_session_2_4(cmd, session); @@ -1335,11 +1350,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * stream message is received, this list is emptied and streams are set * with the viewer ready flag. */ - if (stream->metadata_flag) { - stream->viewer_ready = 1; - } else { - queue_stream_handle(stream->stream_handle, cmd); - } + queue_stream_handle(stream->stream_handle, cmd); lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); @@ -2560,6 +2571,10 @@ void *relay_thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + if (testpoint(relayd_thread_worker)) { + goto error_testpoint; + } + health_code_update(); /* table of connections indexed on socket */ @@ -2821,6 +2836,7 @@ relay_connections_ht_error: } DBG("Worker thread cleanup complete"); free(data_buffer); +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2998,7 +3014,7 @@ int main(int argc, char **argv) goto exit_listener; } - ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe); + ret = live_start_threads(live_uri, relay_ctx); if (ret != 0) { ERR("Starting live viewer threads"); goto exit_live;