+ /*
+ * Rewind the current tracefile to the position at which the rotation
+ * should have occured.
+ */
+ lseek_ret = lseek(stream->stream_fd->fd,
+ stream->pos_after_last_complete_data_index, SEEK_SET);
+ if (lseek_ret < 0) {
+ PERROR("seek truncate stream");
+ ret = -1;
+ goto end;
+ }
+
+ /* Move data from the old file to the new file. */
+ while (pos < diff) {
+ uint64_t count, bytes_left;
+ ssize_t io_ret;
+
+ bytes_left = diff - pos;
+ count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
+ assert(count <= SIZE_MAX);
+
+ io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+ if (io_ret < (ssize_t) count) {
+ char error_string[256];
+
+ snprintf(error_string, sizeof(error_string),
+ "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+ count, stream->stream_fd->fd, io_ret);
+ if (io_ret == -1) {
+ PERROR("%s", error_string);
+ } else {
+ ERR("%s", error_string);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ io_ret = lttng_write(new_fd, buf, count);
+ if (io_ret < (ssize_t) count) {
+ char error_string[256];
+
+ snprintf(error_string, sizeof(error_string),
+ "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+ count, new_fd, io_ret);
+ if (io_ret == -1) {
+ PERROR("%s", error_string);
+ } else {
+ ERR("%s", error_string);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ pos += count;
+ }
+
+ /* Truncate the file to get rid of the excess data. */
+ ret = ftruncate(stream->stream_fd->fd,
+ stream->pos_after_last_complete_data_index);
+ if (ret) {
+ PERROR("ftruncate");
+ goto end;
+ }
+
+ ret = close(stream->stream_fd->fd);
+ if (ret < 0) {
+ PERROR("Closing tracefile");
+ goto end;
+ }
+
+ ret = create_rotate_index_file(stream);
+ if (ret < 0) {
+ ERR("Rotate stream index file");
+ goto end;
+ }
+
+ /*
+ * Update the offset and FD of all the eventual indexes created by the
+ * data connection before the rotation command arrived.
+ */
+ ret = relay_index_switch_all_files(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+
+ stream->stream_fd->fd = new_fd;
+ stream->tracefile_size_current = diff;
+ stream->pos_after_last_complete_data_index = 0;
+ stream->rotate_at_seq_num = -1ULL;
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int try_rotate_stream(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ /* No rotation expected. */
+ if (stream->rotate_at_seq_num == -1ULL) {
+ goto end;
+ }
+
+ if (stream->prev_seq < stream->rotate_at_seq_num ||
+ stream->prev_seq == -1ULL) {
+ DBG("Stream %" PRIu64 " no yet ready for rotation",
+ stream->stream_handle);
+ goto end;
+ } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+ DBG("Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating", stream->stream_handle);
+ ret = rotate_truncate_stream(stream);
+ if (ret) {
+ ERR("Failed to truncate stream");
+ goto end;
+ }
+ } else {
+ /* stream->prev_seq == stream->rotate_at_seq_num */
+ DBG("Stream %" PRIu64 " ready for rotation",
+ stream->stream_handle);
+ ret = do_rotate_stream(stream);
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * relay_recv_metadata: receive the metadata for the session.
+ */
+static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret = 0;
+ ssize_t size_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_metadata_payload *metadata_struct;
+ struct relay_stream *metadata_stream;
+ uint64_t data_size, payload_size;
+
+ if (!session) {
+ ERR("Metadata sent before version check");
+ ret = -1;
+ goto end;
+ }
+
+ data_size = payload_size = be64toh(recv_hdr->data_size);
+ if (data_size < sizeof(struct lttcomm_relayd_metadata_payload)) {
+ ERR("Incorrect data size");
+ ret = -1;
+ goto end;
+ }
+ payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
+
+ if (data_buffer_size < data_size) {
+ /* In case the realloc fails, we can free the memory */
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
+ ERR("Allocating data buffer");
+ free(data_buffer);
+ ret = -1;
+ goto end;
+ }
+ data_buffer = tmp_data_ptr;
+ data_buffer_size = data_size;
+ }
+ memset(data_buffer, 0, data_size);
+ DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
+ size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (size_ret < 0 || size_ret != data_size) {
+ if (size_ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive the whole metadata");
+ }
+ ret = -1;
+ goto end;
+ }
+ metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
+
+ metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
+ if (!metadata_stream) {
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&metadata_stream->lock);
+
+ size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
+ payload_size);
+ if (size_ret < payload_size) {
+ ERR("Relay error writing metadata on file");
+ ret = -1;
+ goto end_put;
+ }
+
+ size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ be32toh(metadata_struct->padding_size));
+ if (size_ret < 0) {
+ goto end_put;
+ }
+
+ metadata_stream->metadata_received +=
+ payload_size + be32toh(metadata_struct->padding_size);
+ DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
+ metadata_stream->metadata_received);
+
+ ret = try_rotate_stream(metadata_stream);
+ if (ret < 0) {
+ goto end_put;
+ }
+
+end_put:
+ pthread_mutex_unlock(&metadata_stream->lock);
+ stream_put(metadata_stream);
+end:
+ return ret;
+}
+
+/*
+ * relay_send_version: send relayd version number
+ */
+static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret;
+ struct lttcomm_relayd_version reply, msg;
+ bool compatible = true;
+
+ conn->version_check_done = 1;
+
+ /* Get version from the other side. */
+ ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+ if (ret < 0 || ret != sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay failed to receive the version values.");
+ }
+ ret = -1;
+ goto end;
+ }
+
+ memset(&reply, 0, sizeof(reply));
+ reply.major = RELAYD_VERSION_COMM_MAJOR;
+ reply.minor = RELAYD_VERSION_COMM_MINOR;