+/*
+ * relay_close_stream: close a specific stream
+ */
+static
+int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_close_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret, send_ret;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+
+ DBG("Close stream received");
+
+ if (!session || session->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
+ sizeof(struct lttcomm_relayd_close_stream), MSG_WAITALL);
+ if (ret < sizeof(struct lttcomm_relayd_close_stream)) {
+ ERR("Relay didn't receive valid add_stream struct size : %d", ret);
+ ret = -1;
+ goto end_no_session;
+ }
+
+ rcu_read_lock();
+ lttng_ht_lookup(streams_ht,
+ (void *)((unsigned long) be64toh(stream_info.stream_id)),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay stream %" PRIu64 " not found", be64toh(stream_info.stream_id));
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (!stream) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ stream->close_flag = 1;
+
+ if (close_stream_check(stream)) {
+ int delret;
+
+ close(stream->fd);
+ delret = lttng_ht_del(streams_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+ }
+
+end_unlock:
+ rcu_read_unlock();
+
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending stream id");
+ }
+
+end_no_session:
+ return ret;
+}
+