X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=a3eef148810e3219d1978edeacfe1e2434d46690;hp=009621a6f7b1c4701a0857848346f29c66b8511a;hb=ad7051c0f2e1eb584c623794bf4b83548e3ccadc;hpb=beaad64cecee395058e37c8b33dc50af99d771a4 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 009621a6f..a3eef1488 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1460,19 +1460,51 @@ end_no_session: */ static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, struct lttng_ht *streams_ht) { int ret; + uint64_t stream_id; + struct relay_stream *stream; + struct lttng_ht_iter iter; + struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; DBG("Checking quiescent state on control socket"); + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < sizeof(msg)) { + ERR("Relay didn't receive valid begin data_pending struct size: %d", + ret); + ret = -1; + goto end_no_session; + } + + stream_id = be64toh(msg.stream_id); + + rcu_read_lock(); + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + if (stream->stream_handle == stream_id) { + stream->data_pending_check_done = 1; + DBG("Relay quiescent control pending flag set to %" PRIu64, + stream_id); + break; + } + } + rcu_read_unlock(); + reply.ret_code = htobe32(LTTNG_OK); ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (ret < 0) { ERR("Relay data quiescent control ret code failed"); } +end_no_session: return ret; } @@ -1643,7 +1675,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_data_pending(recv_hdr, cmd, streams_ht); break; case RELAYD_QUIESCENT_CONTROL: - ret = relay_quiescent_control(recv_hdr, cmd); + ret = relay_quiescent_control(recv_hdr, cmd, streams_ht); break; case RELAYD_BEGIN_DATA_PENDING: ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);