Fix: code refactoring of viewer streams in relayd
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 8e6cb4d28a86ef35f59094c00774b177f1369cd3..02f676df8bc0e1bf56542e2c2394a5cabb59185c 100644 (file)
@@ -62,6 +62,7 @@
 #include "live.h"
 #include "health-relayd.h"
 #include "testpoint.h"
+#include "viewer-stream.h"
 
 /* command line options */
 char *opt_output_path;
@@ -1034,7 +1035,7 @@ static void destroy_stream(struct relay_stream *stream)
                }
        }
 
-       vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+       vstream = viewer_stream_find_by_id(stream->stream_handle);
        if (vstream) {
                /*
                 * Set the last good value into the viewer stream. This is done
@@ -1160,11 +1161,16 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
        session->sock = cmd->sock;
        session->minor = cmd->minor;
        session->major = cmd->major;
+       pthread_mutex_init(&session->viewer_ready_lock, NULL);
        cmd->session = session;
 
        reply.session_id = htobe64(session->id);
 
        switch (cmd->minor) {
+               case 1:
+               case 2:
+               case 3:
+                       break;
                case 4: /* LTTng sessiond 2.4 */
                default:
                        ret = cmd_create_session_2_4(cmd, session);
@@ -1203,6 +1209,8 @@ void set_viewer_ready_flag(struct relay_command *cmd)
 {
        struct relay_stream_recv_handle *node, *tmp_node;
 
+       pthread_mutex_lock(&cmd->session->viewer_ready_lock);
+
        cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
                struct relay_stream *stream;
 
@@ -1225,6 +1233,7 @@ void set_viewer_ready_flag(struct relay_command *cmd)
                free(node);
        }
 
+       pthread_mutex_unlock(&cmd->session->viewer_ready_lock);
        return;
 }
 
@@ -1346,11 +1355,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
         * stream message is received, this list is emptied and streams are set
         * with the viewer ready flag.
         */
-       if (stream->metadata_flag) {
-               stream->viewer_ready = 1;
-       } else {
-               queue_stream_handle(stream->stream_handle, cmd);
-       }
+       queue_stream_handle(stream->stream_handle, cmd);
 
        lttng_ht_node_init_ulong(&stream->stream_n,
                        (unsigned long) stream->stream_handle);
@@ -2128,6 +2133,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
         */
        set_viewer_ready_flag(cmd);
 
+       /*
+        * Inform the viewer that there are new streams in the session.
+        */
+       uatomic_set(&cmd->session->new_streams, 1);
+
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
@@ -2370,7 +2380,7 @@ int relay_process_data(struct relay_command *cmd)
                                (stream->oldest_tracefile_id + 1) %
                                stream->tracefile_count;
                }
-               vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+               vstream = viewer_stream_find_by_id(stream->stream_handle);
                if (vstream) {
                        /*
                         * The viewer is reading a file about to be
This page took 0.024038 seconds and 4 git commands to generate.