relayd: open live viewer files from the current stream's trace chunk
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 8 Aug 2019 04:55:51 +0000 (00:55 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 9 Aug 2019 15:28:43 +0000 (11:28 -0400)
Since relay_streams' paths are now relative to a session's trace
chunk, file operations performed by the live protocol handler must
occur through a trace chunk.

Note that viewer streams hold a reference to the current trace chunk
of their corresponding relay stream. This is fine right now as no
session rotations may occur during the lifetime of a live session. If
this was allowed, this would be a problem since a session rotation
would not complete until all live viewer streams release their
reference to the trace chunk.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/viewer-stream.c
src/bin/lttng-relayd/viewer-stream.h
src/common/index/index.c
src/common/index/index.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h

index 9503ba470b70cd496291ec612f304c45446661d5..84fa1435196057c6a506b019eedf4bcf477da49b 100644 (file)
@@ -1138,6 +1138,8 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                struct relay_stream *rstream)
 {
        int ret = 0;
+       const uint32_t connection_major = rstream->trace->session->major;
+       const uint32_t connection_minor = rstream->trace->session->minor;
 
        if (vstream->index_file) {
                goto end;
@@ -1150,10 +1152,12 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                ret = -ENOENT;
                goto end;
        }
-       vstream->index_file = lttng_index_file_open(vstream->path_name,
-                       vstream->channel_name,
-                       vstream->stream->tracefile_count,
-                       vstream->current_tracefile_id);
+       vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
+                       rstream->trace_chunk, rstream->path_name,
+                       rstream->channel_name, rstream->tracefile_size,
+                       vstream->current_tracefile_id,
+                       lttng_to_index_major(connection_major, connection_minor),
+                       lttng_to_index_minor(connection_major, connection_minor));
        if (!vstream->index_file) {
                ret = -1;
        }
@@ -1361,31 +1365,30 @@ int viewer_get_next_index(struct relay_connection *conn)
         * overwrite caused by tracefile rotation (in association with
         * unlink performed before overwrite).
         */
