Fix: notify the viewer if new streams got added
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 15 Jan 2014 16:54:38 +0000 (11:54 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 7 Feb 2014 20:27:33 +0000 (15:27 -0500)
As soon as new streams are available, the viewer must start reading
them, otherwise we risk to ignore complete streams.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c

index 80c3da57ac63d2ee0fd69b5942e063c7aefa4f16..1a691a849c11e972fee17059df4d5c7068846f01 100644 (file)
@@ -1142,6 +1142,40 @@ void destroy_viewer_stream(struct relay_viewer_stream *vstream)
        call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
 }
 
        call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
 }
 
+/*
+ * Atomically check if new streams got added in the session since the last
+ * check and reset the flag to 0.
+ *
+ * Returns 1 if new streams got added, 0 if nothing changed, a negative value
+ * on error.
+ */
+static
+int check_new_streams(uint64_t session_id, struct lttng_ht *sessions_ht)
+{
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+       struct relay_session *session;
+       unsigned long current_val;
+       int ret;
+
+       lttng_ht_lookup(sessions_ht,
+                       (void *)((unsigned long) session_id), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               DBG("Relay session %" PRIu64 " not found", session_id);
+               ret = -1;
+               goto error;
+       }
+
+       session = caa_container_of(node, struct relay_session, session_n);
+
+       current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
+       ret = current_val;
+
+error:
+       return ret;
+}
+
 /*
  * Send the next index for a stream.
  *
 /*
  * Send the next index for a stream.
  *
@@ -1266,6 +1300,13 @@ int viewer_get_next_index(struct relay_command *cmd,
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
+       ret = check_new_streams(vstream->session_id, sessions_ht);
+       if (ret < 0) {
+               goto end_unlock;
+       } else if (ret == 1) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        pthread_mutex_lock(&vstream->overwrite_lock);
        if (vstream->abort_flag) {
                /*
        pthread_mutex_lock(&vstream->overwrite_lock);
        if (vstream->abort_flag) {
                /*
@@ -1354,7 +1395,8 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_packet(struct relay_command *cmd)
+int viewer_get_packet(struct relay_command *cmd,
+               struct lttng_ht *sessions_ht)
 {
        int ret, send_data = 0;
        char *data = NULL;
 {
        int ret, send_data = 0;
        char *data = NULL;
@@ -1429,6 +1471,15 @@ int viewer_get_packet(struct relay_command *cmd)
                goto send_reply;
        }
 
                goto send_reply;
        }
 
+       ret = check_new_streams(stream->session_id, sessions_ht);
+       if (ret < 0) {
+               goto end_unlock;
+       } else if (ret == 1) {
+               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+               goto send_reply;
+       }
+
        len = be32toh(get_packet_info.len);
        data = zmalloc(len);
        if (!data) {
        len = be32toh(get_packet_info.len);
        data = zmalloc(len);
        if (!data) {
@@ -1663,7 +1714,7 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
                ret = viewer_get_next_index(cmd, sessions_ht);
                break;
        case VIEWER_GET_PACKET:
                ret = viewer_get_next_index(cmd, sessions_ht);
                break;
        case VIEWER_GET_PACKET:
-               ret = viewer_get_packet(cmd);
+               ret = viewer_get_packet(cmd, sessions_ht);
                break;
        case VIEWER_GET_METADATA:
                ret = viewer_get_metadata(cmd);
                break;
        case VIEWER_GET_METADATA:
                ret = viewer_get_metadata(cmd);
index c08c17a4faf83d552d7d43ebe79610adf7d307d3..ab1ccd410e06773686f2bf6f3aefcc9c15ccb362 100644 (file)
@@ -83,6 +83,11 @@ struct relay_session {
         */
        uint64_t minor;
        uint64_t major;
         */
        uint64_t minor;
        uint64_t major;
+       /*
+        * Flag checked and exchanged with uatomic_cmpxchg to tell the
+        * viewer-side if new streams got added since the last check.
+        */
+       unsigned long new_streams;
 };
 
 /*
 };
 
 /*
index d8105e4058128bdc9a902bb8051fc991971f2b90..380b4f833042cad3dcf7809e1871016fda753506 100644 (file)
@@ -2128,6 +2128,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
         */
        set_viewer_ready_flag(cmd);
 
         */
        set_viewer_ready_flag(cmd);
 
+       /*
+        * Inform the viewer that there are new streams in the session.
+        */
+       uatomic_set(&cmd->session->new_streams, 1);
+
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
This page took 0.030333 seconds and 4 git commands to generate.