Fix: relayd: live client not notified of inactive streams
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 0355cf9d91f06cf95c62a84efd61ccb07fa29f60..9c1c64d81c6aa4161697348858aa42d0a828512d 100644 (file)
@@ -88,6 +88,27 @@ static uint64_t last_relay_viewer_session_id;
 static pthread_mutex_t last_relay_viewer_session_id_lock =
                PTHREAD_MUTEX_INITIALIZER;
 
+static const char *
+lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_INDEX_OK:
+               return "INDEX_OK";
+       case LTTNG_VIEWER_INDEX_RETRY:
+               return "INDEX_RETRY";
+       case LTTNG_VIEWER_INDEX_HUP:
+               return "INDEX_HUP";
+       case LTTNG_VIEWER_INDEX_ERR:
+               return "INDEX_ERR";
+       case LTTNG_VIEWER_INDEX_INACTIVE:
+               return "INDEX_INACTIVE";
+       case LTTNG_VIEWER_INDEX_EOF:
+               return "INDEX_EOF";
+       default:
+               abort();
+       }
+}
+
 /*
  * Cleanup the daemon
  */
@@ -156,28 +177,30 @@ static
 int check_new_streams(struct relay_connection *conn)
 {
        struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
+       rcu_read_lock();
        if (!conn->viewer_session) {
                goto end;
        }
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(session,
-                       &conn->viewer_session->session_list,
-                       viewer_session_node) {
+
+       cds_list_for_each_entry_rcu(
+               session, &conn->viewer_session->session_list, viewer_session_node)
+       {
                if (!session_get(session)) {
                        continue;
                }
-               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-               ret = current_val;
+
+               ret = uatomic_read(&session->new_streams);
                session_put(session);
                if (ret == 1) {
                        goto end;
                }
        }
+
 end:
        rcu_read_unlock();
+       DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
        return ret;
 }
 
@@ -1187,6 +1210,8 @@ int viewer_get_new_streams(struct relay_connection *conn)
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1601,6 +1626,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct relay_stream *rstream = NULL;
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       bool attached_sessions_have_new_streams = false;
 
        assert(conn);
 
@@ -1648,6 +1674,17 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
+       }
+
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
@@ -1710,6 +1747,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        assert(!ret);
 
@@ -1769,14 +1807,6 @@ int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                ERR("Relay error reading index file");
@@ -1821,6 +1851,10 @@ send_reply:
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        health_code_update();
 
This page took 0.024843 seconds and 4 git commands to generate.