Fix: Wait for in-flight data before closing a stream
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index adb044f1d3ebd4b6b6d58a035c6e4d9eece706b9..e769daf88fe99b6c0e01d2006861ea8c6a804957 100644 (file)
@@ -71,6 +71,7 @@
 #include "session.h"
 #include "stream.h"
 #include "connection.h"
+#include "tracefile-array.h"
 
 /* command line options */
 char *opt_output_path;
@@ -1265,9 +1266,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
        pthread_mutex_lock(&stream->lock);
-       stream->closed = true;
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -1286,7 +1302,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        viewer_stream_put(vstream);
                }
        }
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 
 end:
@@ -1890,7 +1905,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                 * Only flag a stream inactive when it has already
                 * received data and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0
+               if (stream->index_received_seqcount > 0
                                && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end =
                                be64toh(index_info.timestamp_end);
@@ -1918,7 +1933,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2091,7 +2107,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
                fd = index_create_file(stream->path_name, stream->channel_name,
                                -1, -1, stream->tracefile_size,
-                               stream->current_tracefile_id);
+                               tracefile_array_get_file_index_head(stream->tfa));
                if (fd < 0) {
                        ret = -1;
                        /* Put self-ref for this index due to error. */
@@ -2120,7 +2136,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2146,7 +2163,7 @@ static int relay_process_data(struct relay_connection *conn)
        uint64_t net_seq_num;
        uint32_t data_size;
        struct relay_session *session;
-       bool new_stream = false;
+       bool new_stream = false, close_requested = false;
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2204,35 +2221,23 @@ static int relay_process_data(struct relay_connection *conn)
        if (stream->tracefile_size > 0 &&
                        (stream->tracefile_size_current + data_size) >
                        stream->tracefile_size) {
-               uint64_t new_id;
+               uint64_t old_id, new_id;
+
+               old_id = tracefile_array_get_file_index_head(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa);
+
+               /* new_id is updated by utils_rotate_stream_file. */
+               new_id = old_id;
 
-               new_id = (stream->current_tracefile_id + 1) %
-                       stream->tracefile_count;
-               /*
-                * Move viewer oldest available data position forward if
-                * we are overwriting a tracefile.
-                */
-               if (new_id == stream->oldest_tracefile_id) {
-                       stream->oldest_tracefile_id =
-                               (stream->oldest_tracefile_id + 1) %
-                               stream->tracefile_count;
-               }
                ret = utils_rotate_stream_file(stream->path_name,
                                stream->channel_name, stream->tracefile_size,
                                stream->tracefile_count, -1,
                                -1, stream->stream_fd->fd,
-                               &stream->current_tracefile_id,
-                               &stream->stream_fd->fd);
+                               &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Rotating stream output file");
                        goto end_stream_unlock;
                }
-               stream->current_tracefile_seq++;
-               if (stream->current_tracefile_seq
-                       - stream->oldest_tracefile_seq >=
-                               stream->tracefile_count) {
-                       stream->oldest_tracefile_seq++;
-               }
                /*
                 * Reset current size because we just performed a stream
                 * rotation.
@@ -2277,7 +2282,12 @@ static int relay_process_data(struct relay_connection *conn)
        stream->prev_seq = net_seq_num;
 
 end_stream_unlock:
+       close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
+       if (close_requested) {
+               try_stream_close(stream);
+       }
+
        if (new_stream) {
                pthread_mutex_lock(&session->lock);
                uatomic_set(&session->new_streams, 1);
This page took 0.025196 seconds and 4 git commands to generate.