+/*
+ * relay_rotate_session_stream: rotate a stream to a new tracefile for the session
+ * rotation feature (not the tracefile rotation feature).
+ */
+static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ size_t header_len;
+ size_t path_len;
+ struct lttng_buffer_view new_path_view;
+
+ DBG("Rotate stream received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to rotate a stream before version check");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ if (session->major == 2 && session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ header_len = sizeof(struct lttcomm_relayd_rotate_stream);
+
+ if (payload->size < header_len) {
+ ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
+ header_len, payload->size);
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ memcpy(&stream_info, payload->data, header_len);
+
+ /* Convert to host */
+ stream_info.pathname_length = be32toh(stream_info.pathname_length);
+ stream_info.stream_id = be64toh(stream_info.stream_id);
+ stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id);
+ stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+
+ path_len = stream_info.pathname_length;
+ if (payload->size < header_len + path_len) {
+ ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes",
+ header_len + path_len, payload->size);
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ /* Ensure it fits in local filename length. */
+ if (path_len >= LTTNG_PATH_MAX) {
+ ret = -ENAMETOOLONG;
+ ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
+ path_len, LTTNG_PATH_MAX);
+ goto end;
+ }
+
+ new_path_view = lttng_buffer_view_from_view(payload, header_len,
+ stream_info.pathname_length);
+
+ stream = stream_get_by_id(stream_info.stream_id);
+ if (!stream) {
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Update the trace path (just the folder, the stream name does not
+ * change).
+ */
+ free(stream->prev_path_name);
+ stream->prev_path_name = stream->path_name;
+ stream->path_name = create_output_path(new_path_view.data);
+ if (!stream->path_name) {
+ ERR("Failed to create a new output path");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+ -1, -1);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
+
+ if (stream->is_metadata) {
+ /*
+ * Metadata streams have no index; consider its rotation
+ * complete.
+ */
+ stream->index_rotated = true;
+ /*
+ * The metadata stream is sent only over the control connection
+ * so we know we have all the data to perform the stream
+ * rotation.
+ */
+ ret = do_rotate_stream_data(stream);
+ } else {
+ stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end_stream_unlock;
+ }
+
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end_stream_unlock;
+ }
+ }
+
+end_stream_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"rotate session stream\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_reply:
+ return ret;
+}
+
+#define DBG_CMD(cmd_name, conn) \
+ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
+
+static int relay_process_control_command(struct relay_connection *conn,
+ const struct lttcomm_relayd_hdr *header,
+ const struct lttng_buffer_view *payload)
+{
+ int ret = 0;
+
+ switch (header->cmd) {
+ case RELAYD_CREATE_SESSION:
+ DBG_CMD("RELAYD_CREATE_SESSION", conn);
+ ret = relay_create_session(header, conn, payload);
+ break;
+ case RELAYD_ADD_STREAM:
+ DBG_CMD("RELAYD_ADD_STREAM", conn);
+ ret = relay_add_stream(header, conn, payload);
+ break;
+ case RELAYD_START_DATA:
+ DBG_CMD("RELAYD_START_DATA", conn);
+ ret = relay_start(header, conn, payload);
+ break;
+ case RELAYD_SEND_METADATA:
+ DBG_CMD("RELAYD_SEND_METADATA", conn);
+ ret = relay_recv_metadata(header, conn, payload);
+ break;
+ case RELAYD_VERSION:
+ DBG_CMD("RELAYD_VERSION", conn);
+ ret = relay_send_version(header, conn, payload);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ DBG_CMD("RELAYD_CLOSE_STREAM", conn);
+ ret = relay_close_stream(header, conn, payload);
+ break;
+ case RELAYD_DATA_PENDING:
+ DBG_CMD("RELAYD_DATA_PENDING", conn);
+ ret = relay_data_pending(header, conn, payload);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn);
+ ret = relay_quiescent_control(header, conn, payload);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn);
+ ret = relay_begin_data_pending(header, conn, payload);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ DBG_CMD("RELAYD_END_DATA_PENDING", conn);
+ ret = relay_end_data_pending(header, conn, payload);
+ break;
+ case RELAYD_SEND_INDEX:
+ DBG_CMD("RELAYD_SEND_INDEX", conn);
+ ret = relay_recv_index(header, conn, payload);
+ break;
+ case RELAYD_STREAMS_SENT:
+ DBG_CMD("RELAYD_STREAMS_SENT", conn);
+ ret = relay_streams_sent(header, conn, payload);
+ break;
+ case RELAYD_RESET_METADATA:
+ DBG_CMD("RELAYD_RESET_METADATA", conn);
+ ret = relay_reset_metadata(header, conn, payload);
+ break;
+ case RELAYD_ROTATE_STREAM:
+ DBG_CMD("RELAYD_ROTATE_STREAM", conn);
+ ret = relay_rotate_session_stream(header, conn, payload);
+ break;
+ case RELAYD_UPDATE_SYNC_INFO:
+ default:
+ ERR("Received unknown command (%u)", header->cmd);
+ relay_unknown_command(conn);
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+static enum relay_connection_status relay_process_control_receive_payload(
+ struct relay_connection *conn)
+{
+ int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
+ struct lttng_dynamic_buffer *reception_buffer =
+ &conn->protocol.ctrl.reception_buffer;
+ struct ctrl_connection_state_receive_payload *state =
+ &conn->protocol.ctrl.state.receive_payload;
+ struct lttng_buffer_view payload_view;
+
+ if (state->left_to_receive == 0) {
+ /* Short-circuit for payload-less commands. */
+ goto reception_complete;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock,
+ reception_buffer->data + state->received,
+ state->left_to_receive, MSG_DONTWAIT);
+ if (ret < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive command payload on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
+ goto end;
+ } else if (ret == 0) {
+ DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
+ goto end;
+ }
+
+ assert(ret > 0);
+ assert(ret <= state->left_to_receive);
+
+ state->left_to_receive -= ret;
+ state->received += ret;
+
+ if (state->left_to_receive > 0) {
+ /*
+ * Can't transition to the protocol's next state, wait to
+ * receive the rest of the header.
+ */
+ DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
+ state->received, state->left_to_receive,
+ conn->sock->fd);
+ goto end;
+ }
+
+reception_complete:
+ DBG("Done receiving control command payload: fd = %i, payload size = %" PRIu64 " bytes",
+ conn->sock->fd, state->received);
+ /*
+ * The payload required to process the command has been received.
+ * A view to the reception buffer is forwarded to the various
+ * commands and the state of the control is reset on success.
+ *
+ * Commands are responsible for sending their reply to the peer.
+ */
+ payload_view = lttng_buffer_view_from_dynamic_buffer(reception_buffer,
+ 0, -1);
+ ret = relay_process_control_command(conn,
+ &state->header, &payload_view);
+ if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = connection_reset_protocol_state(conn);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
+end:
+ return status;
+}
+
+static enum relay_connection_status relay_process_control_receive_header(