X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=043d19612ce0489efffad2b1f99fc1c8be52e98d;hp=2bc815813613054f58f39810b9c8df0dcade0117;hb=3a735fa767e8cd922c638a25e93876e37f7f66b1;hpb=cab6931e14405d5d966ced712028ede43b2d4cff diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 2bc815813..043d19612 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -2,6 +2,7 @@ * Copyright (C) 2013 - Julien Desfossez * David Goulet * 2015 - Mathieu Desnoyers + * 2019 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -21,6 +22,7 @@ #include #include #include +#include #include #include @@ -29,6 +31,11 @@ #include "stream.h" #include "viewer-stream.h" +#include +#include + +#define FILE_IO_STACK_BUFFER_SIZE 65536 + /* Should be called with RCU read-side lock held. */ bool stream_get(struct relay_stream *stream) { @@ -62,6 +69,447 @@ end: return stream; } +static void stream_complete_rotation(struct relay_stream *stream) +{ + DBG("Rotation completed for stream %" PRIu64, stream->stream_handle); + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk; + stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {}; +} + +/* + * If too much data has been written in a tracefile before we received the + * rotation command, we have to move the excess data to the new tracefile and + * perform the rotation. This can happen because the control and data + * connections are separate, the indexes as well as the commands arrive from + * the control connection and we have no control over the order so we could be + * in a situation where too much data has been received on the data connection + * before the rotation command on the control connection arrives. + */ +static int rotate_truncate_stream(struct relay_stream *stream) +{ + int ret, new_fd; + off_t lseek_ret; + uint64_t diff, pos = 0; + char buf[FILE_IO_STACK_BUFFER_SIZE]; + + assert(!stream->is_metadata); + + assert(stream->tracefile_size_current > + stream->pos_after_last_complete_data_index); + diff = stream->tracefile_size_current - + stream->pos_after_last_complete_data_index; + + /* Create the new tracefile. */ + new_fd = utils_create_stream_file(stream->path_name, + stream->channel_name, + stream->tracefile_size, stream->tracefile_count, + /* uid */ -1, /* gid */ -1, /* suffix */ NULL); + if (new_fd < 0) { + ERR("Failed to create new stream file at path %s for channel %s", + stream->path_name, stream->channel_name); + ret = -1; + goto end; + } + + /* + * Rewind the current tracefile to the position at which the rotation + * should have occurred. + */ + lseek_ret = lseek(stream->stream_fd->fd, + stream->pos_after_last_complete_data_index, SEEK_SET); + if (lseek_ret < 0) { + PERROR("seek truncate stream"); + ret = -1; + goto end; + } + + /* Move data from the old file to the new file. */ + while (pos < diff) { + uint64_t count, bytes_left; + ssize_t io_ret; + + bytes_left = diff - pos; + count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left; + assert(count <= SIZE_MAX); + + io_ret = lttng_read(stream->stream_fd->fd, buf, count); + if (io_ret < (ssize_t) count) { + char error_string[256]; + + snprintf(error_string, sizeof(error_string), + "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi", + count, stream->stream_fd->fd, io_ret); + if (io_ret == -1) { + PERROR("%s", error_string); + } else { + ERR("%s", error_string); + } + ret = -1; + goto end; + } + + io_ret = lttng_write(new_fd, buf, count); + if (io_ret < (ssize_t) count) { + char error_string[256]; + + snprintf(error_string, sizeof(error_string), + "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi", + count, new_fd, io_ret); + if (io_ret == -1) { + PERROR("%s", error_string); + } else { + ERR("%s", error_string); + } + ret = -1; + goto end; + } + + pos += count; + } + + /* Truncate the file to get rid of the excess data. */ + ret = ftruncate(stream->stream_fd->fd, + stream->pos_after_last_complete_data_index); + if (ret) { + PERROR("ftruncate"); + goto end; + } + + ret = close(stream->stream_fd->fd); + if (ret < 0) { + PERROR("Closing tracefile"); + goto end; + } + + /* + * Update the offset and FD of all the eventual indexes created by the + * data connection before the rotation command arrived. + */ + ret = relay_index_switch_all_files(stream); + if (ret < 0) { + ERR("Failed to rotate index file"); + goto end; + } + + stream->stream_fd->fd = new_fd; + stream->tracefile_size_current = diff; + stream->pos_after_last_complete_data_index = 0; + stream_complete_rotation(stream); + + ret = 0; + +end: + return ret; +} + +static int stream_create_data_output_file_from_trace_chunk( + struct relay_stream *stream, + struct lttng_trace_chunk *trace_chunk, + bool force_unlink, + struct stream_fd **out_stream_fd) +{ + int ret, fd; + char stream_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + const int flags = O_RDWR | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + ret = utils_stream_file_path(stream->path_name, stream->channel_name, + stream->tracefile_size, stream->tracefile_current_index, + NULL, stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + if (stream->tracefile_wrapped_around || force_unlink) { + /* + * The on-disk ring-buffer has wrapped around. + * Newly created stream files will replace existing files. Since + * live clients may be consuming existing files, the file about + * to be replaced is unlinked in order to not overwrite its + * content. + */ + status = lttng_trace_chunk_unlink_file(trace_chunk, + stream_path); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + PERROR("Failed to unlink stream file \"%s\" during trace file rotation", + stream_path); + /* + * Don't abort if the file doesn't exist, it is + * unexpected, but should not be a fatal error. + */ + if (errno != ENOENT) { + ret = -1; + goto end; + } + } + } + + status = lttng_trace_chunk_open_file( + trace_chunk, stream_path, flags, mode, &fd); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->channel_name); + ret = -1; + goto end; + } + + *out_stream_fd = stream_fd_create(fd); + if (!*out_stream_fd) { + if (close(ret)) { + PERROR("Error closing stream file descriptor %d", ret); + } + ret = -1; + goto end; + } +end: + return ret; +} + +static int stream_rotate_data_file(struct relay_stream *stream) +{ + int ret = 0; + + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + + stream->tracefile_wrapped_around = false; + stream->tracefile_current_index = 0; + + if (stream->ongoing_rotation.value.next_trace_chunk) { + struct stream_fd *new_stream_fd = NULL; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_create_subdirectory( + stream->ongoing_rotation.value.next_trace_chunk, + stream->path_name); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + + /* Rotate the data file. */ + ret = stream_create_data_output_file_from_trace_chunk(stream, + stream->ongoing_rotation.value.next_trace_chunk, + false, &new_stream_fd); + stream->stream_fd = new_stream_fd; + if (ret < 0) { + ERR("Failed to rotate stream data file"); + goto end; + } + } + stream->tracefile_size_current = 0; + stream->pos_after_last_complete_data_index = 0; + stream->ongoing_rotation.value.data_rotated = true; + + if (stream->ongoing_rotation.value.index_rotated) { + /* Rotation completed; reset its state. */ + stream_complete_rotation(stream); + } +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (caa_likely(!stream->ongoing_rotation.is_set)) { + /* No rotation expected. */ + goto end; + } + + if (stream->ongoing_rotation.value.data_rotated) { + /* Rotation of the data file has already occurred. */ + goto end; + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) { + /* + * The next packet that will be written is not part of the next + * chunk yet. + */ + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 + ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->ongoing_rotation.value.seq_num, + stream->prev_data_seq); + goto end; + } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) { + /* + * prev_data_seq is checked here since indexes and rotation + * commands are serialized with respect to each other. + */ + DBG("Rotation after too much data has been written in tracefile " + "for stream %" PRIu64 ", need to truncate before " + "rotating", stream->stream_handle); + ret = rotate_truncate_stream(stream); + if (ret) { + ERR("Failed to truncate stream"); + goto end; + } + } else { + ret = stream_rotate_data_file(stream); + } + +end: + return ret; +} + +/* + * Close the current index file if it is open, and create a new one. + * + * Return 0 on success, -1 on error. + */ +static int create_index_file(struct relay_stream *stream, + struct lttng_trace_chunk *chunk) +{ + int ret; + uint32_t major, minor; + char *index_subpath = NULL; + enum lttng_trace_chunk_status status; + + ASSERT_LOCKED(stream->lock); + + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; + } + major = stream->trace->session->major; + minor = stream->trace->session->minor; + + if (!chunk) { + ret = 0; + goto end; + } + ret = asprintf(&index_subpath, "%s/%s", stream->path_name, + DEFAULT_INDEX_DIR); + if (ret < 0) { + goto end; + } + + status = lttng_trace_chunk_create_subdirectory(chunk, + index_subpath); + free(index_subpath); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + stream->index_file = lttng_index_file_create_from_trace_chunk( + chunk, stream->path_name, + stream->channel_name, stream->tracefile_size, + stream->tracefile_current_index, + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor), true); + if (!stream->index_file) { + ret = -1; + goto end; + } + + ret = 0; + +end: + return ret; +} + +/* + * Check if a stream's index file should be rotated (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static int try_rotate_stream_index(struct relay_stream *stream) +{ + int ret = 0; + + if (!stream->ongoing_rotation.is_set) { + /* No rotation expected. */ + goto end; + } + + if (stream->ongoing_rotation.value.index_rotated) { + /* Rotation of the index has already occurred. */ + goto end; + } + + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->ongoing_rotation.value.seq_num, + stream->prev_index_seq); + goto end; + } else { + /* The next index belongs to the new trace chunk; rotate. */ + assert(stream->prev_index_seq + 1 == + stream->ongoing_rotation.value.seq_num); + DBG("Rotating stream %" PRIu64 " index file", + stream->stream_handle); + ret = create_index_file(stream, + stream->ongoing_rotation.value.next_trace_chunk); + stream->ongoing_rotation.value.index_rotated = true; + + if (stream->ongoing_rotation.value.data_rotated && + stream->ongoing_rotation.value.index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream_complete_rotation(stream); + } + } + +end: + return ret; +} + +static int stream_set_trace_chunk(struct relay_stream *stream, + struct lttng_trace_chunk *chunk) +{ + int ret = 0; + enum lttng_trace_chunk_status status; + bool acquired_reference; + struct stream_fd *new_stream_fd = NULL; + + status = lttng_trace_chunk_create_subdirectory(chunk, + stream->path_name); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + + lttng_trace_chunk_put(stream->trace_chunk); + acquired_reference = lttng_trace_chunk_get(chunk); + assert(acquired_reference); + stream->trace_chunk = chunk; + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + ret = stream_create_data_output_file_from_trace_chunk(stream, chunk, + false, &new_stream_fd); + stream->stream_fd = new_stream_fd; +end: + return ret; +} + /* * We keep ownership of path_name and channel_name. */ @@ -73,6 +521,8 @@ struct relay_stream *stream_create(struct ctf_trace *trace, int ret; struct relay_stream *stream = NULL; struct relay_session *session = trace->session; + bool acquired_reference = false; + struct lttng_trace_chunk *current_trace_chunk; stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { @@ -81,48 +531,48 @@ struct relay_stream *stream_create(struct ctf_trace *trace, } stream->stream_handle = stream_handle; - stream->prev_seq = -1ULL; + stream->prev_data_seq = -1ULL; + stream->prev_index_seq = -1ULL; stream->last_net_seq_num = -1ULL; stream->ctf_stream_id = -1ULL; stream->tracefile_size = tracefile_size; stream->tracefile_count = tracefile_count; stream->path_name = path_name; stream->channel_name = channel_name; + stream->beacon_ts_end = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; - stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!stream->indexes_ht) { - ERR("Cannot created indexes_ht"); + pthread_mutex_lock(&trace->session->lock); + current_trace_chunk = trace->session->current_trace_chunk; + if (current_trace_chunk) { + acquired_reference = lttng_trace_chunk_get(current_trace_chunk); + } + pthread_mutex_unlock(&trace->session->lock); + if (!acquired_reference) { + ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired", + channel_name); ret = -1; goto end; } - ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, - -1, -1); - if (ret < 0) { - ERR("relay creating output directory"); + stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!stream->indexes_ht) { + ERR("Cannot created indexes_ht"); + ret = -1; goto end; } - /* - * No need to use run_as API here because whatever we receive, - * the relayd uses its own credentials for the stream files. - */ - ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); - if (ret < 0) { - ERR("Create output file"); - goto end; - } - stream->stream_fd = stream_fd_create(ret); - if (!stream->stream_fd) { - if (close(ret)) { - PERROR("Error closing file %d", ret); - } + pthread_mutex_lock(&stream->lock); + ret = stream_set_trace_chunk(stream, current_trace_chunk); + pthread_mutex_unlock(&stream->lock); + if (ret) { + ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"", + trace->session->session_name, + stream->channel_name); ret = -1; goto end; } @@ -131,16 +581,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace, ret = -1; goto end; } - if (stream->tracefile_size) { - DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); - } else { - DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); - } - - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) { - stream->is_metadata = 1; - } + stream->is_metadata = !strcmp(stream->channel_name, + DEFAULT_METADATA_NAME); stream->in_recv_list = true; /* @@ -172,6 +615,7 @@ end: stream_put(stream); stream = NULL; } + lttng_trace_chunk_put(current_trace_chunk); return stream; error_no_alloc: @@ -299,26 +743,72 @@ static void stream_release(struct urcu_ref *ref) ctf_trace_put(stream->trace); stream->trace = NULL; } + stream_complete_rotation(stream); + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; call_rcu(&stream->rcu_node, stream_destroy_rcu); } void stream_put(struct relay_stream *stream) { - DBG("stream put for stream id %" PRIu64, stream->stream_handle); rcu_read_lock(); assert(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before * actually putting our last stream reference. */ - DBG("stream put stream id %" PRIu64 " refcount %d", - stream->stream_handle, - (int) stream->ref.refcount); urcu_ref_put(&stream->ref, stream_release); rcu_read_unlock(); } +int stream_set_pending_rotation(struct relay_stream *stream, + struct lttng_trace_chunk *next_trace_chunk, + uint64_t rotation_sequence_number) +{ + int ret = 0; + const struct relay_stream_rotation rotation = { + .seq_num = rotation_sequence_number, + .next_trace_chunk = next_trace_chunk, + }; + + if (stream->ongoing_rotation.is_set) { + ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)"); + ret = -1; + goto end; + } + + if (next_trace_chunk) { + const bool reference_acquired = + lttng_trace_chunk_get(next_trace_chunk); + + assert(reference_acquired); + } + LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation); + + DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64, + stream->stream_handle, rotation_sequence_number); + if (stream->is_metadata) { + /* + * A metadata stream has no index; consider it already rotated. + */ + stream->ongoing_rotation.value.index_rotated = true; + ret = stream_rotate_data_file(stream); + } else { + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end; + } + } +end: + return ret; +} + void try_stream_close(struct relay_stream *stream) { bool session_aborted; @@ -360,6 +850,7 @@ void try_stream_close(struct relay_stream *stream) * a packet. Since those are sent in that order, we take * care of consumerd crashes. */ + DBG("relay_index_close_partial_fd"); relay_index_close_partial_fd(stream); /* * Use the highest net_seq_num we currently have pending @@ -367,11 +858,12 @@ void try_stream_close(struct relay_stream *stream) * at -1ULL if we cannot find any index. */ stream->last_net_seq_num = relay_index_find_last(stream); + DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num); /* Fall-through into the next check. */ } if (stream->last_net_seq_num != -1ULL && - ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0 + ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0 && !session_aborted) { /* * Don't close since we still have data pending. This @@ -405,6 +897,292 @@ void try_stream_close(struct relay_stream *stream) stream_put(stream); } +int stream_init_packet(struct relay_stream *stream, size_t packet_size, + bool *file_rotated) +{ + int ret = 0; + + ASSERT_LOCKED(stream->lock); + if (caa_likely(stream->tracefile_size == 0)) { + /* No size limit set; nothing to check. */ + goto end; + } + + /* + * Check if writing the new packet would exceed the maximal file size. + */ + if (caa_unlikely((stream->tracefile_size_current + packet_size) > + stream->tracefile_size)) { + const uint64_t new_file_index = + (stream->tracefile_current_index + 1) % + stream->tracefile_count; + + if (new_file_index < stream->tracefile_current_index) { + stream->tracefile_wrapped_around = true; + } + DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64 + ", current_file_size = %" PRIu64 + ", packet_size = %" PRIu64 ", current_file_index = %" PRIu64 + " new_file_index = %" PRIu64, + stream->stream_handle, + stream->tracefile_size_current, packet_size, + stream->tracefile_current_index, new_file_index); + tracefile_array_file_rotate(stream->tfa); + stream->tracefile_current_index = new_file_index; + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + ret = stream_create_data_output_file_from_trace_chunk(stream, + stream->trace_chunk, false, &stream->stream_fd); + if (ret) { + ERR("Failed to perform trace file rotation of stream %" PRIu64, + stream->stream_handle); + goto end; + } + + /* + * Reset current size because we just performed a stream + * rotation. + */ + stream->tracefile_size_current = 0; + *file_rotated = true; + } else { + *file_rotated = false; + } +end: + return ret; +} + +/* Note that the packet is not necessarily complete. */ +int stream_write(struct relay_stream *stream, + const struct lttng_buffer_view *packet, size_t padding_len) +{ + int ret = 0; + ssize_t write_ret; + size_t padding_to_write = padding_len; + char padding_buffer[FILE_IO_STACK_BUFFER_SIZE]; + + ASSERT_LOCKED(stream->lock); + memset(padding_buffer, 0, + min(sizeof(padding_buffer), padding_to_write)); + + if (packet) { + write_ret = lttng_write(stream->stream_fd->fd, + packet->data, packet->size); + if (write_ret != packet->size) { + PERROR("Failed to write to stream file of %sstream %" PRIu64, + stream->is_metadata ? "metadata " : "", + stream->stream_handle); + ret = -1; + goto end; + } + } + + while (padding_to_write > 0) { + const size_t padding_to_write_this_pass = + min(padding_to_write, sizeof(padding_buffer)); + + write_ret = lttng_write(stream->stream_fd->fd, + padding_buffer, padding_to_write_this_pass); + if (write_ret != padding_to_write_this_pass) { + PERROR("Failed to write padding to file of %sstream %" PRIu64, + stream->is_metadata ? "metadata " : "", + stream->stream_handle); + ret = -1; + goto end; + } + padding_to_write -= padding_to_write_this_pass; + } + + if (stream->is_metadata) { + stream->metadata_received += packet->size + padding_len; + } + + DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64, + stream->is_metadata ? "metadata " : "", + stream->stream_handle, + packet ? packet->size : 0, padding_len); +end: + return ret; +} + +/* + * Update index after receiving a packet for a data stream. + * + * Called with the stream lock held. + * + * Return 0 on success else a negative value. + */ +int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num, + bool rotate_index, bool *flushed, uint64_t total_size) +{ + int ret = 0; + uint64_t data_offset; + struct relay_index *index; + + ASSERT_LOCKED(stream->lock); + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence + * number. If it exists, the control thread has already received the + * data for it, thus we need to write it to disk. + */ + index = relay_index_get_by_id_or_create(stream, net_seq_num); + if (!index) { + ret = -1; + goto end; + } + + if (rotate_index || !stream->index_file) { + ret = create_index_file(stream, stream->trace_chunk); + if (ret) { + ERR("Failed to create index file for stream %" PRIu64, + stream->stream_handle); + /* Put self-ref for this index due to error. */ + relay_index_put(index); + index = NULL; + goto end; + } + } + + if (relay_index_set_file(index, stream->index_file, data_offset)) { + ret = -1; + /* Put self-ref for this index due to error. */ + relay_index_put(index); + index = NULL; + goto end; + } + + ret = relay_index_try_flush(index); + if (ret == 0) { + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; + *flushed = true; + } else if (ret > 0) { + index->total_size = total_size; + /* No flush. */ + ret = 0; + } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); + ret = -1; + } +end: + return ret; +} + +int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size, + uint64_t sequence_number, bool index_flushed) +{ + int ret = 0; + + ASSERT_LOCKED(stream->lock); + + stream->tracefile_size_current += packet_total_size; + if (index_flushed) { + stream->pos_after_last_complete_data_index = + stream->tracefile_size_current; + stream->prev_index_seq = sequence_number; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end; + } + } + + stream->prev_data_seq = sequence_number; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end; + } +end: + return ret; +} + +int stream_add_index(struct relay_stream *stream, + const struct lttcomm_relayd_index *index_info) +{ + int ret = 0; + struct relay_index *index; + + ASSERT_LOCKED(stream->lock); + + /* Live beacon handling */ + if (index_info->packet_size == 0) { + DBG("Received live beacon for stream %" PRIu64, + stream->stream_handle); + + /* + * Only flag a stream inactive when it has already + * received data and no indexes are in flight. + */ + if (stream->index_received_seqcount > 0 + && stream->indexes_in_flight == 0) { + stream->beacon_ts_end = index_info->timestamp_end; + } + ret = 0; + goto end; + } else { + stream->beacon_ts_end = -1ULL; + } + + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = index_info->stream_id; + } + + index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num); + if (!index) { + ret = -1; + ERR("Failed to get or create index %" PRIu64, + index_info->net_seq_num); + goto end; + } + if (relay_index_set_control_data(index, index_info, + stream->trace->session->minor)) { + ERR("set_index_control_data error"); + relay_index_put(index); + ret = -1; + goto end; + } + ret = relay_index_try_flush(index); + if (ret == 0) { + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; + stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info->net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end; + } + } else if (ret > 0) { + /* no flush. */ + ret = 0; + } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); + ret = -1; + } +end: + return ret; +} + static void print_stream_indexes(struct relay_stream *stream) { struct lttng_ht_iter iter; @@ -426,6 +1204,26 @@ static void print_stream_indexes(struct relay_stream *stream) rcu_read_unlock(); } +int stream_reset_file(struct relay_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + + stream->tracefile_size_current = 0; + stream->prev_data_seq = 0; + stream->prev_index_seq = 0; + /* Note that this does not reset the tracefile array. */ + stream->tracefile_current_index = 0; + stream->pos_after_last_complete_data_index = 0; + + return stream_create_data_output_file_from_trace_chunk(stream, + stream->trace_chunk, true, &stream->stream_fd); +} + void print_relay_streams(void) { struct lttng_ht_iter iter;