-end:
- return ret;
-}
-
-/*
- * Check if a stream should perform a rotation (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream(struct relay_stream *stream)
-{
- int ret = 0;
-
- /* No rotation expected. */
- if (stream->rotate_at_seq_num == -1ULL) {
- goto end;
- }
-
- if (stream->prev_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " no yet ready for rotation",
- stream->stream_handle);
- goto end;
- } else if (stream->prev_seq > stream->rotate_at_seq_num) {
- DBG("Rotation after too much data has been written in tracefile "
- "for stream %" PRIu64 ", need to truncate before "
- "rotating", stream->stream_handle);
- ret = rotate_truncate_stream(stream);
- if (ret) {
- ERR("Failed to truncate stream");
- goto end;
- }
- } else {
- /* stream->prev_seq == stream->rotate_at_seq_num */
- DBG("Stream %" PRIu64 " ready for rotation",
- stream->stream_handle);
- ret = do_rotate_stream(stream);
- }
-
-end:
- return ret;
-}
-
-/*
- * relay_recv_metadata: receive the metadata for the session.
- */
-static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_connection *conn)
-{
- int ret = 0;
- ssize_t size_ret;
- struct relay_session *session = conn->session;
- struct lttcomm_relayd_metadata_payload *metadata_struct;
- struct relay_stream *metadata_stream;
- uint64_t data_size, payload_size;
-
- if (!session) {
- ERR("Metadata sent before version check");
- ret = -1;
- goto end;
- }
-
- data_size = payload_size = be64toh(recv_hdr->data_size);
- if (data_size < sizeof(struct lttcomm_relayd_metadata_payload)) {
- ERR("Incorrect data size");
- ret = -1;
- goto end;
- }
- payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
-
- if (data_buffer_size < data_size) {
- /* In case the realloc fails, we can free the memory */
- char *tmp_data_ptr;
-
- tmp_data_ptr = realloc(data_buffer, data_size);
- if (!tmp_data_ptr) {
- ERR("Allocating data buffer");
- free(data_buffer);
- ret = -1;
- goto end;
- }
- data_buffer = tmp_data_ptr;
- data_buffer_size = data_size;
- }
- memset(data_buffer, 0, data_size);
- DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
- size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (size_ret < 0 || size_ret != data_size) {
- if (size_ret == 0) {
- /* Orderly shutdown. Not necessary to print an error. */
- DBG("Socket %d did an orderly shutdown", conn->sock->fd);
- } else {
- ERR("Relay didn't receive the whole metadata");
- }
- ret = -1;
- goto end;
- }
- metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
-
- metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
- if (!metadata_stream) {
- ret = -1;
- goto end;
- }
-
- pthread_mutex_lock(&metadata_stream->lock);
-
- size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
- payload_size);
- if (size_ret < payload_size) {
- ERR("Relay error writing metadata on file");
- ret = -1;
- goto end_put;
- }
-
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
- be32toh(metadata_struct->padding_size));
- if (size_ret < 0) {
- goto end_put;
- }
-
- metadata_stream->metadata_received +=
- payload_size + be32toh(metadata_struct->padding_size);
- DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
- metadata_stream->metadata_received);
-
- ret = try_rotate_stream(metadata_stream);
- if (ret < 0) {
- goto end_put;
- }
-
-end_put:
- pthread_mutex_unlock(&metadata_stream->lock);
- stream_put(metadata_stream);
-end:
- return ret;
-}
-
-/*
- * relay_send_version: send relayd version number
- */
-static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_connection *conn)
-{
- int ret;
- struct lttcomm_relayd_version reply, msg;
- bool compatible = true;
-
- conn->version_check_done = 1;
-
- /* Get version from the other side. */
- ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
- if (ret < 0 || ret != sizeof(msg)) {
- if (ret == 0) {
- /* Orderly shutdown. Not necessary to print an error. */
- DBG("Socket %d did an orderly shutdown", conn->sock->fd);
- } else {
- ERR("Relay failed to receive the version values.");
- }
- ret = -1;
- goto end;
- }
-
- memset(&reply, 0, sizeof(reply));
- reply.major = RELAYD_VERSION_COMM_MAJOR;
- reply.minor = RELAYD_VERSION_COMM_MINOR;
-
- /* Major versions must be the same */
- if (reply.major != be32toh(msg.major)) {
- DBG("Incompatible major versions (%u vs %u), deleting session",
- reply.major, be32toh(msg.major));
- compatible = false;
- }
-
- conn->major = reply.major;
- /* We adapt to the lowest compatible version */
- if (reply.minor <= be32toh(msg.minor)) {
- conn->minor = reply.minor;
- } else {
- conn->minor = be32toh(msg.minor);
- }
-
- reply.major = htobe32(reply.major);
- reply.minor = htobe32(reply.minor);
- ret = conn->sock->ops->sendmsg(conn->sock, &reply,
- sizeof(struct lttcomm_relayd_version), 0);
- if (ret < 0) {
- ERR("Relay sending version");
- }
-
- if (!compatible) {
- ret = -1;
- goto end;
- }
-
- DBG("Version check done using protocol %u.%u", conn->major,
- conn->minor);
-
-end:
- return ret;
-}
-
-/*
- * Check for data pending for a given stream id from the session daemon.
- */
-static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_connection *conn)
-{
- struct relay_session *session = conn->session;
- struct lttcomm_relayd_data_pending msg;
- struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
- int ret;
- uint64_t last_net_seq_num, stream_id;
-
- DBG("Data pending command received");
-
- if (!session || conn->version_check_done == 0) {
- ERR("Trying to check for data before version check");
- ret = -1;
- goto end_no_session;
- }
-
- ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
- if (ret < sizeof(msg)) {
- if (ret == 0) {
- /* Orderly shutdown. Not necessary to print an error. */
- DBG("Socket %d did an orderly shutdown", conn->sock->fd);
- } else {
- ERR("Relay didn't receive valid data_pending struct size : %d",
- ret);
- }
- ret = -1;
- goto end_no_session;
- }
-
- stream_id = be64toh(msg.stream_id);
- last_net_seq_num = be64toh(msg.last_net_seq_num);
-
- stream = stream_get_by_id(stream_id);
- if (stream == NULL) {
- ret = -1;
- goto end;
- }
-
- pthread_mutex_lock(&stream->lock);
-
- DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
- " and last_seq %" PRIu64, stream_id, stream->prev_seq,
- last_net_seq_num);
-
- /* Avoid wrapping issue */
- if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) {
- /* Data has in fact been written and is NOT pending */
- ret = 0;
- } else {
- /* Data still being streamed thus pending */
- ret = 1;
- }
-
- stream->data_pending_check_done = true;
- pthread_mutex_unlock(&stream->lock);
-
- stream_put(stream);