X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=02f676df8bc0e1bf56542e2c2394a5cabb59185c;hb=2f8f53af90479595d530f8f02e71dd0b9fb810ee;hp=fe4a898bc6d349dfd866218e597d77ef12a746aa;hpb=719af1bb6e6719c6170e059a7e473aa7923ddae9;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index fe4a898bc..02f676df8 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -62,6 +62,7 @@ #include "live.h" #include "health-relayd.h" #include "testpoint.h" +#include "viewer-stream.h" /* command line options */ char *opt_output_path; @@ -1034,7 +1035,7 @@ static void destroy_stream(struct relay_stream *stream) } } - vstream = live_find_viewer_stream_by_id(stream->stream_handle); + vstream = viewer_stream_find_by_id(stream->stream_handle); if (vstream) { /* * Set the last good value into the viewer stream. This is done @@ -1160,6 +1161,7 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->sock = cmd->sock; session->minor = cmd->minor; session->major = cmd->major; + pthread_mutex_init(&session->viewer_ready_lock, NULL); cmd->session = session; reply.session_id = htobe64(session->id); @@ -1207,6 +1209,8 @@ void set_viewer_ready_flag(struct relay_command *cmd) { struct relay_stream_recv_handle *node, *tmp_node; + pthread_mutex_lock(&cmd->session->viewer_ready_lock); + cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { struct relay_stream *stream; @@ -1229,6 +1233,7 @@ void set_viewer_ready_flag(struct relay_command *cmd) free(node); } + pthread_mutex_unlock(&cmd->session->viewer_ready_lock); return; } @@ -1350,11 +1355,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * stream message is received, this list is emptied and streams are set * with the viewer ready flag. */ - if (stream->metadata_flag) { - stream->viewer_ready = 1; - } else { - queue_stream_handle(stream->stream_handle, cmd); - } + queue_stream_handle(stream->stream_handle, cmd); lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); @@ -2132,6 +2133,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, */ set_viewer_ready_flag(cmd); + /* + * Inform the viewer that there are new streams in the session. + */ + uatomic_set(&cmd->session->new_streams, 1); + reply.ret_code = htobe32(LTTNG_OK); send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (send_ret < 0) { @@ -2374,7 +2380,7 @@ int relay_process_data(struct relay_command *cmd) (stream->oldest_tracefile_id + 1) % stream->tracefile_count; } - vstream = live_find_viewer_stream_by_id(stream->stream_handle); + vstream = viewer_stream_find_by_id(stream->stream_handle); if (vstream) { /* * The viewer is reading a file about to be