Fix: unpublish stream on close
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index adb044f1d3ebd4b6b6d58a035c6e4d9eece706b9..fe52702c77489209dcb43208206bb6e7dd1fb04c 100644 (file)
@@ -71,6 +71,7 @@
 #include "session.h"
 #include "stream.h"
 #include "connection.h"
+#include "tracefile-array.h"
 
 /* command line options */
 char *opt_output_path;
@@ -1265,9 +1266,17 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
        pthread_mutex_lock(&stream->lock);
-       stream->closed = true;
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       stream_close(stream);
+
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -1286,7 +1295,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        viewer_stream_put(vstream);
                }
        }
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 
 end:
@@ -1890,7 +1898,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                 * Only flag a stream inactive when it has already
                 * received data and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0
+               if (stream->index_received_seqcount > 0
                                && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end =
                                be64toh(index_info.timestamp_end);
@@ -1918,7 +1926,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2091,7 +2100,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
                fd = index_create_file(stream->path_name, stream->channel_name,
                                -1, -1, stream->tracefile_size,
-                               stream->current_tracefile_id);
+                               tracefile_array_get_file_index_head(stream->tfa));
                if (fd < 0) {
                        ret = -1;
                        /* Put self-ref for this index due to error. */
@@ -2120,7 +2129,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2204,35 +2214,23 @@ static int relay_process_data(struct relay_connection *conn)
        if (stream->tracefile_size > 0 &&
                        (stream->tracefile_size_current + data_size) >
                        stream->tracefile_size) {
-               uint64_t new_id;
+               uint64_t old_id, new_id;
+
+               old_id = tracefile_array_get_file_index_head(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa);
+
+               /* new_id is updated by utils_rotate_stream_file. */
+               new_id = old_id;
 
-               new_id = (stream->current_tracefile_id + 1) %
-                       stream->tracefile_count;
-               /*
-                * Move viewer oldest available data position forward if
-                * we are overwriting a tracefile.
-                */
-               if (new_id == stream->oldest_tracefile_id) {
-                       stream->oldest_tracefile_id =
-                               (stream->oldest_tracefile_id + 1) %
-                               stream->tracefile_count;
-               }
                ret = utils_rotate_stream_file(stream->path_name,
                                stream->channel_name, stream->tracefile_size,
                                stream->tracefile_count, -1,
                                -1, stream->stream_fd->fd,
-                               &stream->current_tracefile_id,
-                               &stream->stream_fd->fd);
+                               &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Rotating stream output file");
                        goto end_stream_unlock;
                }
-               stream->current_tracefile_seq++;
-               if (stream->current_tracefile_seq
-                       - stream->oldest_tracefile_seq >=
-                               stream->tracefile_count) {
-                       stream->oldest_tracefile_seq++;
-               }
                /*
                 * Reset current size because we just performed a stream
                 * rotation.
This page took 0.024513 seconds and 4 git commands to generate.