Fix: notify the viewer if new streams got added
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 80c3da57ac63d2ee0fd69b5942e063c7aefa4f16..1a691a849c11e972fee17059df4d5c7068846f01 100644 (file)
@@ -1142,6 +1142,40 @@ void destroy_viewer_stream(struct relay_viewer_stream *vstream)
        call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
 }
 
+/*
+ * Atomically check if new streams got added in the session since the last
+ * check and reset the flag to 0.
+ *
+ * Returns 1 if new streams got added, 0 if nothing changed, a negative value
+ * on error.
+ */
+static
+int check_new_streams(uint64_t session_id, struct lttng_ht *sessions_ht)
+{
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+       struct relay_session *session;
+       unsigned long current_val;
+       int ret;
+
+       lttng_ht_lookup(sessions_ht,
+                       (void *)((unsigned long) session_id), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               DBG("Relay session %" PRIu64 " not found", session_id);
+               ret = -1;
+               goto error;
+       }
+
+       session = caa_container_of(node, struct relay_session, session_n);
+
+       current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
+       ret = current_val;
+
+error:
+       return ret;
+}
+
 /*
  * Send the next index for a stream.
  *
@@ -1266,6 +1300,13 @@ int viewer_get_next_index(struct relay_command *cmd,
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
+       ret = check_new_streams(vstream->session_id, sessions_ht);
+       if (ret < 0) {
+               goto end_unlock;
+       } else if (ret == 1) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        pthread_mutex_lock(&vstream->overwrite_lock);
        if (vstream->abort_flag) {
                /*
@@ -1354,7 +1395,8 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_packet(struct relay_command *cmd)
+int viewer_get_packet(struct relay_command *cmd,
+               struct lttng_ht *sessions_ht)
 {
        int ret, send_data = 0;
        char *data = NULL;
@@ -1429,6 +1471,15 @@ int viewer_get_packet(struct relay_command *cmd)
                goto send_reply;
        }
 
+       ret = check_new_streams(stream->session_id, sessions_ht);
+       if (ret < 0) {
+               goto end_unlock;
+       } else if (ret == 1) {
+               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+               goto send_reply;
+       }
+
        len = be32toh(get_packet_info.len);
        data = zmalloc(len);
        if (!data) {
@@ -1663,7 +1714,7 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
                ret = viewer_get_next_index(cmd, sessions_ht);
                break;
        case VIEWER_GET_PACKET:
-               ret = viewer_get_packet(cmd);
+               ret = viewer_get_packet(cmd, sessions_ht);
                break;
        case VIEWER_GET_METADATA:
                ret = viewer_get_metadata(cmd);
This page took 0.023949 seconds and 4 git commands to generate.