Fix: don't expose empty streams
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index cacad2916f281d7589f3324b536d1ecef4853419..aa0586814e6ba55adeb6c2b35d5c31f1789bcc51 100644 (file)
@@ -267,7 +267,7 @@ end_unlock:
 static
 int make_viewer_streams(struct relay_session *session,
                enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
-               uint32_t *nb_created)
+               uint32_t *nb_created, bool *closed)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -281,6 +281,10 @@ int make_viewer_streams(struct relay_session *session,
         */
        pthread_mutex_lock(&session->lock);
 
+       if (session->connection_closed) {
+               *closed = true;
+       }
+
        /*
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
@@ -309,6 +313,12 @@ int make_viewer_streams(struct relay_session *session,
                        if (!stream->published) {
                                goto next;
                        }
+                       /*
+                        * Stream has no data, don't consider it yet.
+                        */
+                       if (stream->prev_seq == -1ULL) {
+                               goto next;
+                       }
                        vstream = viewer_stream_get_by_id(stream->stream_handle);
                        if (!vstream) {
                                vstream = viewer_stream_create(stream, seek_t);
@@ -323,17 +333,34 @@ int make_viewer_streams(struct relay_session *session,
                                        /* Update number of created stream counter. */
                                        (*nb_created)++;
                                }
+                               /*
+                                * Ensure a self-reference is preserved even
+                                * after we have put our local reference.
+                                */
+                               viewer_stream_get(vstream);
                        } else {
                                if (!vstream->sent_flag && nb_unsent) {
                                        /* Update number of unsent stream counter. */
                                        (*nb_unsent)++;
                                }
-                               viewer_stream_put(vstream);
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               (*nb_total)++;
+                               if (stream->is_metadata) {
+                                       if (!stream->closed ||
+                                                       stream->metadata_received > vstream->metadata_sent) {
+                                               (*nb_total)++;
+                                       }
+                               } else {
+                                       if (!stream->closed ||
+                                               !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+
+                                               (*nb_total)++;
+                                       }
+                               }
                        }
+                       /* Put local reference. */
+                       viewer_stream_put(vstream);
                next:
                        stream_put(stream);
                }
@@ -862,6 +889,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session;
        uint64_t session_id;
+       bool closed = false;
 
        assert(conn);
 
@@ -897,7 +925,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
        ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
-                       &nb_created);
+                       &nb_created, &closed);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -906,12 +934,12 @@ int viewer_get_new_streams(struct relay_connection *conn)
        response.streams_count = htobe32(nb_streams);
 
        /*
-        * If the session is closed and we have no new streams to send,
-        * it means that the viewer has already received the whole trace
-        * for this session and should now close it.
+        * If the session is closed, HUP when there are no more streams
+        * with data.
         */
-       if (nb_total == 0 && session->connection_closed) {
+       if (closed && nb_total == 0) {
                send_streams = 0;
+               response.streams_count = 0;
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
                goto send_reply;
        }
@@ -964,6 +992,7 @@ int viewer_attach_session(struct relay_connection *conn)
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session = NULL;
+       bool closed = false;
 
        assert(conn);
 
@@ -1022,12 +1051,26 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
-       ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
+       ret = make_viewer_streams(session, seek_type, &nb_streams, NULL,
+                       NULL, &closed);
        if (ret < 0) {
                goto end_put_session;
        }
        response.streams_count = htobe32(nb_streams);
 
+       /*
+        * If the session is closed when the viewer is attaching, it
+        * means some of the streams may have been concurrently removed,
+        * so we don't allow the viewer to attach, even if there are
+        * streams available.
+        */
+       if (closed) {
+               send_streams = 0;
+               response.streams_count = 0;
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+               goto send_reply;
+       }
+
 send_reply:
        health_code_update();
        ret = send_response(conn->sock, &response, sizeof(response));
@@ -1254,8 +1297,8 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
-               ret = -1;
-               goto end;
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               goto send_reply;
        }
 
        /* Use back. ref. Protected by refcounts. */
@@ -1378,7 +1421,9 @@ int viewer_get_next_index(struct relay_connection *conn)
        viewer_index.stream_id = packet_index.stream_id;
 
 send_reply:
-       pthread_mutex_unlock(&rstream->lock);
+       if (rstream) {
+               pthread_mutex_unlock(&rstream->lock);
+       }
 
        if (metadata_viewer_stream) {
                pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
@@ -1439,8 +1484,6 @@ int viewer_get_packet(struct relay_connection *conn)
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *vstream = NULL;
-       struct ctf_trace *ctf_trace;
-       struct relay_viewer_stream *metadata_viewer_stream = NULL;
 
        DBG2("Relay get data packet");
 
@@ -1458,60 +1501,11 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
-               goto error;
-       }
-
-       ctf_trace = vstream->stream->trace;
-
-       /* metadata_viewer_stream may be NULL. */
-       metadata_viewer_stream =
-                       ctf_trace_get_viewer_metadata_stream(ctf_trace);
-
-       if (metadata_viewer_stream) {
-               bool get_packet_err = false;
-
-               pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
-               DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64,
-                       metadata_viewer_stream->stream->metadata_received,
-                       metadata_viewer_stream->metadata_sent);
-               if (!metadata_viewer_stream->stream->metadata_received ||
-                               metadata_viewer_stream->stream->metadata_received >
-                                       metadata_viewer_stream->metadata_sent) {
-                       /*
-                        * We prevent the client from reading a data stream as
-                        * long as there is metadata left to consume. This
-                        * ensures that the client won't receive data of which
-                        * it can't make sense.
-                        */
-                       get_packet_err = true;
-               }
-               pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
-               viewer_stream_put(metadata_viewer_stream);
-               if (get_packet_err) {
-                       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
-                       reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
-                       goto send_reply_nolock;
-               }
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+               goto send_reply_nolock;
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       /*
-        * The vstream->stream_fd used here has been opened by
-        * get_next_index. It is opened there because this is what
-        * allows us to grab a reference to the file with stream lock
-        * held, thus protecting us against overwrite caused by
-        * tracefile rotation. Since tracefile rotation unlinks the old
-        * data file, we are ensured that we won't have our data
-        * overwritten under us.
-        */
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               goto end_free;
-       } else if (ret == 1) {
-               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
-               reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-               goto send_reply;
-       }
 
        len = be32toh(get_packet_info.len);
        data = zmalloc(len);
@@ -1610,7 +1604,15 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
        if (!vstream) {
-               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+               /*
+                * The metadata stream can be closed by a CLOSE command
+                * just before we attach. It can also be closed by
+                * per-pid tracing during tracing. Therefore, it is
+                * possible that we cannot find this viewer stream.
+                * Reply back to the client with an error if we cannot
+                * find it.
+                */
+               reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
        pthread_mutex_lock(&vstream->stream->lock);
This page took 0.026217 seconds and 4 git commands to generate.