Fix: relayd: failure to read index entry or stream packet after clear
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 4238d3b53ff217e05613e5ed7aab0aef72f11921..6759bed17fd7a16d4703f3341eeaf9b2a1c0a390 100644 (file)
@@ -1,30 +1,21 @@
 /*
- * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
- *                      David Goulet <dgoulet@efficios.com>
- *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *               2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2019 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #define _LGPL_SOURCE
 #include <common/common.h>
-#include <common/utils.h>
 #include <common/defaults.h>
+#include <common/fs-handle.h>
 #include <common/sessiond-comm/relayd.h>
-#include <urcu/rculist.h>
+#include <common/utils.h>
 #include <sys/stat.h>
+#include <urcu/rculist.h>
 
 #include "lttng-relayd.h"
 #include "index.h"
@@ -80,15 +71,16 @@ static void stream_complete_rotation(struct relay_stream *stream)
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
        stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
+       stream->completed_rotation_count++;
 }
 
 static int stream_create_data_output_file_from_trace_chunk(
                struct relay_stream *stream,
                struct lttng_trace_chunk *trace_chunk,
                bool force_unlink,
-               struct stream_fd **out_stream_fd)
+               struct fs_handle **out_file)
 {
-       int ret, fd;
+       int ret;
        char stream_path[LTTNG_PATH_MAX];
        enum lttng_trace_chunk_status status;
        const int flags = O_RDWR | O_CREAT | O_TRUNC;
@@ -127,22 +119,13 @@ static int stream_create_data_output_file_from_trace_chunk(
                }
        }
 
-       status = lttng_trace_chunk_open_file(
-                       trace_chunk, stream_path, flags, mode, &fd, false);
+       status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
+                       flags, mode, out_file, false);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
                goto end;
        }
-
-       *out_stream_fd = stream_fd_create(fd);
-       if (!*out_stream_fd) {
-               if (close(ret)) {
-                       PERROR("Error closing stream file descriptor %d", ret);
-               }
-               ret = -1;
-               goto end;
-       }
 end:
        return ret;
 }
@@ -154,16 +137,15 @@ static int stream_rotate_data_file(struct relay_stream *stream)
        DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
                        stream->stream_handle, stream->tracefile_size_current);
 
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
+       if (stream->file) {
+               fs_handle_close(stream->file);
+               stream->file = NULL;
        }
 
        stream->tracefile_wrapped_around = false;
        stream->tracefile_current_index = 0;
 
        if (stream->ongoing_rotation.value.next_trace_chunk) {
-               struct stream_fd *new_stream_fd = NULL;
                enum lttng_trace_chunk_status chunk_status;
 
                chunk_status = lttng_trace_chunk_create_subdirectory(
@@ -177,8 +159,7 @@ static int stream_rotate_data_file(struct relay_stream *stream)
                /* Rotate the data file. */
                ret = stream_create_data_output_file_from_trace_chunk(stream,
                                stream->ongoing_rotation.value.next_trace_chunk,
-                               false, &new_stream_fd);
-               stream->stream_fd = new_stream_fd;
+                               false, &stream->file);
                if (ret < 0) {
                        ERR("Failed to rotate stream data file");
                        goto end;
@@ -213,7 +194,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        off_t lseek_ret, previous_stream_copy_origin;
        uint64_t copy_bytes_left, misplaced_data_size;
        bool acquired_reference;
-       struct stream_fd *previous_stream_fd = NULL;
+       struct fs_handle *previous_stream_file = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
        if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
@@ -244,9 +225,9 @@ static int rotate_truncate_stream(struct relay_stream *stream)
         * the orinal stream_fd will be used to copy the "extra" data
         * to the new file.
         */
-       assert(stream->stream_fd);
-       previous_stream_fd = stream->stream_fd;
-       stream->stream_fd = NULL;
+       assert(stream->file);
+       previous_stream_file = stream->file;
+       stream->file = NULL;
 
        assert(!stream->is_metadata);
        assert(stream->tracefile_size_current >
@@ -261,13 +242,12 @@ static int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
-       assert(stream->stream_fd);
+       assert(stream->file);
        /*
         * Seek the current tracefile to the position at which the rotation
         * should have occurred.
         */
-       lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
-                       SEEK_SET);
+       lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
        if (lseek_ret < 0) {
                PERROR("Failed to seek to offset %" PRIu64
                       " while copying extra data received before a stream rotation",
@@ -283,41 +263,41 @@ static int rotate_truncate_stream(struct relay_stream *stream)
                const off_t copy_size_this_pass = min_t(
                                off_t, copy_bytes_left, sizeof(copy_buffer));
 
-               io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+               io_ret = fs_handle_read(previous_stream_file, copy_buffer,
                                copy_size_this_pass);
                if (io_ret < (ssize_t) copy_size_this_pass) {
                        if (io_ret == -1) {
                                PERROR("Failed to read %" PRIu64
-                                      " bytes from fd %i in %s(), returned %zi",
+                                      " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
                                                copy_size_this_pass,
-                                               previous_stream_fd->fd,
-                                               __FUNCTION__, io_ret);
+                                               __FUNCTION__, io_ret,
+                                               stream->stream_handle);
                        } else {
                                ERR("Failed to read %" PRIu64
-                                   " bytes from fd %i in %s(), returned %zi",
+                                      " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
                                                copy_size_this_pass,
-                                               previous_stream_fd->fd,
-                                               __FUNCTION__, io_ret);
+                                               __FUNCTION__, io_ret,
+                                               stream->stream_handle);
                        }
                        ret = -1;
                        goto end;
                }
 
-               io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
-                               copy_size_this_pass);
+               io_ret = fs_handle_write(
+                               stream->file, copy_buffer, copy_size_this_pass);
                if (io_ret < (ssize_t) copy_size_this_pass) {
                        if (io_ret == -1) {
                                PERROR("Failed to write %" PRIu64
-                                      " bytes from fd %i in %s(), returned %zi",
+                                      " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
                                                copy_size_this_pass,
-                                               stream->stream_fd->fd,
-                                               __FUNCTION__, io_ret);
+                                               __FUNCTION__, io_ret,
+                                               stream->stream_handle);
                        } else {
                                ERR("Failed to write %" PRIu64
-                                   " bytes from fd %i in %s(), returned %zi",
+                                      " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
                                                copy_size_this_pass,
-                                               stream->stream_fd->fd,
-                                               __FUNCTION__, io_ret);
+                                               __FUNCTION__, io_ret,
+                                               stream->stream_handle);
                        }
                        ret = -1;
                        goto end;
