From: Jérémie Galarneau Date: Thu, 30 Jan 2020 06:27:16 +0000 (-0500) Subject: relayd: replace uses of block FDs by the fs_handle interface X-Git-Tag: v2.12.0-rc1~36 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=8bb66c3cd60938352927ee865759433387324250 relayd: replace uses of block FDs by the fs_handle interface Replace all usage of "raw" block-device file descriptors for relay_streams, viewer_streams, and index files by the fs_handle. Wrappers are introduced for read, write, seek and truncate operations in order to reduce code duplication as all uses of fs_handles implies getting an fd, using it, and putting it back. Those operations allow the fd-tracker to suspend and restore fs_handles as needed. The stream_fd util is eliminated as it is completely replaced by the fs_handle interface. Signed-off-by: Jérémie Galarneau Change-Id: Iedff88d27aeba3891d4e8818b9e08e4b16a927cc --- diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 4fc35dc0f..c950be385 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -18,7 +18,6 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ viewer-stream.h viewer-stream.c \ session.c session.h \ stream.c stream.h \ - stream-fd.c stream-fd.h \ connection.c connection.h \ viewer-session.c viewer-session.h \ tracefile-array.c tracefile-array.h \ @@ -36,5 +35,4 @@ lttng_relayd_LDADD = -lurcu-common -lurcu \ $(top_builddir)/src/common/health/libhealth.la \ $(top_builddir)/src/common/config/libconfig.la \ $(top_builddir)/src/common/testpoint/libtestpoint.la \ - $(top_builddir)/src/common/fd-tracker/libfd-tracker.la \ $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index bdbd11330..d8bd518b9 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -273,7 +273,6 @@ int relay_index_try_flush(struct relay_index *index) { int ret = 1; bool flushed = false; - int fd; pthread_mutex_lock(&index->lock); if (index->flushed) { @@ -283,10 +282,9 @@ int relay_index_try_flush(struct relay_index *index) if (!index->has_index_data || !index->index_file) { goto skip; } - fd = index->index_file->fd; - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 - " on fd %d", index->stream->stream_handle, - index->index_n.key, fd); + + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64, + index->stream->stream_handle, index->index_n.key); flushed = true; index->flushed = true; ret = lttng_index_file_write(index->index_file, &index->index_data); @@ -401,7 +399,6 @@ int relay_index_switch_all_files(struct relay_stream *stream) rcu_read_lock(); cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index, index_n.node) { - DBG("Update index to fd %d", stream->index_file->fd); ret = relay_index_switch_file(index, stream->index_file, stream->pos_after_last_complete_data_index); if (ret) { diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h index 8466e91a4..11c0c5d0c 100644 --- a/src/bin/lttng-relayd/index.h +++ b/src/bin/lttng-relayd/index.h @@ -26,8 +26,6 @@ #include #include -#include "stream-fd.h" - struct relay_stream; struct relay_connection; struct lttcomm_relayd_index; diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index eaa4b1b73..eed82a6c8 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -18,8 +18,10 @@ */ #define _LGPL_SOURCE +#include #include #include +#include #include #include #include @@ -33,40 +35,39 @@ #include #include #include -#include +#include #include -#include #include -#include -#include +#include -#include #include +#include #include #include -#include #include +#include +#include #include #include -#include #include #include +#include #include #include -#include +#include #include "cmd.h" +#include "connection.h" +#include "ctf-trace.h" +#include "health-relayd.h" #include "live.h" #include "lttng-relayd.h" -#include "utils.h" -#include "health-relayd.h" -#include "testpoint.h" -#include "viewer-stream.h" -#include "stream.h" #include "session.h" -#include "ctf-trace.h" -#include "connection.h" +#include "stream.h" +#include "testpoint.h" +#include "utils.h" #include "viewer-session.h" +#include "viewer-stream.h" #define SESSION_BUF_DEFAULT_COUNT 16 @@ -1629,10 +1630,10 @@ int viewer_get_next_index(struct relay_connection *conn) * overwrite caused by tracefile rotation (in association with * unlink performed before overwrite). */ - if (!vstream->stream_file.fd) { - int fd; + if (!vstream->stream_file.handle) { char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; + struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, rstream->channel_name, rstream->tracefile_size, @@ -1647,9 +1648,9 @@ int viewer_get_next_index(struct relay_connection *conn) * missing if the stream has been closed (application exits with * per-pid buffers) and a clear command has been performed. */ - status = lttng_trace_chunk_open_file( + status = lttng_trace_chunk_open_fs_handle( vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fd, true); + file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && rstream->closed) { @@ -1659,13 +1660,7 @@ int viewer_get_next_index(struct relay_connection *conn) PERROR("Failed to open trace file for viewer stream"); goto error_put; } - vstream->stream_file.fd = stream_fd_create(fd); - if (!vstream->stream_file.fd) { - if (close(fd)) { - PERROR("Failed to close viewer stream file"); - } - goto error_put; - } + vstream->stream_file.handle = fs_handle; } ret = check_new_streams(conn); @@ -1678,8 +1673,7 @@ int viewer_get_next_index(struct relay_connection *conn) ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { - ERR("Relay error reading index file %d", - vstream->index_file->fd); + ERR("Relay error reading index file"); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { @@ -1769,6 +1763,7 @@ int viewer_get_packet(struct relay_connection *conn) uint32_t reply_size = sizeof(reply_header); uint32_t packet_data_len = 0; ssize_t read_len; + uint64_t stream_id; DBG2("Relay get data packet"); @@ -1783,11 +1778,12 @@ int viewer_get_packet(struct relay_connection *conn) /* From this point on, the error label can be reached. */ memset(&reply_header, 0, sizeof(reply_header)); + stream_id = (uint64_t) be64toh(get_packet_info.stream_id); - vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id)); + vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { DBG("Client requested packet of unknown stream id %" PRIu64, - (uint64_t) be64toh(get_packet_info.stream_id)); + stream_id); reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); goto send_reply_nolock; } else { @@ -1803,19 +1799,21 @@ int viewer_get_packet(struct relay_connection *conn) } pthread_mutex_lock(&vstream->stream->lock); - lseek_ret = lseek(vstream->stream_file.fd->fd, + lseek_ret = fs_handle_seek(vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { - PERROR("lseek fd %d to offset %" PRIu64, - vstream->stream_file.fd->fd, + PERROR("Failed to seek file system handle of viewer stream %" PRIu64 + " to offset %" PRIu64, + stream_id, (uint64_t) be64toh(get_packet_info.offset)); goto error; } - read_len = lttng_read(vstream->stream_file.fd->fd, + read_len = fs_handle_read(vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { - PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, - vstream->stream_file.fd->fd, + PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 + ", offset: %" PRIu64, + stream_id, (uint64_t) be64toh(get_packet_info.offset)); goto error; } @@ -1849,8 +1847,7 @@ send_reply_nolock: goto end_free; } - DBG("Sent %u bytes for stream %" PRIu64, reply_size, - (uint64_t) be64toh(get_packet_info.stream_id)); + DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id); end_free: free(reply); @@ -1870,6 +1867,7 @@ static int viewer_get_metadata(struct relay_connection *conn) { int ret = 0; + int fd = -1; ssize_t read_len; uint64_t len = 0; char *data = NULL; @@ -1939,8 +1937,8 @@ int viewer_get_metadata(struct relay_connection *conn) len = vstream->stream->metadata_received - vstream->metadata_sent; /* first time, we open the metadata file */ - if (!vstream->stream_file.fd) { - int fd; + if (!vstream->stream_file.handle) { + struct fs_handle *fs_handle; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; struct relay_stream *rstream = vstream->stream; @@ -1958,9 +1956,9 @@ int viewer_get_metadata(struct relay_connection *conn) * missing if the stream has been closed (application exits with * per-pid buffers) and a clear command has been performed. */ - status = lttng_trace_chunk_open_file( + status = lttng_trace_chunk_open_fs_handle( vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fd, true); + file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -1973,13 +1971,7 @@ int viewer_get_metadata(struct relay_connection *conn) PERROR("Failed to open metadata file for viewer stream"); goto error; } - vstream->stream_file.fd = stream_fd_create(fd); - if (!vstream->stream_file.fd) { - if (close(fd)) { - PERROR("Failed to close viewer metadata file"); - } - goto error; - } + vstream->stream_file.handle = fs_handle; } reply.len = htobe64(len); @@ -1989,7 +1981,14 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - read_len = lttng_read(vstream->stream_file.fd->fd, data, len); + fd = fs_handle_get_fd(vstream->stream_file.handle); + if (fd < 0) { + ERR("Failed to restore viewer stream file system handle"); + goto error; + } + read_len = lttng_read(fd, data, len); + fs_handle_put_fd(vstream->stream_file.handle); + fd = -1; if (read_len < len) { PERROR("Relay reading metadata file"); goto error; diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c deleted file mode 100644 index f8c0c5343..000000000 --- a/src/bin/lttng-relayd/stream-fd.c +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2015 - Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#define _LGPL_SOURCE -#include - -#include "stream-fd.h" - -struct stream_fd *stream_fd_create(int fd) -{ - struct stream_fd *sf; - - sf = zmalloc(sizeof(*sf)); - if (!sf) { - goto end; - } - urcu_ref_init(&sf->ref); - sf->fd = fd; -end: - return sf; -} - -void stream_fd_get(struct stream_fd *sf) -{ - urcu_ref_get(&sf->ref); -} - -static void stream_fd_release(struct urcu_ref *ref) -{ - struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref); - int ret; - - ret = close(sf->fd); - if (ret) { - PERROR("Error closing stream FD %d", sf->fd); - } - free(sf); -} - -void stream_fd_put(struct stream_fd *sf) -{ - if (!sf) { - return; - } - urcu_ref_put(&sf->ref, stream_fd_release); -} diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h deleted file mode 100644 index 64f3b16a5..000000000 --- a/src/bin/lttng-relayd/stream-fd.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef _STREAM_FD_H -#define _STREAM_FD_H - -/* - * Copyright (C) 2015 - Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include - -struct stream_fd { - int fd; - struct urcu_ref ref; -}; - -struct stream_fd *stream_fd_create(int fd); -void stream_fd_get(struct stream_fd *sf); -void stream_fd_put(struct stream_fd *sf); - -#endif /* _STREAM_FD_H */ diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 4238d3b53..1e51547fa 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -20,11 +20,12 @@ #define _LGPL_SOURCE #include -#include #include +#include #include -#include +#include #include +#include #include "lttng-relayd.h" #include "index.h" @@ -86,9 +87,9 @@ static int stream_create_data_output_file_from_trace_chunk( struct relay_stream *stream, struct lttng_trace_chunk *trace_chunk, bool force_unlink, - struct stream_fd **out_stream_fd) + struct fs_handle **out_file) { - int ret, fd; + int ret; char stream_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; const int flags = O_RDWR | O_CREAT | O_TRUNC; @@ -127,22 +128,13 @@ static int stream_create_data_output_file_from_trace_chunk( } } - status = lttng_trace_chunk_open_file( - trace_chunk, stream_path, flags, mode, &fd, false); + status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path, + flags, mode, out_file, false); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->channel_name); ret = -1; goto end; } - - *out_stream_fd = stream_fd_create(fd); - if (!*out_stream_fd) { - if (close(ret)) { - PERROR("Error closing stream file descriptor %d", ret); - } - ret = -1; - goto end; - } end: return ret; } @@ -154,16 +146,15 @@ static int stream_rotate_data_file(struct relay_stream *stream) DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64, stream->stream_handle, stream->tracefile_size_current); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } stream->tracefile_wrapped_around = false; stream->tracefile_current_index = 0; if (stream->ongoing_rotation.value.next_trace_chunk) { - struct stream_fd *new_stream_fd = NULL; enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_create_subdirectory( @@ -177,8 +168,7 @@ static int stream_rotate_data_file(struct relay_stream *stream) /* Rotate the data file. */ ret = stream_create_data_output_file_from_trace_chunk(stream, stream->ongoing_rotation.value.next_trace_chunk, - false, &new_stream_fd); - stream->stream_fd = new_stream_fd; + false, &stream->file); if (ret < 0) { ERR("Failed to rotate stream data file"); goto end; @@ -213,7 +203,7 @@ static int rotate_truncate_stream(struct relay_stream *stream) off_t lseek_ret, previous_stream_copy_origin; uint64_t copy_bytes_left, misplaced_data_size; bool acquired_reference; - struct stream_fd *previous_stream_fd = NULL; + struct fs_handle *previous_stream_file = NULL; struct lttng_trace_chunk *previous_chunk = NULL; if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) { @@ -244,9 +234,9 @@ static int rotate_truncate_stream(struct relay_stream *stream) * the orinal stream_fd will be used to copy the "extra" data * to the new file. */ - assert(stream->stream_fd); - previous_stream_fd = stream->stream_fd; - stream->stream_fd = NULL; + assert(stream->file); + previous_stream_file = stream->file; + stream->file = NULL; assert(!stream->is_metadata); assert(stream->tracefile_size_current > @@ -261,13 +251,12 @@ static int rotate_truncate_stream(struct relay_stream *stream) goto end; } - assert(stream->stream_fd); + assert(stream->file); /* * Seek the current tracefile to the position at which the rotation * should have occurred. */ - lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin, - SEEK_SET); + lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET); if (lseek_ret < 0) { PERROR("Failed to seek to offset %" PRIu64 " while copying extra data received before a stream rotation", @@ -283,41 +272,41 @@ static int rotate_truncate_stream(struct relay_stream *stream) const off_t copy_size_this_pass = min_t( off_t, copy_bytes_left, sizeof(copy_buffer)); - io_ret = lttng_read(previous_stream_fd->fd, copy_buffer, + io_ret = fs_handle_read(previous_stream_file, copy_buffer, copy_size_this_pass); if (io_ret < (ssize_t) copy_size_this_pass) { if (io_ret == -1) { PERROR("Failed to read %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - previous_stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } else { ERR("Failed to read %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - previous_stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } ret = -1; goto end; } - io_ret = lttng_write(stream->stream_fd->fd, copy_buffer, - copy_size_this_pass); + io_ret = fs_handle_write( + stream->file, copy_buffer, copy_size_this_pass); if (io_ret < (ssize_t) copy_size_this_pass) { if (io_ret == -1) { PERROR("Failed to write %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - stream->stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } else { ERR("Failed to write %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - stream->stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } ret = -1; goto end; @@ -326,7 +315,8 @@ static int rotate_truncate_stream(struct relay_stream *stream) } /* Truncate the file to get rid of the excess data. */ - ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin); + ret = fs_handle_truncate( + previous_stream_file, previous_stream_copy_origin); if (ret) { PERROR("Failed to truncate current stream file to offset %" PRIu64, previous_stream_copy_origin); @@ -349,7 +339,6 @@ static int rotate_truncate_stream(struct relay_stream *stream) ret = 0; end: lttng_trace_chunk_put(previous_chunk); - stream_fd_put(previous_stream_fd); return ret; } @@ -561,7 +550,6 @@ static int stream_set_trace_chunk(struct relay_stream *stream, int ret = 0; enum lttng_trace_chunk_status status; bool acquired_reference; - struct stream_fd *new_stream_fd = NULL; status = lttng_trace_chunk_create_subdirectory(chunk, stream->path_name); @@ -575,13 +563,12 @@ static int stream_set_trace_chunk(struct relay_stream *stream, assert(acquired_reference); stream->trace_chunk = chunk; - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } ret = stream_create_data_output_file_from_trace_chunk(stream, chunk, - false, &new_stream_fd); - stream->stream_fd = new_stream_fd; + false, &stream->file); end: return ret; } @@ -684,9 +671,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace, end: if (ret) { - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } stream_put(stream); stream = NULL; @@ -809,9 +796,9 @@ static void stream_release(struct urcu_ref *ref) stream_unpublish(stream); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } if (stream->index_file) { lttng_index_file_put(stream->index_file); @@ -991,9 +978,9 @@ void try_stream_close(struct relay_stream *stream) */ /* Put stream fd before put chunk. */ - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } if (stream->index_file) { lttng_index_file_put(stream->index_file); @@ -1013,7 +1000,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, ASSERT_LOCKED(stream->lock); - if (!stream->stream_fd || !stream->trace_chunk) { + if (!stream->file || !stream->trace_chunk) { ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s", stream->stream_handle, stream->channel_name); ret = -1; @@ -1047,12 +1034,12 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE); stream->tracefile_current_index = new_file_index; - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } ret = stream_create_data_output_file_from_trace_chunk(stream, - stream->trace_chunk, false, &stream->stream_fd); + stream->trace_chunk, false, &stream->file); if (ret) { ERR("Failed to perform trace file rotation of stream %" PRIu64, stream->stream_handle); @@ -1087,15 +1074,15 @@ int stream_write(struct relay_stream *stream, memset(padding_buffer, 0, min(sizeof(padding_buffer), padding_to_write)); - if (!stream->stream_fd || !stream->trace_chunk) { + if (!stream->file || !stream->trace_chunk) { ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s", stream->stream_handle, stream->channel_name); ret = -1; goto end; } if (packet) { - write_ret = lttng_write(stream->stream_fd->fd, - packet->data, packet->size); + write_ret = fs_handle_write( + stream->file, packet->data, packet->size); if (write_ret != packet->size) { PERROR("Failed to write to stream file of %sstream %" PRIu64, stream->is_metadata ? "metadata " : "", @@ -1109,8 +1096,8 @@ int stream_write(struct relay_stream *stream, const size_t padding_to_write_this_pass = min(padding_to_write, sizeof(padding_buffer)); - write_ret = lttng_write(stream->stream_fd->fd, - padding_buffer, padding_to_write_this_pass); + write_ret = fs_handle_write(stream->file, padding_buffer, + padding_to_write_this_pass); if (write_ret != padding_to_write_this_pass) { PERROR("Failed to write padding to file of %sstream %" PRIu64, stream->is_metadata ? "metadata " : "", @@ -1351,9 +1338,16 @@ int stream_reset_file(struct relay_stream *stream) { ASSERT_LOCKED(stream->lock); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + int ret; + + ret = fs_handle_close(stream->file); + if (ret) { + ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64, + stream->channel_name, + stream->stream_handle); + } + stream->file = NULL; } DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, @@ -1366,7 +1360,7 @@ int stream_reset_file(struct relay_stream *stream) stream->pos_after_last_complete_data_index = 0; return stream_create_data_output_file_from_trace_chunk(stream, - stream->trace_chunk, true, &stream->stream_fd); + stream->trace_chunk, true, &stream->file); } void print_relay_streams(void) diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index c88e725c0..ec56fe2fa 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -31,7 +31,6 @@ #include #include "session.h" -#include "stream-fd.h" #include "tracefile-array.h" struct lttcomm_relayd_index; @@ -79,8 +78,7 @@ struct relay_stream { /* seq num to encounter before closing. */ uint64_t last_net_seq_num; - /* FD on which to write the stream data. */ - struct stream_fd *stream_fd; + struct fs_handle *file; /* index file on which to write the index data. */ struct lttng_index_file *index_file; diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 5c19fb974..9d852b661 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -158,8 +158,8 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, * If we never received a data file for the current stream, delay the * opening, otherwise open it right now. */ - if (stream->stream_fd) { - int fd, ret; + if (stream->file) { + int ret; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; @@ -171,26 +171,20 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error_unlock; } - status = lttng_trace_chunk_open_file( - vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fd, true); + status = lttng_trace_chunk_open_fs_handle( + vstream->stream_file.trace_chunk, file_path, + O_RDONLY, 0, &vstream->stream_file.handle, + true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error_unlock; } - vstream->stream_file.fd = stream_fd_create(fd); - if (!vstream->stream_file.fd) { - if (close(fd)) { - PERROR("Failed to close viewer %sfile", - stream->is_metadata ? "metadata " : ""); - } - goto error_unlock; - } } if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END); + lseek_ret = fs_handle_seek( + vstream->index_file->file, 0, SEEK_END); if (lseek_ret < 0) { goto error_unlock; } @@ -241,9 +235,9 @@ static void viewer_stream_release(struct urcu_ref *ref) viewer_stream_unpublish(vstream); - if (vstream->stream_file.fd) { - stream_fd_put(vstream->stream_file.fd); - vstream->stream_file.fd = NULL; + if (vstream->stream_file.handle) { + fs_handle_close(vstream->stream_file.handle); + vstream->stream_file.handle = NULL; } if (vstream->index_file) { lttng_index_file_put(vstream->index_file); @@ -304,9 +298,9 @@ void viewer_stream_close_files(struct relay_viewer_stream *vstream) lttng_index_file_put(vstream->index_file); vstream->index_file = NULL; } - if (vstream->stream_file.fd) { - stream_fd_put(vstream->stream_file.fd); - vstream->stream_file.fd = NULL; + if (vstream->stream_file.handle) { + fs_handle_close(vstream->stream_file.handle); + vstream->stream_file.handle = NULL; } } diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h index a3262e242..63fb71cb6 100644 --- a/src/bin/lttng-relayd/viewer-stream.h +++ b/src/bin/lttng-relayd/viewer-stream.h @@ -50,8 +50,7 @@ struct relay_viewer_stream { struct relay_stream *stream; struct { - /* FD from which to read the stream data. */ - struct stream_fd *fd; + struct fs_handle *handle; struct lttng_trace_chunk *trace_chunk; } stream_file; /* index file from which to read the index data. */ diff --git a/src/common/Makefile.am b/src/common/Makefile.am index b89ce2a09..261e1e076 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -74,7 +74,8 @@ endif libcommon_la_LIBADD = \ $(top_builddir)/src/common/config/libconfig.la \ $(top_builddir)/src/common/compat/libcompat.la \ - $(top_builddir)/src/common/hashtable/libhashtable.la + $(top_builddir)/src/common/hashtable/libhashtable.la \ + $(top_builddir)/src/common/fd-tracker/libfd-tracker.la if BUILD_LIB_COMPAT SUBDIRS += compat diff --git a/src/common/fs-handle.c b/src/common/fs-handle.c index e90f06d32..58763a0b2 100644 --- a/src/common/fs-handle.c +++ b/src/common/fs-handle.c @@ -15,8 +15,9 @@ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include #include +#include +#include LTTNG_HIDDEN int fs_handle_get_fd(struct fs_handle *handle) @@ -41,3 +42,71 @@ int fs_handle_close(struct fs_handle *handle) { return handle->close(handle); } + +LTTNG_HIDDEN +ssize_t fs_handle_read(struct fs_handle *handle, void *buf, size_t count) +{ + ssize_t ret; + const int fd = fs_handle_get_fd(handle); + + if (fd < 0) { + ret = -1; + goto end; + } + + ret = lttng_read(fd, buf, count); + fs_handle_put_fd(handle); +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t fs_handle_write(struct fs_handle *handle, const void *buf, size_t count) +{ + ssize_t ret; + const int fd = fs_handle_get_fd(handle); + + if (fd < 0) { + ret = -1; + goto end; + } + + ret = lttng_write(fd, buf, count); + fs_handle_put_fd(handle); +end: + return ret; +} + +LTTNG_HIDDEN +int fs_handle_truncate(struct fs_handle *handle, off_t offset) +{ + int ret; + const int fd = fs_handle_get_fd(handle); + + if (fd < 0) { + ret = -1; + goto end; + } + + ret = ftruncate(fd, offset); + fs_handle_put_fd(handle); +end: + return ret; +} + +LTTNG_HIDDEN +int fs_handle_seek(struct fs_handle *handle, off_t offset, int whence) +{ + int ret; + const int fd = fs_handle_get_fd(handle); + + if (fd < 0) { + ret = -1; + goto end; + } + + ret = lseek(fd, offset, whence); + fs_handle_put_fd(handle); +end: + return ret; +} diff --git a/src/common/fs-handle.h b/src/common/fs-handle.h index e90ba3ca2..6f9bad1e7 100644 --- a/src/common/fs-handle.h +++ b/src/common/fs-handle.h @@ -19,6 +19,7 @@ #define FS_HANDLE_H #include +#include struct fs_handle; @@ -68,4 +69,16 @@ int fs_handle_unlink(struct fs_handle *handle); LTTNG_HIDDEN int fs_handle_close(struct fs_handle *handle); +LTTNG_HIDDEN +ssize_t fs_handle_read(struct fs_handle *handle, void *buf, size_t count); + +LTTNG_HIDDEN +ssize_t fs_handle_write(struct fs_handle *handle, const void *buf, size_t count); + +LTTNG_HIDDEN +int fs_handle_truncate(struct fs_handle *handle, off_t offset); + +LTTNG_HIDDEN +int fs_handle_seek(struct fs_handle *handle, off_t offset, int whence); + #endif /* FS_HANDLE_H */ diff --git a/src/common/index/index.c b/src/common/index/index.c index 9f7e5f67a..e68a79df5 100644 --- a/src/common/index/index.c +++ b/src/common/index/index.c @@ -44,7 +44,8 @@ static enum lttng_trace_chunk_status _lttng_index_file_create_from_trace_chunk( { struct lttng_index_file *index_file; enum lttng_trace_chunk_status chunk_status; - int ret, fd = -1; + int ret; + struct fs_handle *fs_handle = NULL; ssize_t size_ret; struct ctf_packet_index_file_hdr hdr; char index_directory_path[LTTNG_PATH_MAX]; @@ -103,15 +104,15 @@ static enum lttng_trace_chunk_status _lttng_index_file_create_from_trace_chunk( } } - chunk_status = lttng_trace_chunk_open_file(chunk, index_file_path, - flags, mode, &fd, expect_no_file); + chunk_status = lttng_trace_chunk_open_fs_handle(chunk, index_file_path, + flags, mode, &fs_handle, expect_no_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } if (flags == WRITE_FILE_FLAGS) { ctf_packet_index_file_hdr_init(&hdr, index_major, index_minor); - size_ret = lttng_write(fd, &hdr, sizeof(hdr)); + size_ret = fs_handle_write(fs_handle, &hdr, sizeof(hdr)); if (size_ret < sizeof(hdr)) { PERROR("Failed to write index header"); chunk_status = LTTNG_TRACE_CHUNK_STATUS_ERROR; @@ -121,7 +122,7 @@ static enum lttng_trace_chunk_status _lttng_index_file_create_from_trace_chunk( } else { uint32_t element_len; - size_ret = lttng_read(fd, &hdr, sizeof(hdr)); + size_ret = fs_handle_read(fs_handle, &hdr, sizeof(hdr)); if (size_ret < 0) { PERROR("Failed to read index header"); chunk_status = LTTNG_TRACE_CHUNK_STATUS_ERROR; @@ -152,7 +153,7 @@ static enum lttng_trace_chunk_status _lttng_index_file_create_from_trace_chunk( } index_file->element_len = element_len; } - index_file->fd = fd; + index_file->file = fs_handle; index_file->major = index_major; index_file->minor = index_minor; urcu_ref_init(&index_file->ref); @@ -161,8 +162,8 @@ static enum lttng_trace_chunk_status _lttng_index_file_create_from_trace_chunk( return LTTNG_TRACE_CHUNK_STATUS_OK; error: - if (fd >= 0) { - ret = close(fd); + if (fs_handle) { + ret = fs_handle_close(fs_handle); if (ret < 0) { PERROR("Failed to close file descriptor of index file"); } @@ -206,21 +207,17 @@ enum lttng_trace_chunk_status lttng_index_file_create_from_trace_chunk_read_only int lttng_index_file_write(const struct lttng_index_file *index_file, const struct ctf_packet_index *element) { - int fd; - size_t len; ssize_t ret; + const size_t len = index_file->element_len;; assert(index_file); assert(element); - fd = index_file->fd; - len = index_file->element_len; - - if (fd < 0) { + if (!index_file->file) { goto error; } - ret = lttng_write(fd, element, len); + ret = fs_handle_write(index_file->file, element, len); if (ret < len) { PERROR("writing index file"); goto error; @@ -240,16 +237,15 @@ int lttng_index_file_read(const struct lttng_index_file *index_file, struct ctf_packet_index *element) { ssize_t ret; - int fd = index_file->fd; - size_t len = index_file->element_len; + const size_t len = index_file->element_len; assert(element); - if (fd < 0) { + if (!index_file->file) { goto error; } - ret = lttng_read(fd, element, len); + ret = fs_handle_read(index_file->file, element, len); if (ret < 0) { PERROR("read index file"); goto error; @@ -274,7 +270,7 @@ static void lttng_index_file_release(struct urcu_ref *ref) struct lttng_index_file *index_file = caa_container_of(ref, struct lttng_index_file, ref); - if (close(index_file->fd)) { + if (fs_handle_close(index_file->file)) { PERROR("close index fd"); } lttng_trace_chunk_put(index_file->trace_chunk); diff --git a/src/common/index/index.h b/src/common/index/index.h index f42e31e2d..e6f79d7bd 100644 --- a/src/common/index/index.h +++ b/src/common/index/index.h @@ -23,11 +23,12 @@ #include #include -#include #include "ctf-index.h" +#include +#include struct lttng_index_file { - int fd; + struct fs_handle *file; uint32_t major; uint32_t minor; uint32_t element_len; diff --git a/src/common/trace-chunk.c b/src/common/trace-chunk.c index 326d42ce2..f045b50a9 100644 --- a/src/common/trace-chunk.c +++ b/src/common/trace-chunk.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -136,6 +137,24 @@ struct lttng_trace_chunk_registry { struct cds_lfht *ht; }; +struct fs_handle_untracked { + struct fs_handle parent; + int fd; + struct { + struct lttng_directory_handle *directory_handle; + char *path; + } location; +}; + +static +int fs_handle_untracked_get_fd(struct fs_handle *handle); +static +void fs_handle_untracked_put_fd(struct fs_handle *handle); +static +int fs_handle_untracked_unlink(struct fs_handle *handle); +static +int fs_handle_untracked_close(struct fs_handle *handle); + static const char *close_command_names[] = { [LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED] = @@ -156,6 +175,92 @@ chunk_command close_command_post_release_funcs[] = { lttng_trace_chunk_delete_post_release, }; +static +struct fs_handle *fs_handle_untracked_create( + struct lttng_directory_handle *directory_handle, + const char *path, + int fd) +{ + struct fs_handle_untracked *handle = NULL; + bool reference_acquired; + char *path_copy = strdup(path); + + assert(fd >= 0); + if (!path_copy) { + PERROR("Failed to copy file path while creating untracked filesystem handle"); + goto end; + } + + handle = zmalloc(sizeof(typeof(*handle))); + if (!handle) { + PERROR("Failed to allocate untracked filesystem handle"); + goto end; + } + + handle->parent = (typeof(handle->parent)) { + .get_fd = fs_handle_untracked_get_fd, + .put_fd = fs_handle_untracked_put_fd, + .unlink = fs_handle_untracked_unlink, + .close = fs_handle_untracked_close, + }; + + handle->fd = fd; + reference_acquired = lttng_directory_handle_get(directory_handle); + assert(reference_acquired); + handle->location.directory_handle = directory_handle; + /* Ownership is transferred. */ + handle->location.path = path_copy; + path_copy = NULL; +end: + free(path_copy); + return handle ? &handle->parent : NULL; +} + +static +int fs_handle_untracked_get_fd(struct fs_handle *_handle) +{ + struct fs_handle_untracked *handle = container_of( + _handle, struct fs_handle_untracked, parent); + + return handle->fd; +} + +static +void fs_handle_untracked_put_fd(struct fs_handle *_handle) +{ + /* no-op. */ +} + +static +int fs_handle_untracked_unlink(struct fs_handle *_handle) +{ + struct fs_handle_untracked *handle = container_of( + _handle, struct fs_handle_untracked, parent); + + return lttng_directory_handle_unlink_file( + handle->location.directory_handle, + handle->location.path); +} + +static +void fs_handle_untracked_destroy(struct fs_handle_untracked *handle) +{ + lttng_directory_handle_put(handle->location.directory_handle); + free(handle->location.path); + free(handle); +} + +static +int fs_handle_untracked_close(struct fs_handle *_handle) +{ + struct fs_handle_untracked *handle = container_of( + _handle, struct fs_handle_untracked, parent); + int ret = close(handle->fd); + + fs_handle_untracked_destroy(handle); + return ret; +} + static bool lttng_trace_chunk_registry_element_equals( const struct lttng_trace_chunk_registry_element *a, @@ -1230,16 +1335,19 @@ void lttng_trace_chunk_remove_file( assert(!ret); } -LTTNG_HIDDEN -enum lttng_trace_chunk_status lttng_trace_chunk_open_file( - struct lttng_trace_chunk *chunk, const char *file_path, - int flags, mode_t mode, int *out_fd, bool expect_no_file) +static +enum lttng_trace_chunk_status _lttng_trace_chunk_open_fs_handle_locked( + struct lttng_trace_chunk *chunk, + const char *file_path, + int flags, + mode_t mode, + struct fs_handle **out_handle, + bool expect_no_file) { int ret; enum lttng_trace_chunk_status status = LTTNG_TRACE_CHUNK_STATUS_OK; DBG("Opening trace chunk file \"%s\"", file_path); - pthread_mutex_lock(&chunk->lock); if (!chunk->credentials.is_set) { /* * Fatal error, credentials must be set before a @@ -1260,10 +1368,26 @@ enum lttng_trace_chunk_status lttng_trace_chunk_open_file( if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto end; } - ret = lttng_directory_handle_open_file_as_user( - chunk->chunk_directory, file_path, flags, mode, - chunk->credentials.value.use_current_user ? - NULL : &chunk->credentials.value.user); + if (chunk->fd_tracker) { + assert(chunk->credentials.value.use_current_user); + *out_handle = fd_tracker_open_fs_handle(chunk->fd_tracker, + chunk->chunk_directory, file_path, flags, &mode); + ret = *out_handle ? 0 : -1; + } else { + ret = lttng_directory_handle_open_file_as_user( + chunk->chunk_directory, file_path, flags, mode, + chunk->credentials.value.use_current_user ? + NULL : + &chunk->credentials.value.user); + if (ret >= 0) { + *out_handle = fs_handle_untracked_create( + chunk->chunk_directory, file_path, ret); + if (!*out_handle) { + status = LTTNG_TRACE_CHUNK_STATUS_ERROR; + goto end; + } + } + } if (ret < 0) { if (errno == ENOENT && expect_no_file) { status = LTTNG_TRACE_CHUNK_STATUS_NO_FILE; @@ -1275,9 +1399,59 @@ enum lttng_trace_chunk_status lttng_trace_chunk_open_file( lttng_trace_chunk_remove_file(chunk, file_path); goto end; } - *out_fd = ret; end: + return status; +} + +LTTNG_HIDDEN +enum lttng_trace_chunk_status lttng_trace_chunk_open_fs_handle( + struct lttng_trace_chunk *chunk, + const char *file_path, + int flags, + mode_t mode, + struct fs_handle **out_handle, + bool expect_no_file) +{ + enum lttng_trace_chunk_status status; + + pthread_mutex_lock(&chunk->lock); + status = _lttng_trace_chunk_open_fs_handle_locked(chunk, file_path, + flags, mode, out_handle, expect_no_file); + pthread_mutex_unlock(&chunk->lock); + return status; +} + +LTTNG_HIDDEN +enum lttng_trace_chunk_status lttng_trace_chunk_open_file( + struct lttng_trace_chunk *chunk, + const char *file_path, + int flags, + mode_t mode, + int *out_fd, + bool expect_no_file) +{ + enum lttng_trace_chunk_status status; + struct fs_handle *fs_handle; + + pthread_mutex_lock(&chunk->lock); + /* + * Using this method is never valid when an fd_tracker is being + * used since the resulting file descriptor would not be tracked. + */ + assert(!chunk->fd_tracker); + status = _lttng_trace_chunk_open_fs_handle_locked(chunk, file_path, + flags, mode, &fs_handle, expect_no_file); pthread_mutex_unlock(&chunk->lock); + + if (status == LTTNG_TRACE_CHUNK_STATUS_OK) { + *out_fd = fs_handle_get_fd(fs_handle); + /* + * Does not close the fd; we just "unbox" it from the fs_handle. + */ + fs_handle_untracked_destroy(container_of( + fs_handle, struct fs_handle_untracked, parent)); + } + return status; } diff --git a/src/common/trace-chunk.h b/src/common/trace-chunk.h index 52fce9960..8b43a25f7 100644 --- a/src/common/trace-chunk.h +++ b/src/common/trace-chunk.h @@ -18,12 +18,13 @@ #ifndef LTTNG_TRACE_CHUNK_H #define LTTNG_TRACE_CHUNK_H -#include -#include #include +#include +#include +#include +#include #include #include -#include /* * A trace chunk is a group of directories and files forming a (or a set of) @@ -175,8 +176,21 @@ enum lttng_trace_chunk_status lttng_trace_chunk_create_subdirectory( LTTNG_HIDDEN enum lttng_trace_chunk_status lttng_trace_chunk_open_file( - struct lttng_trace_chunk *chunk, const char *filename, - int flags, mode_t mode, int *out_fd, bool expect_no_file); + struct lttng_trace_chunk *chunk, + const char *filename, + int flags, + mode_t mode, + int *out_fd, + bool expect_no_file); + +LTTNG_HIDDEN +enum lttng_trace_chunk_status lttng_trace_chunk_open_fs_handle( + struct lttng_trace_chunk *chunk, + const char *filename, + int flags, + mode_t mode, + struct fs_handle **out_handle, + bool expect_no_file); LTTNG_HIDDEN int lttng_trace_chunk_unlink_file(struct lttng_trace_chunk *chunk,