X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=02f676df8bc0e1bf56542e2c2394a5cabb59185c;hp=789e3300abc1e33a604a5d5aa78ffc816ae3ce4d;hb=2f8f53af90479595d530f8f02e71dd0b9fb810ee;hpb=0b242f6253f75bd4a2d138fba3c34a22653ab3bd diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 789e3300a..02f676df8 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -61,6 +61,8 @@ #include "lttng-relayd.h" #include "live.h" #include "health-relayd.h" +#include "testpoint.h" +#include "viewer-stream.h" /* command line options */ char *opt_output_path; @@ -753,6 +755,10 @@ void *relay_thread_listener(void *data) lttng_relay_notify_ready(); + if (testpoint(relayd_thread_listener)) { + goto error_testpoint; + } + while (1) { health_code_update(); @@ -853,6 +859,7 @@ restart: exit: error: error_poll_add: +error_testpoint: lttng_poll_clean(&events); error_create_poll: if (data_sock->fd >= 0) { @@ -896,6 +903,10 @@ void *relay_thread_dispatcher(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + if (testpoint(relayd_thread_dispatcher)) { + goto error_testpoint; + } + health_code_update(); while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { @@ -942,6 +953,7 @@ void *relay_thread_dispatcher(void *data) err = 0; error: +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -1023,7 +1035,7 @@ static void destroy_stream(struct relay_stream *stream) } } - vstream = live_find_viewer_stream_by_id(stream->stream_handle); + vstream = viewer_stream_find_by_id(stream->stream_handle); if (vstream) { /* * Set the last good value into the viewer stream. This is done @@ -1149,11 +1161,16 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->sock = cmd->sock; session->minor = cmd->minor; session->major = cmd->major; + pthread_mutex_init(&session->viewer_ready_lock, NULL); cmd->session = session; reply.session_id = htobe64(session->id); switch (cmd->minor) { + case 1: + case 2: + case 3: + break; case 4: /* LTTng sessiond 2.4 */ default: ret = cmd_create_session_2_4(cmd, session); @@ -1192,6 +1209,8 @@ void set_viewer_ready_flag(struct relay_command *cmd) { struct relay_stream_recv_handle *node, *tmp_node; + pthread_mutex_lock(&cmd->session->viewer_ready_lock); + cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { struct relay_stream *stream; @@ -1214,6 +1233,7 @@ void set_viewer_ready_flag(struct relay_command *cmd) free(node); } + pthread_mutex_unlock(&cmd->session->viewer_ready_lock); return; } @@ -1335,11 +1355,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * stream message is received, this list is emptied and streams are set * with the viewer ready flag. */ - if (stream->metadata_flag) { - stream->viewer_ready = 1; - } else { - queue_stream_handle(stream->stream_handle, cmd); - } + queue_stream_handle(stream->stream_handle, cmd); lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); @@ -2117,6 +2133,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, */ set_viewer_ready_flag(cmd); + /* + * Inform the viewer that there are new streams in the session. + */ + uatomic_set(&cmd->session->new_streams, 1); + reply.ret_code = htobe32(LTTNG_OK); send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (send_ret < 0) { @@ -2359,7 +2380,7 @@ int relay_process_data(struct relay_command *cmd) (stream->oldest_tracefile_id + 1) % stream->tracefile_count; } - vstream = live_find_viewer_stream_by_id(stream->stream_handle); + vstream = viewer_stream_find_by_id(stream->stream_handle); if (vstream) { /* * The viewer is reading a file about to be @@ -2560,6 +2581,10 @@ void *relay_thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + if (testpoint(relayd_thread_worker)) { + goto error_testpoint; + } + health_code_update(); /* table of connections indexed on socket */ @@ -2821,6 +2846,7 @@ relay_connections_ht_error: } DBG("Worker thread cleanup complete"); free(data_buffer); +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__);