return ret;
}
+/*
+ * When we have received all the streams and the metadata for a channel,
+ * we make them visible to the viewer threads.
+ */
+static
+void set_viewer_ready_flag(struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node, *tmp_node;
+
+ cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ stream = relay_stream_find_by_id(node->id);
+ if (!stream) {
+ /*
+ * Stream is most probably being cleaned up by the data thread thus
+ * simply continue to the next one.
+ */
+ rcu_read_unlock();
+ continue;
+ }
+
+ stream->viewer_ready = 1;
+ rcu_read_unlock();
+
+ /* Clean stream handle node. */
+ cds_list_del(&node->node);
+ free(node);
+ }
+
+ return;
+}
+
+/*
+ * Add a recv handle node to the connection recv list with the given stream
+ * handle. A new node is allocated thus must be freed when the node is deleted
+ * from the list.
+ */
+static void queue_stream_handle(uint64_t handle, struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node;
+
+ assert(cmd);
+
+ node = zmalloc(sizeof(*node));
+ if (!node) {
+ PERROR("zmalloc queue stream handle");
+ return;
+ }
+
+ node->id = handle;
+ cds_list_add(&node->node, &cmd->recv_head);
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
ctf_trace_assign(cmd->ctf_traces_ht, stream);
stream->ctf_traces_ht = cmd->ctf_traces_ht;
+ /*
+ * Add the stream handle in the recv list of the connection. Once the end
+ * stream message is received, this list is emptied and streams are set
+ * with the viewer ready flag.
+ */
+ if (stream->metadata_flag) {
+ stream->viewer_ready = 1;
+ } else {
+ queue_stream_handle(stream->stream_handle, cmd);
+ }
+
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_add_unique_ulong(relay_streams_ht,
return ret;
}
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(cmd);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(cmd);
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* Process the commands received on the control socket
*/
case RELAYD_SEND_INDEX:
ret = relay_recv_index(recv_hdr, cmd);
break;
+ case RELAYD_STREAMS_SENT:
+ ret = relay_streams_sent(recv_hdr, cmd);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
PERROR("read relay cmd pipe");
goto error_read;
}
+ CDS_INIT_LIST_HEAD(&relay_connection->recv_head);
/*
* Only used by the control side and the reference is copied inside each
assert(!ret);
if (relay_connection->type == RELAY_CONTROL) {
+ struct relay_stream_recv_handle *node, *tmp_node;
+
relay_delete_session(relay_connection, sessions_ht);
lttng_ht_destroy(relay_connection->ctf_traces_ht);
+
+ /* Clean up recv list. */
+ cds_list_for_each_entry_safe(node, tmp_node,
+ &relay_connection->recv_head, node) {
+ cds_list_del(&node->node);
+ free(node);
+ }
}
call_rcu(&relay_connection->rcu_node, deferred_free_connection);