+/*
+ * Check for data pending for a given stream id from the session daemon.
+ */
+static
+int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret;
+ uint64_t last_net_seq_num, stream_id;
+
+ DBG("Data pending command received");
+
+ if (!session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid data_pending struct size : %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream_id = be64toh(msg.stream_id);
+ last_net_seq_num = be64toh(msg.last_net_seq_num);
+
+ rcu_read_lock();
+ stream = relay_stream_from_stream_id(stream_id, streams_ht);
+ if (stream == NULL) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
+ " and last_seq %" PRIu64, stream_id, stream->prev_seq,
+ last_net_seq_num);
+
+ /* Avoid wrapping issue */
+ if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) {
+ /* Data has in fact been written and is NOT pending */
+ ret = 0;
+ } else {
+ /* Data still being streamed thus pending */
+ ret = 1;
+ }
+
+ /* Pending check is now done. */
+ stream->data_pending_check_done = 1;
+
+end_unlock:
+ rcu_read_unlock();
+
+ reply.ret_code = htobe32(ret);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data pending ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Wait for the control socket to reach a quiescent state.
+ *
+ * Note that for now, when receiving this command from the session daemon, this
+ * means that every subsequent commands or data received on the control socket
+ * has been handled. So, this is why we simply return OK here.
+ */
+static
+int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ uint64_t stream_id;
+ struct relay_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_quiescent_control msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Checking quiescent state on control socket");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream_id = be64toh(msg.stream_id);
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->stream_handle == stream_id) {
+ stream->data_pending_check_done = 1;
+ DBG("Relay quiescent control pending flag set to %" PRIu64,
+ stream_id);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data quiescent control ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Initialize a data pending command. This means that a client is about to ask
+ * for data pending for each stream he/she holds. Simply iterate over all
+ * streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static
+int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("Init streams for data pending");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to set the begin data pending flag. For now, the
+ * streams are indexed by stream handle so we have to iterate over all
+ * streams to find the one associated with the right session_id.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id) {
+ stream->data_pending_check_done = 0;
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(LTTNG_OK);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay begin data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if each
+ * stream associated with it has its data_pending_check_done flag set. If not,
+ * this means that the client lost track of the stream but the data is still
+ * being streamed on our side. In this case, we inform the client that data is
+ * inflight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static
+int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+ uint32_t is_data_inflight = 0;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("End data pending command");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid end data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /* Iterate over all streams to see if the begin data pending flag is set. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id &&
+ !stream->data_pending_check_done) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay end data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+