@@ -326,7 +306,8 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        }
 
        /* Truncate the file to get rid of the excess data. */
-       ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+       ret = fs_handle_truncate(
+                       previous_stream_file, previous_stream_copy_origin);
        if (ret) {
                PERROR("Failed to truncate current stream file to offset %" PRIu64,
                                previous_stream_copy_origin);
@@ -349,7 +330,6 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        ret = 0;
 end:
        lttng_trace_chunk_put(previous_chunk);
-       stream_fd_put(previous_stream_fd);
        return ret;
 }
 
@@ -561,7 +541,6 @@ static int stream_set_trace_chunk(struct relay_stream *stream,
        int ret = 0;
        enum lttng_trace_chunk_status status;
        bool acquired_reference;
-       struct stream_fd *new_stream_fd = NULL;
 
        status = lttng_trace_chunk_create_subdirectory(chunk,
                        stream->path_name);
@@ -575,13 +554,12 @@ static int stream_set_trace_chunk(struct relay_stream *stream,
        assert(acquired_reference);
        stream->trace_chunk = chunk;
 
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
+       if (stream->file) {
+               fs_handle_close(stream->file);
+               stream->file = NULL;
        }
        ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
-                       false, &new_stream_fd);
-       stream->stream_fd = new_stream_fd;
+                       false, &stream->file);
 end:
        return ret;
 }
