Fix concurrency issues while overwriting tracefiles in live
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index e51ff5cdf53495d4848a2867cfc952e9f2534bc5..480c459ce6a49724f54d1b6ef71c0b5a6d458a1b 100644 (file)
@@ -801,23 +801,34 @@ int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
                }
        }
        viewer_stream->tracefile_count_current = tracefile_id;
-       pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
 
        if (viewer_stream->abort_flag == 0) {
-               ret = close(viewer_stream->index_read_fd);
-               if (ret < 0) {
-                       PERROR("close index file");
+               if (viewer_stream->index_read_fd > 0) {
+                       ret = close(viewer_stream->index_read_fd);
+                       if (ret < 0) {
+                               PERROR("close index file %d",
+                                               viewer_stream->index_read_fd);
+                       }
+                       viewer_stream->index_read_fd = -1;
                }
-               ret = close(viewer_stream->read_fd);
-               if (ret < 0) {
-                       PERROR("close tracefile");
+               if (viewer_stream->read_fd > 0) {
+                       ret = close(viewer_stream->read_fd);
+                       if (ret < 0) {
+                               PERROR("close tracefile %d",
+                                               viewer_stream->read_fd);
+                       }
+                       viewer_stream->read_fd = -1;
                }
        } else {
                viewer_stream->abort_flag = 0;
        }
 
+       viewer_stream->index_read_fd = -1;
        viewer_stream->read_fd = -1;
 
+       if (stream) {
+               pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+       }
        ret = open_index(viewer_stream);
        if (ret < 0) {
                goto error;
@@ -1125,25 +1136,26 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        }
                }
-               if (rstream->beacon_ts_end != -1ULL &&
-                               vstream->last_sent_index == rstream->total_index_received) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
-                       viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
-                       goto send_reply;
-               }
-               /*
-                * Reader and writer are working in the same tracefile, so we care
-                * about the number of index received and sent. Otherwise, we read
-                * up to EOF.
-                */
                pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
-               if (rstream->tracefile_count_current == vstream->tracefile_count_current
-                               && rstream->total_index_received <= vstream->last_sent_index
-                               && !vstream->close_write_flag) {
-                       pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
-                       /* No new index to send, retry later. */
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
-                       goto send_reply;
+               if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
+                       if (rstream->beacon_ts_end != -1ULL &&
+                               vstream->last_sent_index == rstream->total_index_received) {
+                               viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+                               viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+                               goto send_reply;
+                       /*
+                        * Reader and writer are working in the same tracefile, so we care
+                        * about the number of index received and sent. Otherwise, we read
+                        * up to EOF.
+                        */
+                       } else if (rstream->total_index_received <= vstream->last_sent_index
+                                       && !vstream->close_write_flag) {
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+                               /* No new index to send, retry later. */
+                               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                               goto send_reply;
+                       }
                }
                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        } else if (!rstream && vstream->close_write_flag &&
@@ -1161,8 +1173,23 @@ int viewer_get_next_index(struct relay_command *cmd,
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
+       pthread_mutex_lock(&vstream->overwrite_lock);
+       if (vstream->abort_flag) {
+               /*
+                * The file is being overwritten by the writer, we cannot
+                * use it.
+                */
+               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+               pthread_mutex_unlock(&vstream->overwrite_lock);
+               ret = rotate_viewer_stream(vstream, rstream);
+               if (ret < 0) {
+                       goto end_unlock;
+               }
+               goto send_reply;
+       }
        ret = lttng_read(vstream->index_read_fd, &packet_index,
                        sizeof(packet_index));
+       pthread_mutex_unlock(&vstream->overwrite_lock);
        if (ret < sizeof(packet_index)) {
                /*
                 * The tracefile is closed in write, so we read up to EOF.
@@ -1175,17 +1202,9 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        }
                } else {
-                       /*
-                        * If the read fd was closed by the streaming side, the
-                        * abort_flag will be set to 1, otherwise it is an error.
-                        */
-                       if (vstream->abort_flag != 1) {
-                               PERROR("Relay reading index file");
-                               viewer_index.status = htobe32(VIEWER_INDEX_ERR);
-                               goto send_reply;
-                       } else {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
-                       }
+                       PERROR("Relay reading index file %d",
+                                       vstream->index_read_fd);
+                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
                }
                goto send_reply;
        } else {
This page took 0.024746 seconds and 4 git commands to generate.