projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
lttng-view: clean-up: remove commented and unused references to lttv
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
stream.c
diff --git
a/src/bin/lttng-relayd/stream.c
b/src/bin/lttng-relayd/stream.c
index 62b426360ac1830def68230f3d40d860d968520c..1e51547fa2bc928924cf79bbce9fbbd8b2add379 100644
(file)
--- a/
src/bin/lttng-relayd/stream.c
+++ b/
src/bin/lttng-relayd/stream.c
@@
-20,11
+20,12
@@
#define _LGPL_SOURCE
#include <common/common.h>
#define _LGPL_SOURCE
#include <common/common.h>
-#include <common/utils.h>
#include <common/defaults.h>
#include <common/defaults.h>
+#include <common/fs-handle.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/relayd.h>
-#include <
urcu/rculist
.h>
+#include <
common/utils
.h>
#include <sys/stat.h>
#include <sys/stat.h>
+#include <urcu/rculist.h>
#include "lttng-relayd.h"
#include "index.h"
#include "lttng-relayd.h"
#include "index.h"
@@
-72,6
+73,11
@@
end:
static void stream_complete_rotation(struct relay_stream *stream)
{
DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
static void stream_complete_rotation(struct relay_stream *stream)
{
DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+ if (stream->ongoing_rotation.value.next_trace_chunk) {
+ tracefile_array_reset(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa,
+ stream->index_received_seqcount);
+ }
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
@@
-81,9
+87,9
@@
static int stream_create_data_output_file_from_trace_chunk(
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
- struct
stream_fd **out_stream_fd
)
+ struct
fs_handle **out_file
)
{
{
- int ret
, fd
;
+ int ret;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
@@
-122,22
+128,13
@@
static int stream_create_data_output_file_from_trace_chunk(
}
}
}
}
- status = lttng_trace_chunk_open_f
ile(
-
trace_chunk, stream_path, flags, mode, &fd
, false);
+ status = lttng_trace_chunk_open_f
s_handle(trace_chunk, stream_path,
+
flags, mode, out_file
, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
-
- *out_stream_fd = stream_fd_create(fd);
- if (!*out_stream_fd) {
- if (close(ret)) {
- PERROR("Error closing stream file descriptor %d", ret);
- }
- ret = -1;
- goto end;
- }
end:
return ret;
}
end:
return ret;
}
@@
-149,16
+146,15
@@
static int stream_rotate_data_file(struct relay_stream *stream)
DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
stream->stream_handle, stream->tracefile_size_current);
DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
stream->stream_handle, stream->tracefile_size_current);
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
- struct stream_fd *new_stream_fd = NULL;
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
@@
-172,8
+168,7
@@
static int stream_rotate_data_file(struct relay_stream *stream)
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
@@
-208,7
+203,7
@@
static int rotate_truncate_stream(struct relay_stream *stream)
off_t lseek_ret, previous_stream_copy_origin;
uint64_t copy_bytes_left, misplaced_data_size;
bool acquired_reference;
off_t lseek_ret, previous_stream_copy_origin;
uint64_t copy_bytes_left, misplaced_data_size;
bool acquired_reference;
- struct
stream_fd *previous_stream_fd
= NULL;
+ struct
fs_handle *previous_stream_file
= NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
struct lttng_trace_chunk *previous_chunk = NULL;
if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
@@
-239,9
+234,9
@@
static int rotate_truncate_stream(struct relay_stream *stream)
* the orinal stream_fd will be used to copy the "extra" data
* to the new file.
*/
* the orinal stream_fd will be used to copy the "extra" data
* to the new file.
*/
- assert(stream->
stream_fd
);
- previous_stream_f
d = stream->stream_fd
;
- stream->
stream_fd
= NULL;
+ assert(stream->
file
);
+ previous_stream_f
ile = stream->file
;
+ stream->
file
= NULL;
assert(!stream->is_metadata);
assert(stream->tracefile_size_current >
assert(!stream->is_metadata);
assert(stream->tracefile_size_current >
@@
-256,13
+251,12
@@
static int rotate_truncate_stream(struct relay_stream *stream)
goto end;
}
goto end;
}
- assert(stream->
stream_fd
);
+ assert(stream->
file
);
/*
* Seek the current tracefile to the position at which the rotation
* should have occurred.
*/
/*
* Seek the current tracefile to the position at which the rotation
* should have occurred.
*/
- lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
- SEEK_SET);
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
if (lseek_ret < 0) {
PERROR("Failed to seek to offset %" PRIu64
" while copying extra data received before a stream rotation",
if (lseek_ret < 0) {
PERROR("Failed to seek to offset %" PRIu64
" while copying extra data received before a stream rotation",
@@
-278,41
+272,41
@@
static int rotate_truncate_stream(struct relay_stream *stream)
const off_t copy_size_this_pass = min_t(
off_t, copy_bytes_left, sizeof(copy_buffer));
const off_t copy_size_this_pass = min_t(
off_t, copy_bytes_left, sizeof(copy_buffer));
- io_ret =
lttng_read(previous_stream_fd->fd
, copy_buffer,
+ io_ret =
fs_handle_read(previous_stream_file
, copy_buffer,
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to read %" PRIu64
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to read %" PRIu64
- " bytes from
fd %i in %s(), returned %zi"
,
+ " bytes from
previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
copy_size_this_pass,
copy_size_this_pass,
-
previous_stream_fd->fd
,
-
__FUNCTION__, io_ret
);
+
__FUNCTION__, io_ret
,
+
stream->stream_handle
);
} else {
ERR("Failed to read %" PRIu64
} else {
ERR("Failed to read %" PRIu64
-
" bytes from fd %i in %s(), returned %zi"
,
+
" bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
copy_size_this_pass,
copy_size_this_pass,
-
previous_stream_fd->fd
,
-
__FUNCTION__, io_ret
);
+
__FUNCTION__, io_ret
,
+
stream->stream_handle
);
}
ret = -1;
goto end;
}
}
ret = -1;
goto end;
}
- io_ret =
lttng_write(stream->stream_fd->fd, copy_buffer,
- copy_size_this_pass);
+ io_ret =
fs_handle_write(
+
stream->file, copy_buffer,
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to write %" PRIu64
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to write %" PRIu64
- " bytes from
fd %i in %s(), returned %zi"
,
+ " bytes from
previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
copy_size_this_pass,
copy_size_this_pass,
-
stream->stream_fd->fd
,
-
__FUNCTION__, io_ret
);
+
__FUNCTION__, io_ret
,
+
stream->stream_handle
);
} else {
ERR("Failed to write %" PRIu64
} else {
ERR("Failed to write %" PRIu64
-
" bytes from fd %i in %s(), returned %zi"
,
+
" bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
copy_size_this_pass,
copy_size_this_pass,
-
stream->stream_fd->fd
,
-
__FUNCTION__, io_ret
);
+
__FUNCTION__, io_ret
,
+
stream->stream_handle
);
}
ret = -1;
goto end;
}
ret = -1;
goto end;
@@
-321,7
+315,8
@@
static int rotate_truncate_stream(struct relay_stream *stream)
}
/* Truncate the file to get rid of the excess data. */
}
/* Truncate the file to get rid of the excess data. */
- ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
if (ret) {
PERROR("Failed to truncate current stream file to offset %" PRIu64,
previous_stream_copy_origin);
if (ret) {
PERROR("Failed to truncate current stream file to offset %" PRIu64,
previous_stream_copy_origin);
@@
-344,7
+339,6
@@
static int rotate_truncate_stream(struct relay_stream *stream)
ret = 0;
end:
lttng_trace_chunk_put(previous_chunk);
ret = 0;
end:
lttng_trace_chunk_put(previous_chunk);
- stream_fd_put(previous_stream_fd);
return ret;
}
return ret;
}
@@
-525,8
+519,10
@@
static int try_rotate_stream_index(struct relay_stream *stream)
stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
- ret = create_index_file(stream,
- stream->ongoing_rotation.value.next_trace_chunk);
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
stream->ongoing_rotation.value.index_rotated = true;
/*
stream->ongoing_rotation.value.index_rotated = true;
/*
@@
-554,7
+550,6
@@
static int stream_set_trace_chunk(struct relay_stream *stream,
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
- struct stream_fd *new_stream_fd = NULL;
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
@@
-568,13
+563,12
@@
static int stream_set_trace_chunk(struct relay_stream *stream,
assert(acquired_reference);
stream->trace_chunk = chunk;
assert(acquired_reference);
stream->trace_chunk = chunk;
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
end:
return ret;
}
end:
return ret;
}
@@
-677,9
+671,9
@@
struct relay_stream *stream_create(struct ctf_trace *trace,
end:
if (ret) {
end:
if (ret) {
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
stream_put(stream);
stream = NULL;
}
stream_put(stream);
stream = NULL;
@@
-802,9
+796,9
@@
static void stream_release(struct urcu_ref *ref)
stream_unpublish(stream);
stream_unpublish(stream);
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
@@
-868,6
+862,12
@@
int stream_set_pending_rotation(struct relay_stream *stream,
* A metadata stream has no index; consider it already rotated.
*/
stream->ongoing_rotation.value.index_rotated = true;
* A metadata stream has no index; consider it already rotated.
*/
stream->ongoing_rotation.value.index_rotated = true;
+ if (next_trace_chunk) {
+ /*
+ * The metadata will be received again in the new chunk.
+ */
+ stream->metadata_received = 0;
+ }
ret = stream_rotate_data_file(stream);
} else {
ret = try_rotate_stream_index(stream);
ret = stream_rotate_data_file(stream);
} else {
ret = try_rotate_stream_index(stream);
@@
-978,9
+978,9
@@
void try_stream_close(struct relay_stream *stream)
*/
/* Put stream fd before put chunk. */
*/
/* Put stream fd before put chunk. */
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
@@
-1000,7
+1000,7
@@
int stream_init_packet(struct relay_stream *stream, size_t packet_size,
ASSERT_LOCKED(stream->lock);
ASSERT_LOCKED(stream->lock);
- if (!stream->
stream_fd
|| !stream->trace_chunk) {
+ if (!stream->
file
|| !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
@@
-1034,12
+1034,12
@@
int stream_init_packet(struct relay_stream *stream, size_t packet_size,
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
- if (stream->
stream_fd
) {
-
stream_fd_put(stream->stream_fd
);
- stream->
stream_fd
= NULL;
+ if (stream->
file
) {
+
fs_handle_close(stream->file
);
+ stream->
file
= NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, false, &stream->
stream_fd
);
+ stream->trace_chunk, false, &stream->
file
);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
@@
-1074,15
+1074,15
@@
int stream_write(struct relay_stream *stream,
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
- if (!stream->
stream_fd
|| !stream->trace_chunk) {
+ if (!stream->
file
|| !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
goto end;
}
if (packet) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
goto end;
}
if (packet) {
- write_ret =
lttng_write(stream->stream_fd->fd,
- packet->data, packet->size);
+ write_ret =
fs_handle_write(
+
stream->file,
packet->data, packet->size);
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
@@
-1096,8
+1096,8
@@
int stream_write(struct relay_stream *stream,
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
- write_ret =
lttng_write(stream->stream_fd->fd
,
- padding_
buffer, padding_
to_write_this_pass);
+ write_ret =
fs_handle_write(stream->file, padding_buffer
,
+ padding_to_write_this_pass);
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
@@
-1338,9
+1338,16
@@
int stream_reset_file(struct relay_stream *stream)
{
ASSERT_LOCKED(stream->lock);
{
ASSERT_LOCKED(stream->lock);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ int ret;
+
+ ret = fs_handle_close(stream->file);
+ if (ret) {
+ ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+ stream->channel_name,
+ stream->stream_handle);
+ }
+ stream->file = NULL;
}
DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
}
DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
@@
-1353,7
+1360,7
@@
int stream_reset_file(struct relay_stream *stream)
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, true, &stream->
stream_fd
);
+ stream->trace_chunk, true, &stream->
file
);
}
void print_relay_streams(void)
}
void print_relay_streams(void)
This page took
0.033048 seconds
and
4
git commands to generate.