+end_no_session:
+ return ret;
+}
+
+/*
+ * Initialize a data pending command. This means that a client is about to ask
+ * for data pending for each stream he/she holds. Simply iterate over all
+ * streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static
+int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+
+ assert(recv_hdr);
+ assert(cmd);
+
+ DBG("Init streams for data pending");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to set the begin data pending flag. For now, the
+ * streams are indexed by stream handle so we have to iterate over all
+ * streams to find the one associated with the right session_id.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
+ if (stream->session->id == session_id) {
+ stream->data_pending_check_done = 0;
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(LTTNG_OK);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay begin data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if each
+ * stream associated with it has its data_pending_check_done flag set. If not,
+ * this means that the client lost track of the stream but the data is still
+ * being streamed on our side. In this case, we inform the client that data is
+ * inflight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static
+int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+ uint32_t is_data_inflight = 0;
+
+ assert(recv_hdr);
+ assert(cmd);
+
+ DBG("End data pending command");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid end data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /* Iterate over all streams to see if the begin data pending flag is set. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
+ if (stream->session->id == session_id &&
+ !stream->data_pending_check_done) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay end data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive an index for a specific stream.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret, send_ret, index_created = 0;
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_index index_info;
+ struct relay_index *index, *wr_index = NULL;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t net_seq_num;
+
+ assert(cmd);
+
+ DBG("Relay receiving index");
+
+ if (!session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info,
+ sizeof(index_info), 0);
+ if (ret < sizeof(index_info)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid index struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ net_seq_num = be64toh(index_info.net_seq_num);
+
+ rcu_read_lock();
+ stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id));
+ if (!stream) {
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /* Live beacon handling */
+ if (index_info.packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already received data.
+ */
+ if (stream->total_index_received > 0) {
+ stream->beacon_ts_end = be64toh(index_info.timestamp_end);
+ }
+ ret = 0;
+ goto end_rcu_unlock;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
+ index = relay_index_find(stream->stream_handle, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream->stream_handle, net_seq_num);
+ if (!index) {
+ goto end_rcu_unlock;
+ }
+ index_created = 1;
+ }
+
+ copy_index_control_data(index, &index_info);
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created, set the data in this
+ * object and write it on disk.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ copy_index_control_data(wr_index, &index_info);
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ /* Starting at 2.4, create the index file if none available. */
+ if (cmd->minor >= 4 && stream->index_fd < 0) {
+ ret = index_create_file(stream->path_name, stream->channel_name,
+ relayd_uid, relayd_gid, stream->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ stream->index_fd = ret;
+ }
+
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ stream->total_index_received++;
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending close index id reply");
+ ret = send_ret;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(cmd);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(cmd);
+
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ uatomic_set(&cmd->session->new_streams, 1);
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;