From ebb29c10d382c55529138ae70eb5a05bf3ccb9a6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Thu, 8 Aug 2019 00:55:51 -0400 Subject: [PATCH] relayd: open live viewer files from the current stream's trace chunk MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Since relay_streams' paths are now relative to a session's trace chunk, file operations performed by the live protocol handler must occur through a trace chunk. Note that viewer streams hold a reference to the current trace chunk of their corresponding relay stream. This is fine right now as no session rotations may occur during the lifetime of a live session. If this was allowed, this would be a problem since a session rotation would not complete until all live viewer streams release their reference to the trace chunk. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/live.c | 101 +++++++++++---------- src/bin/lttng-relayd/viewer-stream.c | 56 ++++++++---- src/bin/lttng-relayd/viewer-stream.h | 7 +- src/common/index/index.c | 129 +++++++-------------------- src/common/index/index.h | 10 ++- src/common/relayd/relayd.c | 2 +- src/common/relayd/relayd.h | 2 +- 7 files changed, 141 insertions(+), 166 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 9503ba470..84fa14351 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1138,6 +1138,8 @@ static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream) { int ret = 0; + const uint32_t connection_major = rstream->trace->session->major; + const uint32_t connection_minor = rstream->trace->session->minor; if (vstream->index_file) { goto end; @@ -1150,10 +1152,12 @@ static int try_open_index(struct relay_viewer_stream *vstream, ret = -ENOENT; goto end; } - vstream->index_file = lttng_index_file_open(vstream->path_name, - vstream->channel_name, - vstream->stream->tracefile_count, - vstream->current_tracefile_id); + vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only( + rstream->trace_chunk, rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor)); if (!vstream->index_file) { ret = -1; } @@ -1361,31 +1365,30 @@ int viewer_get_next_index(struct relay_connection *conn) * overwrite caused by tracefile rotation (in association with * unlink performed before overwrite). */ - if (!vstream->stream_fd) { - char fullpath[PATH_MAX]; - - if (vstream->stream->tracefile_count > 0) { - ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, - vstream->path_name, - vstream->channel_name, - vstream->current_tracefile_id); - } else { - ret = snprintf(fullpath, PATH_MAX, "%s/%s", - vstream->path_name, - vstream->channel_name); - } + if (!vstream->stream_file.fd) { + int fd; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + + ret = utils_stream_file_path(rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); if (ret < 0) { goto error_put; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening trace file"); + + status = lttng_trace_chunk_open_file( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fd); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + PERROR("Failed to open trace file for viewer stream"); goto error_put; } - vstream->stream_fd = stream_fd_create(ret); - if (!vstream->stream_fd) { - if (close(ret)) { - PERROR("close"); + vstream->stream_file.fd = stream_fd_create(fd); + if (!vstream->stream_file.fd) { + if (close(fd)) { + PERROR("Failed to close viewer stream file"); } goto error_put; } @@ -1526,19 +1529,19 @@ int viewer_get_packet(struct relay_connection *conn) } pthread_mutex_lock(&vstream->stream->lock); - lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), - SEEK_SET); + lseek_ret = lseek(vstream->stream_file.fd->fd, + be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { - PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd, - (uint64_t) be64toh(get_packet_info.offset)); + PERROR("lseek fd %d to offset %" PRIu64, + vstream->stream_file.fd->fd, + (uint64_t) be64toh(get_packet_info.offset)); goto error; } - read_len = lttng_read(vstream->stream_fd->fd, - reply + sizeof(reply_header), - packet_data_len); + read_len = lttng_read(vstream->stream_file.fd->fd, + reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, - vstream->stream_fd->fd, + vstream->stream_file.fd->fd, (uint64_t) be64toh(get_packet_info.offset)); goto error; } @@ -1644,23 +1647,31 @@ int viewer_get_metadata(struct relay_connection *conn) } /* first time, we open the metadata file */ - if (!vstream->stream_fd) { - char fullpath[PATH_MAX]; - - ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name, - vstream->channel_name); + if (!vstream->stream_file.fd) { + int fd; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + struct relay_stream *rstream = vstream->stream; + + ret = utils_stream_file_path(rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); if (ret < 0) { goto error; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening metadata file"); + + status = lttng_trace_chunk_open_file( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fd); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + PERROR("Failed to open metadata file for viewer stream"); goto error; } - vstream->stream_fd = stream_fd_create(ret); - if (!vstream->stream_fd) { - if (close(ret)) { - PERROR("close"); + vstream->stream_file.fd = stream_fd_create(fd); + if (!vstream->stream_file.fd) { + if (close(fd)) { + PERROR("Failed to close viewer metadata file"); } goto error; } @@ -1673,7 +1684,7 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); + read_len = lttng_read(vstream->stream_file.fd->fd, data, len); if (read_len < len) { PERROR("Relay reading metadata file"); goto error; diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 60aa4371d..21294d1f3 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -43,7 +43,13 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head) struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream; + struct relay_viewer_stream *vstream = NULL; + const bool acquired_reference = lttng_trace_chunk_get( + stream->trace_chunk); + + if (!acquired_reference) { + goto error; + } vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -51,6 +57,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } + vstream->stream_file.trace_chunk = stream->trace_chunk; vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); if (vstream->path_name == NULL) { PERROR("relay viewer path_name alloc"); @@ -118,10 +125,17 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (stream->index_received_seqcount == 0) { vstream->index_file = NULL; } else { - vstream->index_file = lttng_index_file_open(vstream->path_name, - vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; + + vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only( + stream->trace_chunk, stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor)); if (!vstream->index_file) { goto error_unlock; } @@ -179,9 +193,9 @@ static void viewer_stream_release(struct urcu_ref *ref) viewer_stream_unpublish(vstream); - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } if (vstream->index_file) { lttng_index_file_put(vstream->index_file); @@ -191,6 +205,7 @@ static void viewer_stream_release(struct urcu_ref *ref) stream_put(vstream->stream); vstream->stream = NULL; } + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); } @@ -243,8 +258,10 @@ void viewer_stream_put(struct relay_viewer_stream *vstream) int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - struct relay_stream *stream = vstream->stream; uint64_t new_id; + const struct relay_stream *stream = vstream->stream; + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; /* Detect the last tracefile to open. */ if (stream->index_received_seqcount @@ -289,15 +306,20 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) lttng_index_file_put(vstream->index_file); vstream->index_file = NULL; } - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } - - vstream->index_file = lttng_index_file_open(vstream->path_name, - vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); + vstream->index_file = + lttng_index_file_create_from_trace_chunk_read_only( + stream->trace_chunk, stream->path_name, + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor)); if (!vstream->index_file) { ret = -1; goto end; diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h index 3d9b076b7..7c42dfad3 100644 --- a/src/bin/lttng-relayd/viewer-stream.h +++ b/src/bin/lttng-relayd/viewer-stream.h @@ -49,8 +49,11 @@ struct relay_viewer_stream { /* Back ref to stream. */ struct relay_stream *stream; - /* FD from which to read the stream data. */ - struct stream_fd *stream_fd; + struct { + /* FD from which to read the stream data. */ + struct stream_fd *fd; + struct lttng_trace_chunk *trace_chunk; + } stream_file; /* index file from which to read the index data. */ struct lttng_index_file *index_file; diff --git a/src/common/index/index.c b/src/common/index/index.c index 5a6fe3d86..4df0d1b49 100644 --- a/src/common/index/index.c +++ b/src/common/index/index.c @@ -31,12 +31,16 @@ #include "index.h" -struct lttng_index_file *lttng_index_file_create_from_trace_chunk( +#define WRITE_FILE_FLAGS O_WRONLY | O_CREAT | O_TRUNC +#define READ_ONLY_FILE_FLAGS O_RDONLY + +static struct lttng_index_file *_lttng_index_file_create_from_trace_chunk( struct lttng_trace_chunk *chunk, - const char *channel_path, char *stream_name, + const char *channel_path, const char *stream_name, uint64_t stream_file_size, uint64_t stream_file_index, uint32_t index_major, uint32_t index_minor, - bool unlink_existing_file) + bool unlink_existing_file, + int flags) { struct lttng_index_file *index_file; enum lttng_trace_chunk_status chunk_status; @@ -47,9 +51,8 @@ struct lttng_index_file *lttng_index_file_create_from_trace_chunk( char index_file_path[LTTNG_PATH_MAX]; const uint32_t element_len = ctf_packet_index_len(index_major, index_minor); - const int flags = O_WRONLY | O_CREAT | O_TRUNC; const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; - bool acquired_reference = lttng_trace_chunk_get(chunk); + const bool acquired_reference = lttng_trace_chunk_get(chunk); assert(acquired_reference); @@ -124,6 +127,31 @@ error: return NULL; } +struct lttng_index_file *lttng_index_file_create_from_trace_chunk( + struct lttng_trace_chunk *chunk, + const char *channel_path, const char *stream_name, + uint64_t stream_file_size, uint64_t stream_file_index, + uint32_t index_major, uint32_t index_minor, + bool unlink_existing_file) +{ + return _lttng_index_file_create_from_trace_chunk(chunk, channel_path, + stream_name, stream_file_size, stream_file_index, + index_major, index_minor, unlink_existing_file, + WRITE_FILE_FLAGS); +} + +struct lttng_index_file *lttng_index_file_create_from_trace_chunk_read_only( + struct lttng_trace_chunk *chunk, + const char *channel_path, const char *stream_name, + uint64_t stream_file_size, uint64_t stream_file_index, + uint32_t index_major, uint32_t index_minor) +{ + return _lttng_index_file_create_from_trace_chunk(chunk, channel_path, + stream_name, stream_file_size, stream_file_index, + index_major, index_minor, false, + READ_ONLY_FILE_FLAGS); +} + /* * Write index values to the given index file. * @@ -190,97 +218,6 @@ error: return -1; } -/* - * Open index file using a given path, channel name and tracefile count. - * - * Return allocated struct lttng_index_file, NULL on error. - */ -struct lttng_index_file *lttng_index_file_open(const char *path_name, - const char *channel_name, uint64_t tracefile_count, - uint64_t tracefile_count_current) -{ - struct lttng_index_file *index_file; - int ret, read_fd; - ssize_t read_len; - char fullpath[PATH_MAX]; - struct ctf_packet_index_file_hdr hdr; - uint32_t major, minor, element_len; - - assert(path_name); - assert(channel_name); - - index_file = zmalloc(sizeof(*index_file)); - if (!index_file) { - PERROR("allocating lttng_index_file"); - goto error; - } - - if (tracefile_count > 0) { - ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%" - PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name, - channel_name, tracefile_count_current); - } else { - ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s" - DEFAULT_INDEX_FILE_SUFFIX, path_name, channel_name); - } - if (ret < 0) { - PERROR("snprintf index path"); - goto error; - } - - DBG("Index opening file %s in read only", fullpath); - read_fd = open(fullpath, O_RDONLY); - if (read_fd < 0) { - PERROR("opening index in read-only"); - goto error; - } - - read_len = lttng_read(read_fd, &hdr, sizeof(hdr)); - if (read_len < 0) { - PERROR("Reading index header"); - goto error_close; - } - - if (be32toh(hdr.magic) != CTF_INDEX_MAGIC) { - ERR("Invalid header magic"); - goto error_close; - } - major = be32toh(hdr.index_major); - minor = be32toh(hdr.index_minor); - element_len = be32toh(hdr.packet_index_len); - - if (major != CTF_INDEX_MAJOR) { - ERR("Invalid header version"); - goto error_close; - } - if (element_len > sizeof(struct ctf_packet_index)) { - ERR("Index element length too long"); - goto error_close; - } - - index_file->fd = read_fd; - index_file->major = major; - index_file->minor = minor; - index_file->element_len = element_len; - urcu_ref_init(&index_file->ref); - - return index_file; - -error_close: - if (read_fd >= 0) { - int close_ret; - - close_ret = close(read_fd); - if (close_ret < 0) { - PERROR("close read fd %d", read_fd); - } - } - -error: - free(index_file); - return NULL; -} - void lttng_index_file_get(struct lttng_index_file *index_file) { urcu_ref_get(&index_file->ref); diff --git a/src/common/index/index.h b/src/common/index/index.h index b5ffc3149..3dbe458f3 100644 --- a/src/common/index/index.h +++ b/src/common/index/index.h @@ -41,13 +41,15 @@ struct lttng_index_file { */ struct lttng_index_file *lttng_index_file_create_from_trace_chunk( struct lttng_trace_chunk *chunk, - const char *channel_path, char *stream_name, + const char *channel_path, const char *stream_name, uint64_t stream_file_size, uint64_t stream_count, uint32_t index_major, uint32_t index_minor, bool unlink_existing_file); -struct lttng_index_file *lttng_index_file_open(const char *path_name, - const char *channel_name, uint64_t tracefile_count, - uint64_t tracefile_count_current); +struct lttng_index_file *lttng_index_file_create_from_trace_chunk_read_only( + struct lttng_trace_chunk *chunk, + const char *channel_path, const char *stream_name, + uint64_t stream_file_size, uint64_t stream_file_index, + uint32_t index_major, uint32_t index_minor); int lttng_index_file_write(const struct lttng_index_file *index_file, const struct ctf_packet_index *element); int lttng_index_file_read(const struct lttng_index_file *index_file, diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 363a0e7b6..76c54a4a9 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -1123,7 +1123,7 @@ error: } int relayd_rotate_streams(struct lttcomm_relayd_sock *sock, - unsigned int stream_count, uint64_t *new_chunk_id, + unsigned int stream_count, const uint64_t *new_chunk_id, const struct relayd_stream_rotation_position *positions) { int ret; diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 3448095e5..9aa7abb7b 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -73,7 +73,7 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); /* `positions` is an array of `stream_count` relayd_stream_rotation_position. */ int relayd_rotate_streams(struct lttcomm_relayd_sock *sock, - unsigned int stream_count, uint64_t *new_chunk_id, + unsigned int stream_count, const uint64_t *new_chunk_id, const struct relayd_stream_rotation_position *positions); int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, struct lttng_trace_chunk *chunk); -- 2.34.1