-       if (!vstream->stream_fd) {
-               char fullpath[PATH_MAX];
-
-               if (vstream->stream->tracefile_count > 0) {
-                       ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64,
-                                       vstream->path_name,
-                                       vstream->channel_name,
-                                       vstream->current_tracefile_id);
-               } else {
-                       ret = snprintf(fullpath, PATH_MAX, "%s/%s",
-                                       vstream->path_name,
-                                       vstream->channel_name);
-               }
+       if (!vstream->stream_file.fd) {
+               int fd;
+               char file_path[LTTNG_PATH_MAX];
+               enum lttng_trace_chunk_status status;
+
+               ret = utils_stream_file_path(rstream->path_name,
+                               rstream->channel_name, rstream->tracefile_size,
+                               vstream->current_tracefile_id, NULL, file_path,
+                               sizeof(file_path));
                if (ret < 0) {
                        goto error_put;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening trace file");
+
+               status = lttng_trace_chunk_open_file(
+                               vstream->stream_file.trace_chunk,
+                               file_path, O_RDONLY, 0, &fd);
+               if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       PERROR("Failed to open trace file for viewer stream");
                        goto error_put;
                }
-               vstream->stream_fd = stream_fd_create(ret);
-               if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
+               vstream->stream_file.fd = stream_fd_create(fd);
+               if (!vstream->stream_file.fd) {
+                       if (close(fd)) {
+                               PERROR("Failed to close viewer stream file");
                        }
                        goto error_put;
                }
@@ -1526,19 +1529,19 @@ int viewer_get_packet(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
-                       SEEK_SET);
+       lseek_ret = lseek(vstream->stream_file.fd->fd,
+                       be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
-               PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
-                       (uint64_t) be64toh(get_packet_info.offset));
+               PERROR("lseek fd %d to offset %" PRIu64,
+                               vstream->stream_file.fd->fd,
+                               (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
-       read_len = lttng_read(vstream->stream_fd->fd,
-                       reply + sizeof(reply_header),
-                       packet_data_len);
+       read_len = lttng_read(vstream->stream_file.fd->fd,
+                       reply + sizeof(reply_header), packet_data_len);
        if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-                               vstream->stream_fd->fd,
+                               vstream->stream_file.fd->fd,
                                (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
@@ -1644,23 +1647,31 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        /* first time, we open the metadata file */
-       if (!vstream->stream_fd) {
-               char fullpath[PATH_MAX];
-
-               ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
-                               vstream->channel_name);
+       if (!vstream->stream_file.fd) {
+               int fd;
+               char file_path[LTTNG_PATH_MAX];
+               enum lttng_trace_chunk_status status;
+               struct relay_stream *rstream = vstream->stream;
+
+               ret = utils_stream_file_path(rstream->path_name,
+                               rstream->channel_name, rstream->tracefile_size,
+                               vstream->current_tracefile_id, NULL, file_path,
+                               sizeof(file_path));
                if (ret < 0) {
                        goto error;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening metadata file");
+
+               status = lttng_trace_chunk_open_file(
+                               vstream->stream_file.trace_chunk,
+                               file_path, O_RDONLY, 0, &fd);
+               if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       PERROR("Failed to open metadata file for viewer stream");
                        goto error;
                }
-               vstream->stream_fd = stream_fd_create(ret);
-               if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
+               vstream->stream_file.fd = stream_fd_create(fd);
+               if (!vstream->stream_file.fd) {
+                       if (close(fd)) {
+                               PERROR("Failed to close viewer metadata file");
                        }
                        goto error;
                }
@@ -1673,7 +1684,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
+       read_len = lttng_read(vstream->stream_file.fd->fd, data, len);
        if (read_len < len) {
                PERROR("Relay reading metadata file");
                goto error;
index 60aa4371d5cc446bb4dac4bdd654e13d45adc9ea..21294d1f32dfc700c33c1075b1343e36b84c2833 100644 (file)
@@ -43,7 +43,13 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head)
 struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                enum lttng_viewer_seek seek_t)
 {
-       struct relay_viewer_stream *vstream;
+       struct relay_viewer_stream *vstream = NULL;
+       const bool acquired_reference = lttng_trace_chunk_get(
+                       stream->trace_chunk);
+
+       if (!acquired_reference) {
+               goto error;
+       }
 
        vstream = zmalloc(sizeof(*vstream));
        if (!vstream) {
@@ -51,6 +57,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                goto error;
        }
 
+       vstream->stream_file.trace_chunk = stream->trace_chunk;
        vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
        if (vstream->path_name == NULL) {
                PERROR("relay viewer path_name alloc");
@@ -118,10 +125,17 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
        if (stream->index_received_seqcount == 0) {
                vstream->index_file = NULL;
        } else {
-               vstream->index_file = lttng_index_file_open(vstream->path_name,
-                               vstream->channel_name,
-                               stream->tracefile_count,
-                               vstream->current_tracefile_id);
+               const uint32_t connection_major = stream->trace->session->major;
+               const uint32_t connection_minor = stream->trace->session->minor;
+
+               vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
+                               stream->trace_chunk, stream->path_name,
+                               stream->channel_name, stream->tracefile_size,
+                               vstream->current_tracefile_id,
+                               lttng_to_index_major(connection_major,
+                                               connection_minor),
+                               lttng_to_index_minor(connection_major,
+                                               connection_minor));
                if (!vstream->index_file) {
                        goto error_unlock;
                }
@@ -179,9 +193,9 @@ static void viewer_stream_release(struct urcu_ref *ref)
 
        viewer_stream_unpublish(vstream);
 
-       if (vstream->stream_fd) {
-               stream_fd_put(vstream->stream_fd);
-               vstream->stream_fd = NULL;
+       if (vstream->stream_file.fd) {
+               stream_fd_put(vstream->stream_file.fd);
+               vstream->stream_file.fd = NULL;
        }
        if (vstream->index_file) {
                lttng_index_file_put(vstream->index_file);
@@ -191,6 +205,7 @@ static void viewer_stream_release(struct urcu_ref *ref)
                stream_put(vstream->stream);
                vstream->stream = NULL;
        }
+       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
        call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
 }
 
@@ -243,8 +258,10 @@ void viewer_stream_put(struct relay_viewer_stream *vstream)
 int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 {
        int ret;
-       struct relay_stream *stream = vstream->stream;
        uint64_t new_id;
+       const struct relay_stream *stream = vstream->stream;
+       const uint32_t connection_major = stream->trace->session->major;
+       const uint32_t connection_minor = stream->trace->session->minor;
 
        /* Detect the last tracefile to open. */
        if (stream->index_received_seqcount
@@ -289,15 +306,20 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
                lttng_index_file_put(vstream->index_file);
                vstream->index_file = NULL;
        }
-       if (vstream->stream_fd) {
-               stream_fd_put(vstream->stream_fd);
-               vstream->stream_fd = NULL;
+       if (vstream->stream_file.fd) {
+               stream_fd_put(vstream->stream_file.fd);
+               vstream->stream_file.fd = NULL;
        }
-
-       vstream->index_file = lttng_index_file_open(vstream->path_name,
-                       vstream->channel_name,
-                       stream->tracefile_count,
-                       vstream->current_tracefile_id);
+       vstream->index_file =
+                       lttng_index_file_create_from_trace_chunk_read_only(
+                                       stream->trace_chunk, stream->path_name,
+                                       stream->channel_name,
+                                       stream->tracefile_size,
+                                       vstream->current_tracefile_id,
+                                       lttng_to_index_major(connection_major,
+                                                       connection_minor),
+                                       lttng_to_index_minor(connection_major,
+                                                       connection_minor));
        if (!vstream->index_file) {
                ret = -1;
                goto end;
index 3d9b076b75f533bf849e6eb6104fa33307d11f32..7c42dfad39a7444038400101b88334aae622237a 100644 (file)
@@ -49,8 +49,11 @@ struct relay_viewer_stream {
        /* Back ref to stream. */
        struct relay_stream *stream;
 
-       /* FD from which to read the stream data. */
-       struct stream_fd *stream_fd;
+       struct {
+               /* FD from which to read the stream data. */
+               struct stream_fd *fd;
+               struct lttng_trace_chunk *trace_chunk;
+       } stream_file;
        /* index file from which to read the index data. */
        struct lttng_index_file *index_file;
 
index 5a6fe3d8608bf56def8212199dcaaac540def172..4df0d1b49116e8e177beb358e523f9dcca5f7c90 100644 (file)
 
 #include "index.h"
 
-struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
+#define WRITE_FILE_FLAGS       O_WRONLY | O_CREAT | O_TRUNC
+#define READ_ONLY_FILE_FLAGS   O_RDONLY
+
+static struct lttng_index_file *_lttng_index_file_create_from_trace_chunk(
                struct lttng_trace_chunk *chunk,
-               const char *channel_path, char *stream_name,
+               const char *channel_path, const char *stream_name,
                uint64_t stream_file_size, uint64_t stream_file_index,
                uint32_t index_major, uint32_t index_minor,
-               bool unlink_existing_file)
+               bool unlink_existing_file,
+               int flags)
 {
        struct lttng_index_file *index_file;
        enum lttng_trace_chunk_status chunk_status;
@@ -47,9 +51,8 @@ struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
        char index_file_path[LTTNG_PATH_MAX];
        const uint32_t element_len = ctf_packet_index_len(index_major,
                        index_minor);
-       const int flags = O_WRONLY | O_CREAT | O_TRUNC;
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
-       bool acquired_reference = lttng_trace_chunk_get(chunk);
+       const bool acquired_reference = lttng_trace_chunk_get(chunk);
 
        assert(acquired_reference);
 
@@ -124,6 +127,31 @@ error:
        return NULL;
 }
 
+struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
+               struct lttng_trace_chunk *chunk,
+               const char *channel_path, const char *stream_name,
+               uint64_t stream_file_size, uint64_t stream_file_index,
+               uint32_t index_major, uint32_t index_minor,
+               bool unlink_existing_file)
+{
+       return _lttng_index_file_create_from_trace_chunk(chunk, channel_path,
+                       stream_name, stream_file_size, stream_file_index,
+                       index_major, index_minor, unlink_existing_file,
+                       WRITE_FILE_FLAGS);
+}
+
+struct lttng_index_file *lttng_index_file_create_from_trace_chunk_read_only(
+               struct lttng_trace_chunk *chunk,
+               const char *channel_path, const char *stream_name,
+               uint64_t stream_file_size, uint64_t stream_file_index,
+               uint32_t index_major, uint32_t index_minor)
+{
+       return _lttng_index_file_create_from_trace_chunk(chunk, channel_path,
+                       stream_name, stream_file_size, stream_file_index,
+                       index_major, index_minor, false,
+                       READ_ONLY_FILE_FLAGS);
+}
+
 /*
  * Write index values to the given index file.
  *
@@ -190,97 +218,6 @@ error:
        return -1;
 }
 
-/*
- * Open index file using a given path, channel name and tracefile count.
- *
- * Return allocated struct lttng_index_file, NULL on error.
- */
-struct lttng_index_file *lttng_index_file_open(const char *path_name,
-               const char *channel_name, uint64_t tracefile_count,
-               uint64_t tracefile_count_current)
-{
-       struct lttng_index_file *index_file;
-       int ret, read_fd;
-       ssize_t read_len;
-       char fullpath[PATH_MAX];
-       struct ctf_packet_index_file_hdr hdr;
-       uint32_t major, minor, element_len;
-
-       assert(path_name);
-       assert(channel_name);
-
-       index_file = zmalloc(sizeof(*index_file));
-       if (!index_file) {
-               PERROR("allocating lttng_index_file");
-               goto error;
-       }
-
-       if (tracefile_count > 0) {
-               ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
-                               PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name,
-                               channel_name, tracefile_count_current);
-       } else {
-               ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s"
-                               DEFAULT_INDEX_FILE_SUFFIX, path_name, channel_name);
-       }
-       if (ret < 0) {
-               PERROR("snprintf index path");
-               goto error;
-       }
-
-       DBG("Index opening file %s in read only", fullpath);
-       read_fd = open(fullpath, O_RDONLY);
-       if (read_fd < 0) {
-               PERROR("opening index in read-only");
-               goto error;
-       }
-
-       read_len = lttng_read(read_fd, &hdr, sizeof(hdr));
-       if (read_len < 0) {
-               PERROR("Reading index header");
-               goto error_close;
-       }
-
-       if (be32toh(hdr.magic) != CTF_INDEX_MAGIC) {
-               ERR("Invalid header magic");
-               goto error_close;
-       }
-       major = be32toh(hdr.index_major);
-       minor = be32toh(hdr.index_minor);
-       element_len = be32toh(hdr.packet_index_len);
-
-       if (major != CTF_INDEX_MAJOR) {
-               ERR("Invalid header version");
-               goto error_close;
-       }
-       if (element_len > sizeof(struct ctf_packet_index)) {
-               ERR("Index element length too long");
-               goto error_close;
-       }
-
-       index_file->fd = read_fd;
-       index_file->major = major;
-       index_file->minor = minor;
-       index_file->element_len = element_len;
-       urcu_ref_init(&index_file->ref);
-
-       return index_file;
-
-error_close:
-       if (read_fd >= 0) {
-               int close_ret;
-
-               close_ret = close(read_fd);
-               if (close_ret < 0) {
-                       PERROR("close read fd %d", read_fd);
-               }
-       }
-
-error:
-       free(index_file);
-       return NULL;
-}
-
 void lttng_index_file_get(struct lttng_index_file *index_file)
 {
        urcu_ref_get(&index_file->ref);
index b5ffc314998b483acbb9f0df47da5e2fcef10a45..3dbe458f3985cc6ab7b702e0afd501e96bc96dd5 100644 (file)
@@ -41,13 +41,15 @@ struct lttng_index_file {
  */
 struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
                struct lttng_trace_chunk *chunk,
-               const char *channel_path, char *stream_name,
+               const char *channel_path, const char *stream_name,
                uint64_t stream_file_size, uint64_t stream_count,
                uint32_t index_major, uint32_t index_minor,
                bool unlink_existing_file);
-struct lttng_index_file *lttng_index_file_open(const char *path_name,
-               const char *channel_name, uint64_t tracefile_count,
-               uint64_t tracefile_count_current);
+struct lttng_index_file *lttng_index_file_create_from_trace_chunk_read_only(
+               struct lttng_trace_chunk *chunk,
+               const char *channel_path, const char *stream_name,
+               uint64_t stream_file_size, uint64_t stream_file_index,
+               uint32_t index_major, uint32_t index_minor);
 int lttng_index_file_write(const struct lttng_index_file *index_file,
                const struct ctf_packet_index *element);
 int lttng_index_file_read(const struct lttng_index_file *index_file,
index 363a0e7b6dc02f8214d92175f64f4d06d48aa5b8..76c54a4a9f6724708f8e3f07153aaea29bddc0fa 100644 (file)
@@ -1123,7 +1123,7 @@ error:
 }
 
 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
-               unsigned int stream_count, uint64_t *new_chunk_id,
+               unsigned int stream_count, const uint64_t *new_chunk_id,
                const struct relayd_stream_rotation_position *positions)
 {
        int ret;
index 3448095e54bb27bdc3a7d5a85df3a7132ee0b6aa..9aa7abb7b28fb59d23ee48ad3390d15f5cec147a 100644 (file)
@@ -73,7 +73,7 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
 /* `positions` is an array of `stream_count` relayd_stream_rotation_position. */
 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
-               unsigned int stream_count, uint64_t *new_chunk_id,
+               unsigned int stream_count, const uint64_t *new_chunk_id,
                const struct relayd_stream_rotation_position *positions);
 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
This page took 0.033091 seconds and 4 git commands to generate.