/*
- * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
- * David Goulet <dgoulet@efficios.com>
- * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2019 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/common.h>
-#include <common/utils.h>
#include <common/defaults.h>
+#include <common/fs-handle.h>
#include <common/sessiond-comm/relayd.h>
-#include <urcu/rculist.h>
+#include <common/utils.h>
#include <sys/stat.h>
+#include <urcu/rculist.h>
#include "lttng-relayd.h"
#include "index.h"
static void stream_complete_rotation(struct relay_stream *stream)
{
DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+ if (stream->ongoing_rotation.value.next_trace_chunk) {
+ tracefile_array_reset(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa,
+ stream->index_received_seqcount);
+ }
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
+ stream->completed_rotation_count++;
}
static int stream_create_data_output_file_from_trace_chunk(
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
- struct stream_fd **out_stream_fd)
+ struct fs_handle **out_file)
{
- int ret, fd;
+ int ret;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
ASSERT_LOCKED(stream->lock);
- assert(stream->trace_chunk);
ret = utils_stream_file_path(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_current_index,
}
}
- status = lttng_trace_chunk_open_file(
- trace_chunk, stream_path, flags, mode, &fd);
+ status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
+ flags, mode, out_file, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
-
- *out_stream_fd = stream_fd_create(fd);
- if (!*out_stream_fd) {
- if (close(ret)) {
- PERROR("Error closing stream file descriptor %d", ret);
- }
- ret = -1;
- goto end;
- }
end:
return ret;
}
{
int ret = 0;
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
+ DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+ stream->stream_handle, stream->tracefile_size_current);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
- struct stream_fd *new_stream_fd = NULL;
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
}
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->pos_after_last_complete_data_index = 0;
stream->ongoing_rotation.value.data_rotated = true;
off_t lseek_ret, previous_stream_copy_origin;
uint64_t copy_bytes_left, misplaced_data_size;
bool acquired_reference;
- struct stream_fd *previous_stream_fd = NULL;
+ struct fs_handle *previous_stream_file = NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
- if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+ if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
ERR("Protocol error encoutered in %s(): stream rotation "
"sequence number is before the current sequence number "
"and the next trace chunk is unset. Honoring this "
* the orinal stream_fd will be used to copy the "extra" data
* to the new file.
*/
- assert(stream->stream_fd);
- previous_stream_fd = stream->stream_fd;
- stream->stream_fd = NULL;
+ assert(stream->file);
+ previous_stream_file = stream->file;
+ stream->file = NULL;
assert(!stream->is_metadata);
assert(stream->tracefile_size_current >
goto end;
}
- assert(stream->stream_fd);
+ assert(stream->file);
/*
* Seek the current tracefile to the position at which the rotation
* should have occurred.
*/
- lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
- SEEK_SET);
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
if (lseek_ret < 0) {
PERROR("Failed to seek to offset %" PRIu64
" while copying extra data received before a stream rotation",
const off_t copy_size_this_pass = min_t(
off_t, copy_bytes_left, sizeof(copy_buffer));
- io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+ io_ret = fs_handle_read(previous_stream_file, copy_buffer,
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
- io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
- copy_size_this_pass);
+ io_ret = fs_handle_write(
+ stream->file, copy_buffer, copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
/* Truncate the file to get rid of the excess data. */
- ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
if (ret) {
PERROR("Failed to truncate current stream file to offset %" PRIu64,
previous_stream_copy_origin);
ret = 0;
end:
lttng_trace_chunk_put(previous_chunk);
- stream_fd_put(previous_stream_fd);
return ret;
}
goto end;
}
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+
if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+ stream->prev_data_seq <
+ stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* The next packet that will be written is not part of the next
* chunk yet.
*/
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+ DBG("Stream %" PRIu64 " data not yet ready for rotation "
+ "(rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
", prev_data_seq = %" PRIu64 ")",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
stream->prev_data_seq);
goto end;
- } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
ret = -1;
goto end;
}
- stream->index_file = lttng_index_file_create_from_trace_chunk(
+ status = lttng_index_file_create_from_trace_chunk(
chunk, stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_current_index,
lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor), true);
- if (!stream->index_file) {
+ lttng_to_index_minor(major, minor), true,
+ &stream->index_file);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end;
}
goto end;
}
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+
+ if (!stream->received_packet_seq_num.is_set ||
+ LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+ stream->ongoing_rotation.value.packet_seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation "
+ "(rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
- stream->prev_index_seq);
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
goto end;
} else {
- /* The next index belongs to the new trace chunk; rotate. */
- assert(stream->prev_index_seq + 1 ==
- stream->ongoing_rotation.value.seq_num);
+ /*
+ * The next index belongs to the new trace chunk; rotate.
+ * In overwrite mode, the packet seq num may jump over the
+ * rotation position.
+ */
+ assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+ stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
- ret = create_index_file(stream,
- stream->ongoing_rotation.value.next_trace_chunk);
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
stream->ongoing_rotation.value.index_rotated = true;
+ /*
+ * Set the rotation pivot position for the data, now that we have the
+ * net_seq_num matching the packet_seq_num index pivot position.
+ */
+ stream->ongoing_rotation.value.prev_data_net_seq =
+ stream->prev_index_seq;
if (stream->ongoing_rotation.value.data_rotated &&
stream->ongoing_rotation.value.index_rotated) {
/* Rotation completed; reset its state. */
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
- struct stream_fd *new_stream_fd = NULL;
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
assert(acquired_reference);
stream->trace_chunk = chunk;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
end:
return ret;
}
end:
if (ret) {
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream_put(stream);
stream = NULL;
stream_unpublish(stream);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
{
int ret = 0;
const struct relay_stream_rotation rotation = {
- .seq_num = rotation_sequence_number,
+ .data_rotated = false,
+ .index_rotated = false,
+ .packet_seq_num = rotation_sequence_number,
+ .prev_data_net_seq = -1ULL,
.next_trace_chunk = next_trace_chunk,
};
}
LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
- DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+ DBG("Setting pending rotation: stream_id = %" PRIu64
+ ", rotate_at_packet_seq_num = %" PRIu64,
stream->stream_handle, rotation_sequence_number);
if (stream->is_metadata) {
/*
* A metadata stream has no index; consider it already rotated.
*/
stream->ongoing_rotation.value.index_rotated = true;
+ if (next_trace_chunk) {
+ /*
+ * The metadata will be received again in the new chunk.
+ */
+ stream->metadata_received = 0;
+ }
ret = stream_rotate_data_file(stream);
} else {
- ret = try_rotate_stream_data(stream);
+ ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
- ret = try_rotate_stream_index(stream);
+ ret = try_rotate_stream_data(stream);
if (ret < 0) {
goto end;
}
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
+
+ /*
+ * If we are closed by an application exiting (per-pid buffers),
+ * we need to put our reference on the stream trace chunk right
+ * away, because otherwise still holding the reference on the
+ * trace chunk could allow a viewer stream (which holds a reference
+ * to the stream) to postpone destroy waiting for the chunk to cease
+ * to exist endlessly until the viewer is detached.
+ */
+
+ /* Put stream fd before put chunk. */
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
pthread_mutex_unlock(&stream->lock);
DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
int ret = 0;
ASSERT_LOCKED(stream->lock);
+
+ if (!stream->file || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
if (caa_likely(stream->tracefile_size == 0)) {
/* No size limit set; nothing to check. */
goto end;
stream->stream_handle,
stream->tracefile_size_current, packet_size,
stream->tracefile_current_index, new_file_index);
- tracefile_array_file_rotate(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, false, &stream->stream_fd);
+ stream->trace_chunk, false, &stream->file);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
* Reset current size because we just performed a stream
* rotation.
*/
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
*file_rotated = true;
} else {
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
+ if (!stream->file || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
if (packet) {
- write_ret = lttng_write(stream->stream_fd->fd,
- packet->data, packet->size);
+ write_ret = fs_handle_write(
+ stream->file, packet->data, packet->size);
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
- write_ret = lttng_write(stream->stream_fd->fd,
- padding_buffer, padding_to_write_this_pass);
+ write_ret = fs_handle_write(stream->file, padding_buffer,
+ padding_to_write_this_pass);
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
}
if (stream->is_metadata) {
- stream->metadata_received += packet ? packet->size : 0;
- stream->metadata_received += padding_len;
+ size_t recv_len;
+
+ recv_len = packet ? packet->size : 0;
+ recv_len += padding_len;
+ stream->metadata_received += recv_len;
+ if (recv_len) {
+ stream->no_new_metadata_notified = false;
+ }
}
DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ be64toh(index->index_data.packet_seq_num));
*flushed = true;
} else if (ret > 0) {
index->total_size = total_size;
ASSERT_LOCKED(stream->lock);
+ DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
/* Live beacon handling */
if (index_info->packet_size == 0) {
DBG("Received live beacon for stream %" PRIu64,
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ index_info->packet_seq_num);
ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
{
ASSERT_LOCKED(stream->lock);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ int ret;
+
+ ret = fs_handle_close(stream->file);
+ if (ret) {
+ ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+ stream->channel_name,
+ stream->stream_handle);
+ }
+ stream->file = NULL;
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->prev_data_seq = 0;
stream->prev_index_seq = 0;
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, true, &stream->stream_fd);
+ stream->trace_chunk, true, &stream->file);
}
void print_relay_streams(void)