X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=380b4f833042cad3dcf7809e1871016fda753506;hb=4a9daf1745ccbd2aab029206a664f39fcbd640ce;hp=789e3300abc1e33a604a5d5aa78ffc816ae3ce4d;hpb=0b242f6253f75bd4a2d138fba3c34a22653ab3bd;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 789e3300a..380b4f833 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -61,6 +61,7 @@ #include "lttng-relayd.h" #include "live.h" #include "health-relayd.h" +#include "testpoint.h" /* command line options */ char *opt_output_path; @@ -753,6 +754,10 @@ void *relay_thread_listener(void *data) lttng_relay_notify_ready(); + if (testpoint(relayd_thread_listener)) { + goto error_testpoint; + } + while (1) { health_code_update(); @@ -853,6 +858,7 @@ restart: exit: error: error_poll_add: +error_testpoint: lttng_poll_clean(&events); error_create_poll: if (data_sock->fd >= 0) { @@ -896,6 +902,10 @@ void *relay_thread_dispatcher(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + if (testpoint(relayd_thread_dispatcher)) { + goto error_testpoint; + } + health_code_update(); while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { @@ -942,6 +952,7 @@ void *relay_thread_dispatcher(void *data) err = 0; error: +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -1154,6 +1165,10 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, reply.session_id = htobe64(session->id); switch (cmd->minor) { + case 1: + case 2: + case 3: + break; case 4: /* LTTng sessiond 2.4 */ default: ret = cmd_create_session_2_4(cmd, session); @@ -1335,11 +1350,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * stream message is received, this list is emptied and streams are set * with the viewer ready flag. */ - if (stream->metadata_flag) { - stream->viewer_ready = 1; - } else { - queue_stream_handle(stream->stream_handle, cmd); - } + queue_stream_handle(stream->stream_handle, cmd); lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); @@ -2117,6 +2128,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, */ set_viewer_ready_flag(cmd); + /* + * Inform the viewer that there are new streams in the session. + */ + uatomic_set(&cmd->session->new_streams, 1); + reply.ret_code = htobe32(LTTNG_OK); send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (send_ret < 0) { @@ -2560,6 +2576,10 @@ void *relay_thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + if (testpoint(relayd_thread_worker)) { + goto error_testpoint; + } + health_code_update(); /* table of connections indexed on socket */ @@ -2821,6 +2841,7 @@ relay_connections_ht_error: } DBG("Worker thread cleanup complete"); free(data_buffer); +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__);