+ assert(recv_hdr);
+ assert(conn);
+
+ DBG("End data pending command");
+
+ if (!conn->session || conn->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid end data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /* Iterate over all streams to see if the begin data pending flag is set. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (stream->session_id == session_id &&
+ !stream->data_pending_check_done && !stream->terminated_flag) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ memset(&reply, 0, sizeof(reply));
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay end data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive an index for a specific stream.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret, index_created = 0;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_index index_info;
+ struct relay_index *index, *wr_index = NULL;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t net_seq_num;
+
+ assert(conn);
+
+ DBG("Relay receiving index");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
+ sizeof(index_info), 0);
+ if (ret < sizeof(index_info)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid index struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ net_seq_num = be64toh(index_info.net_seq_num);
+
+ rcu_read_lock();
+ stream = stream_find_by_id(relay_streams_ht,
+ be64toh(index_info.relay_stream_id));
+ if (!stream) {
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /* Live beacon handling */
+ if (index_info.packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
+ */
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
+ stream->beacon_ts_end = be64toh(index_info.timestamp_end);
+ }
+ ret = 0;
+ goto end_rcu_unlock;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
+ index = relay_index_find(stream->stream_handle, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream->stream_handle, net_seq_num);
+ if (!index) {
+ goto end_rcu_unlock;
+ }
+ index_created = 1;
+ stream->indexes_in_flight++;
+ }
+
+ copy_index_control_data(index, &index_info);
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = be64toh(index_info.stream_id);
+ }
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created, set the data in this
+ * object and write it on disk.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ copy_index_control_data(wr_index, &index_info);
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending close index id reply");
+ ret = send_ret;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(conn);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!conn->session || conn->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(conn);
+
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ if (conn->session->viewer_refcount) {
+ uatomic_set(&conn->session->new_streams, 1);
+ }
+
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Process the commands received on the control socket
+ */
+static
+int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret = 0;
+
+ switch (be32toh(recv_hdr->cmd)) {
+ case RELAYD_CREATE_SESSION:
+ ret = relay_create_session(recv_hdr, conn);
+ break;
+ case RELAYD_ADD_STREAM:
+ ret = relay_add_stream(recv_hdr, conn);
+ break;
+ case RELAYD_START_DATA:
+ ret = relay_start(recv_hdr, conn);
+ break;
+ case RELAYD_SEND_METADATA:
+ ret = relay_recv_metadata(recv_hdr, conn);
+ break;
+ case RELAYD_VERSION:
+ ret = relay_send_version(recv_hdr, conn);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ ret = relay_close_stream(recv_hdr, conn);
+ break;
+ case RELAYD_DATA_PENDING:
+ ret = relay_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ ret = relay_quiescent_control(recv_hdr, conn);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ ret = relay_begin_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ ret = relay_end_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_SEND_INDEX:
+ ret = relay_recv_index(recv_hdr, conn);
+ break;
+ case RELAYD_STREAMS_SENT:
+ ret = relay_streams_sent(recv_hdr, conn);
+ break;
+ case RELAYD_UPDATE_SYNC_INFO:
+ default:
+ ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
+ relay_unknown_command(conn);
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Handle index for a data stream.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+ int rotate_index)
+{
+ int ret = 0, index_created = 0;
+ uint64_t stream_id, data_offset;
+ struct relay_index *index, *wr_index = NULL;
+
+ assert(stream);
+
+ stream_id = stream->stream_handle;
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence number. If on
+ * exists, the control thread already received the data for it thus we need
+ * to write it on disk.
+ */
+ index = relay_index_find(stream_id, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream_id, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto error;
+ }
+ index_created = 1;
+ stream->indexes_in_flight++;
+ }
+
+ if (rotate_index || stream->index_fd < 0) {
+ index->to_close_fd = stream->index_fd;
+ ret = index_create_file(stream->path_name, stream->channel_name,
+ relayd_uid, relayd_gid, stream->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ /* This will close the stream's index fd if one. */
+ relay_index_free_safe(index);
+ goto error;
+ }
+ stream->index_fd = ret;
+ }
+ index->fd = stream->index_fd;
+ index->index_data.offset = data_offset;
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created and set the data.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ /* Copy back data from the created index. */
+ wr_index->fd = index->fd;
+ wr_index->to_close_fd = index->to_close_fd;
+ wr_index->index_data.offset = data_offset;
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
+ }
+
+error:
+ return ret;
+}
+
+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static
+int relay_process_data(struct relay_connection *conn)
+{
+ int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
+ struct relay_stream *stream;
+ struct lttcomm_relayd_data_hdr data_hdr;
+ uint64_t stream_id;
+ uint64_t net_seq_num;
+ uint32_t data_size;
+ struct relay_session *session;
+
+ assert(conn);
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ }