}
} else {
if (!stream->closed ||
- !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+ !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) {
(*nb_total)++;
}
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
struct relay_stream *rstream)
{
int ret = 0;
+ const uint32_t connection_major = rstream->trace->session->major;
+ const uint32_t connection_minor = rstream->trace->session->minor;
if (vstream->index_file) {
goto end;
ret = -ENOENT;
goto end;
}
- vstream->index_file = lttng_index_file_open(vstream->path_name,
- vstream->channel_name,
- vstream->stream->tracefile_count,
- vstream->current_tracefile_id);
+ vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
+ rstream->trace_chunk, rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id,
+ lttng_to_index_major(connection_major, connection_minor),
+ lttng_to_index_minor(connection_major, connection_minor));
if (!vstream->index_file) {
ret = -1;
}
* overwrite caused by tracefile rotation (in association with
* unlink performed before overwrite).
*/
- if (!vstream->stream_fd) {
- char fullpath[PATH_MAX];
-
- if (vstream->stream->tracefile_count > 0) {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64,
- vstream->path_name,
- vstream->channel_name,
- vstream->current_tracefile_id);
- } else {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s",
- vstream->path_name,
- vstream->channel_name);
- }
+ if (!vstream->stream_file.fd) {
+ int fd;
+ char file_path[LTTNG_PATH_MAX];
+ enum lttng_trace_chunk_status status;
+
+ ret = utils_stream_file_path(rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id, NULL, file_path,
+ sizeof(file_path));
if (ret < 0) {
goto error_put;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening trace file");
+
+ status = lttng_trace_chunk_open_file(
+ vstream->stream_file.trace_chunk,
+ file_path, O_RDONLY, 0, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ PERROR("Failed to open trace file for viewer stream");
goto error_put;
}
- vstream->stream_fd = stream_fd_create(ret);
- if (!vstream->stream_fd) {
- if (close(ret)) {
- PERROR("close");
+ vstream->stream_file.fd = stream_fd_create(fd);
+ if (!vstream->stream_file.fd) {
+ if (close(fd)) {
+ PERROR("Failed to close viewer stream file");
}
goto error_put;
}
}
pthread_mutex_lock(&vstream->stream->lock);
- lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
- SEEK_SET);
+ lseek_ret = lseek(vstream->stream_file.fd->fd,
+ be64toh(get_packet_info.offset), SEEK_SET);
if (lseek_ret < 0) {
- PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
- (uint64_t) be64toh(get_packet_info.offset));
+ PERROR("lseek fd %d to offset %" PRIu64,
+ vstream->stream_file.fd->fd,
+ (uint64_t) be64toh(get_packet_info.offset));
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd,
- reply + sizeof(reply_header),
- packet_data_len);
+ read_len = lttng_read(vstream->stream_file.fd->fd,
+ reply + sizeof(reply_header), packet_data_len);
if (read_len < packet_data_len) {
PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- vstream->stream_fd->fd,
+ vstream->stream_file.fd->fd,
(uint64_t) be64toh(get_packet_info.offset));
goto error;
}
}
/* first time, we open the metadata file */
- if (!vstream->stream_fd) {
- char fullpath[PATH_MAX];
-
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
- vstream->channel_name);
+ if (!vstream->stream_file.fd) {
+ int fd;
+ char file_path[LTTNG_PATH_MAX];
+ enum lttng_trace_chunk_status status;
+ struct relay_stream *rstream = vstream->stream;
+
+ ret = utils_stream_file_path(rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id, NULL, file_path,
+ sizeof(file_path));
if (ret < 0) {
goto error;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening metadata file");
+
+ status = lttng_trace_chunk_open_file(
+ vstream->stream_file.trace_chunk,
+ file_path, O_RDONLY, 0, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ PERROR("Failed to open metadata file for viewer stream");
goto error;
}
- vstream->stream_fd = stream_fd_create(ret);
- if (!vstream->stream_fd) {
- if (close(ret)) {
- PERROR("close");
+ vstream->stream_file.fd = stream_fd_create(fd);
+ if (!vstream->stream_file.fd) {
+ if (close(fd)) {
+ PERROR("Failed to close viewer metadata file");
}
goto error;
}
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd, data, len);
+ read_len = lttng_read(vstream->stream_file.fd->fd, data, len);
if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
health_code_update();
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {