+ ssize_t send_ret;
+ int ret;
+ uint64_t stream_seq;
+
+ DBG("Data pending command received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (payload->size < sizeof(msg)) {
+ ERR("Unexpected payload size in \"relay_data_pending\": expected >= %zu bytes, got %zu bytes",
+ sizeof(msg), payload->size);
+ ret = -1;
+ goto end_no_session;
+ }
+ memcpy(&msg, payload->data, sizeof(msg));
+ msg.stream_id = be64toh(msg.stream_id);
+ msg.last_net_seq_num = be64toh(msg.last_net_seq_num);
+
+ stream = stream_get_by_id(msg.stream_id);
+ if (stream == NULL) {
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+
+ if (session_streams_have_index(session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_data_seq;
+ }
+ DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
+ ", prev_index_seq %" PRIu64
+ ", and last_seq %" PRIu64, msg.stream_id,
+ stream->prev_data_seq, stream->prev_index_seq,
+ msg.last_net_seq_num);
+
+ /* Avoid wrapping issue */
+ if (((int64_t) (stream_seq - msg.last_net_seq_num)) >= 0) {
+ /* Data has in fact been written and is NOT pending */
+ ret = 0;
+ } else {
+ /* Data still being streamed thus pending */
+ ret = 1;
+ }
+
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
+
+ stream_put(stream);
+end:
+
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(ret);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"data pending\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Wait for the control socket to reach a quiescent state.
+ *
+ * Note that for now, when receiving this command from the session
+ * daemon, this means that every subsequent commands or data received on
+ * the control socket has been handled. So, this is why we simply return
+ * OK here.
+ */
+static int relay_quiescent_control(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_stream *stream;
+ struct lttcomm_relayd_quiescent_control msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Checking quiescent state on control socket");
+
+ if (!conn->session || !conn->version_check_done) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (payload->size < sizeof(msg)) {
+ ERR("Unexpected payload size in \"relay_quiescent_control\": expected >= %zu bytes, got %zu bytes",
+ sizeof(msg), payload->size);
+ ret = -1;
+ goto end_no_session;
+ }
+ memcpy(&msg, payload->data, sizeof(msg));
+ msg.stream_id = be64toh(msg.stream_id);
+
+ stream = stream_get_by_id(msg.stream_id);
+ if (!stream) {
+ goto reply;
+ }
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
+
+ DBG("Relay quiescent control pending flag set to %" PRIu64, msg.stream_id);
+ stream_put(stream);
+reply:
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"quiescent control\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ } else {
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Initialize a data pending command. This means that a consumer is about
+ * to ask for data pending for each stream it holds. Simply iterate over
+ * all streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+
+ assert(recv_hdr);
+ assert(conn);
+
+ DBG("Init streams for data pending");
+
+ if (!conn->session || !conn->version_check_done) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (payload->size < sizeof(msg)) {
+ ERR("Unexpected payload size in \"relay_begin_data_pending\": expected >= %zu bytes, got %zu bytes",
+ sizeof(msg), payload->size);
+ ret = -1;
+ goto end_no_session;
+ }
+ memcpy(&msg, payload->data, sizeof(msg));
+ msg.session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to set the begin data pending flag.
+ * For now, the streams are indexed by stream handle so we have
+ * to iterate over all streams to find the one associated with
+ * the right session_id.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id == msg.session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+ stream_put(stream);
+ }
+ rcu_read_unlock();
+
+ memset(&reply, 0, sizeof(reply));
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(LTTNG_OK);
+
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"begin data pending\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ } else {
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if
+ * each stream associated with it has its data_pending_check_done flag
+ * set. If not, this means that the client lost track of the stream but
+ * the data is still being streamed on our side. In this case, we inform
+ * the client that data is in flight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint32_t is_data_inflight = 0;
+
+ DBG("End data pending command");
+
+ if (!conn->session || !conn->version_check_done) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (payload->size < sizeof(msg)) {
+ ERR("Unexpected payload size in \"relay_end_data_pending\": expected >= %zu bytes, got %zu bytes",
+ sizeof(msg), payload->size);
+ ret = -1;
+ goto end_no_session;
+ }
+ memcpy(&msg, payload->data, sizeof(msg));
+ msg.session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to see if the begin data pending
+ * flag is set.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id != msg.session_id) {
+ stream_put(stream);
+ continue;
+ }
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ uint64_t stream_seq;
+
+ if (session_streams_have_index(conn->session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_data_seq;
+ }
+ if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ }
+ rcu_read_unlock();
+
+ memset(&reply, 0, sizeof(reply));
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"end data pending\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ } else {
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive an index for a specific stream.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_index index_info;
+ struct relay_index *index;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ size_t msg_len;
+
+ assert(conn);
+
+ DBG("Relay receiving index");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ msg_len = lttcomm_relayd_index_len(
+ lttng_to_index_major(conn->major, conn->minor),
+ lttng_to_index_minor(conn->major, conn->minor));
+ if (payload->size < msg_len) {
+ ERR("Unexpected payload size in \"relay_recv_index\": expected >= %zu bytes, got %zu bytes",
+ msg_len, payload->size);
+ ret = -1;
+ goto end_no_session;
+ }
+ memcpy(&index_info, payload->data, msg_len);
+ index_info.relay_stream_id = be64toh(index_info.relay_stream_id);
+ index_info.net_seq_num = be64toh(index_info.net_seq_num);
+ index_info.packet_size = be64toh(index_info.packet_size);
+ index_info.content_size = be64toh(index_info.content_size);
+ index_info.timestamp_begin = be64toh(index_info.timestamp_begin);
+ index_info.timestamp_end = be64toh(index_info.timestamp_end);
+ index_info.events_discarded = be64toh(index_info.events_discarded);
+ index_info.stream_id = be64toh(index_info.stream_id);
+
+ if (conn->minor >= 8) {
+ index_info.stream_instance_id =
+ be64toh(index_info.stream_instance_id);
+ index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
+ }
+
+ stream = stream_get_by_id(index_info.relay_stream_id);
+ if (!stream) {
+ ERR("stream_get_by_id not found");
+ ret = -1;
+ goto end;
+ }
+ pthread_mutex_lock(&stream->lock);
+
+ /* Live beacon handling */
+ if (index_info.packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64,
+ stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already
+ * received data and no indexes are in flight.
+ */
+ if (stream->index_received_seqcount > 0
+ && stream->indexes_in_flight == 0) {
+ stream->beacon_ts_end = index_info.timestamp_end;
+ }
+ ret = 0;
+ goto end_stream_put;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = index_info.stream_id;
+ }
+ index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num);
+ if (!index) {
+ ret = -1;
+ ERR("relay_index_get_by_id_or_create index NULL");
+ goto end_stream_put;
+ }
+ if (set_index_control_data(index, &index_info, conn)) {
+ ERR("set_index_control_data error");
+ relay_index_put(index);
+ ret = -1;
+ goto end_stream_put;
+ }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
+ stream->pos_after_last_complete_data_index += index->total_size;
+ stream->prev_index_seq = index_info.net_seq_num;
+
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end_stream_put;
+ }
+ } else if (ret > 0) {
+ /* no flush. */
+ ret = 0;
+ } else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
+ ret = -1;
+ }
+
+end_stream_put:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+
+end:
+
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret);
+ ret = -1;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int relay_streams_sent(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(conn);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!conn->session || !conn->version_check_done) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Publish every pending stream in the connection recv list which are
+ * now ready to be used by the viewer.
+ */
+ publish_connection_local_streams(conn);
+
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"streams sent\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * relay_rotate_session_stream: rotate a stream to a new tracefile for the session
+ * rotation feature (not the tracefile rotation feature).
+ */
+static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ size_t header_len;
+ size_t path_len;
+ struct lttng_buffer_view new_path_view;
+
+ DBG("Rotate stream received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to rotate a stream before version check");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ if (session->major == 2 && session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ header_len = sizeof(struct lttcomm_relayd_rotate_stream);
+
+ if (payload->size < header_len) {
+ ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
+ header_len, payload->size);
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ memcpy(&stream_info, payload->data, header_len);
+
+ /* Convert to host */
+ stream_info.pathname_length = be32toh(stream_info.pathname_length);
+ stream_info.stream_id = be64toh(stream_info.stream_id);
+ stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id);
+ stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+
+ path_len = stream_info.pathname_length;
+ if (payload->size < header_len + path_len) {
+ ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes",
+ header_len + path_len, payload->size);
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ /* Ensure it fits in local filename length. */
+ if (path_len >= LTTNG_PATH_MAX) {
+ ret = -ENAMETOOLONG;
+ ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
+ path_len, LTTNG_PATH_MAX);
+ goto end;
+ }
+
+ new_path_view = lttng_buffer_view_from_view(payload, header_len,
+ stream_info.pathname_length);
+
+ stream = stream_get_by_id(stream_info.stream_id);
+ if (!stream) {
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * Update the trace path (just the folder, the stream name does not
+ * change).
+ */
+ free(stream->prev_path_name);
+ stream->prev_path_name = stream->path_name;
+ stream->path_name = create_output_path(new_path_view.data);
+ if (!stream->path_name) {
+ ERR("Failed to create a new output path");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+ -1, -1);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
+
+ if (stream->is_metadata) {
+ /*
+ * Metadata streams have no index; consider its rotation
+ * complete.
+ */
+ stream->index_rotated = true;
+ /*
+ * The metadata stream is sent only over the control connection
+ * so we know we have all the data to perform the stream
+ * rotation.
+ */
+ ret = do_rotate_stream_data(stream);
+ } else {
+ stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end_stream_unlock;
+ }
+
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end_stream_unlock;
+ }
+ }
+
+end_stream_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"rotate session stream\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_reply:
+ return ret;
+}
+
+/*
+ * relay_mkdir: Create a folder on the disk.
+ */
+static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{