@@ -684,9 +662,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
 end:
        if (ret) {
-               if (stream->stream_fd) {
-                       stream_fd_put(stream->stream_fd);
-                       stream->stream_fd = NULL;
+               if (stream->file) {
+                       fs_handle_close(stream->file);
+                       stream->file = NULL;
                }
                stream_put(stream);
                stream = NULL;
@@ -809,9 +787,9 @@ static void stream_release(struct urcu_ref *ref)
 
        stream_unpublish(stream);
 
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
+       if (stream->file) {
+               fs_handle_close(stream->file);
+               stream->file = NULL;
        }
        if (stream->index_file) {
                lttng_index_file_put(stream->index_file);
@@ -991,9 +969,9 @@ void try_stream_close(struct relay_stream *stream)
         */
 
        /* Put stream fd before put chunk. */
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
+       if (stream->file) {
+               fs_handle_close(stream->file);
+               stream->file = NULL;
        }
        if (stream->index_file) {
                lttng_index_file_put(stream->index_file);
@@ -1013,7 +991,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
 
        ASSERT_LOCKED(stream->lock);
 
-       if (!stream->stream_fd || !stream->trace_chunk) {
+       if (!stream->file || !stream->trace_chunk) {
                ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
                                stream->stream_handle, stream->channel_name);
                ret = -1;
@@ -1047,12 +1025,12 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
-               if (stream->stream_fd) {
-                       stream_fd_put(stream->stream_fd);
-                       stream->stream_fd = NULL;
+               if (stream->file) {
+                       fs_handle_close(stream->file);
+                       stream->file = NULL;
                }
                ret = stream_create_data_output_file_from_trace_chunk(stream,
-                               stream->trace_chunk, false, &stream->stream_fd);
+                               stream->trace_chunk, false, &stream->file);
                if (ret) {
                        ERR("Failed to perform trace file rotation of stream %" PRIu64,
                                        stream->stream_handle);
@@ -1087,15 +1065,15 @@ int stream_write(struct relay_stream *stream,
        memset(padding_buffer, 0,
                        min(sizeof(padding_buffer), padding_to_write));
 
-       if (!stream->stream_fd || !stream->trace_chunk) {
+       if (!stream->file || !stream->trace_chunk) {
                ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
                                stream->stream_handle, stream->channel_name);
                ret = -1;
                goto end;
        }
        if (packet) {
-               write_ret = lttng_write(stream->stream_fd->fd,
-                               packet->data, packet->size);
+               write_ret = fs_handle_write(
+                               stream->file, packet->data, packet->size);
                if (write_ret != packet->size) {
                        PERROR("Failed to write to stream file of %sstream %" PRIu64,
                                        stream->is_metadata ? "metadata " : "",
@@ -1109,8 +1087,8 @@ int stream_write(struct relay_stream *stream,
                const size_t padding_to_write_this_pass =
                                min(padding_to_write, sizeof(padding_buffer));
 
-               write_ret = lttng_write(stream->stream_fd->fd,
-                               padding_buffer, padding_to_write_this_pass);
+               write_ret = fs_handle_write(stream->file, padding_buffer,
+                               padding_to_write_this_pass);
                if (write_ret != padding_to_write_this_pass) {
                        PERROR("Failed to write padding to file of %sstream %" PRIu64,
                                        stream->is_metadata ? "metadata " : "",
@@ -1351,9 +1329,16 @@ int stream_reset_file(struct relay_stream *stream)
 {
        ASSERT_LOCKED(stream->lock);
 
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
+       if (stream->file) {
+               int ret;
+
+               ret = fs_handle_close(stream->file);
+               if (ret) {
+                       ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+                                       stream->channel_name,
+                                       stream->stream_handle);
+               }
+               stream->file = NULL;
        }
 
        DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
@@ -1366,7 +1351,7 @@ int stream_reset_file(struct relay_stream *stream)
        stream->pos_after_last_complete_data_index = 0;
 
        return stream_create_data_output_file_from_trace_chunk(stream,
-                       stream->trace_chunk, true, &stream->stream_fd);
+                       stream->trace_chunk, true, &stream->file);
 }
 
 void print_relay_streams(void)
This page took 0.029965 seconds and 4 git commands to generate.