From ad7051c0f2e1eb584c623794bf4b83548e3ccadc Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 18 Dec 2012 14:02:14 -0500 Subject: [PATCH 1/1] Fix: flag metadata stream on quiescent control cmd For the relayd, when doing a quiescent control command, we have to flag the corresponding metadata stream or else it will simply stay alive until a close stream and always returning that data is inflight at the end data pending command. Add a stream id to the relayd command so the relayd can identify which stream to flag. Signed-off-by: David Goulet --- src/bin/lttng-relayd/main.c | 36 +++++++++++++++++++++++++++++-- src/common/consumer.c | 3 ++- src/common/relayd/relayd.c | 8 +++++-- src/common/relayd/relayd.h | 3 ++- src/common/sessiond-comm/relayd.h | 4 ++++ 5 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 009621a6f..a3eef1488 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1460,19 +1460,51 @@ end_no_session: */ static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, struct lttng_ht *streams_ht) { int ret; + uint64_t stream_id; + struct relay_stream *stream; + struct lttng_ht_iter iter; + struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; DBG("Checking quiescent state on control socket"); + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < sizeof(msg)) { + ERR("Relay didn't receive valid begin data_pending struct size: %d", + ret); + ret = -1; + goto end_no_session; + } + + stream_id = be64toh(msg.stream_id); + + rcu_read_lock(); + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + if (stream->stream_handle == stream_id) { + stream->data_pending_check_done = 1; + DBG("Relay quiescent control pending flag set to %" PRIu64, + stream_id); + break; + } + } + rcu_read_unlock(); + reply.ret_code = htobe32(LTTNG_OK); ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); if (ret < 0) { ERR("Relay data quiescent control ret code failed"); } +end_no_session: return ret; } @@ -1643,7 +1675,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_data_pending(recv_hdr, cmd, streams_ht); break; case RELAYD_QUIESCENT_CONTROL: - ret = relay_quiescent_control(recv_hdr, cmd); + ret = relay_quiescent_control(recv_hdr, cmd, streams_ht); break; case RELAYD_BEGIN_DATA_PENDING: ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht); diff --git a/src/common/consumer.c b/src/common/consumer.c index af99333c8..c74d1f7ae 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -3006,7 +3006,8 @@ int consumer_data_pending(uint64_t id) if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { - ret = relayd_quiescent_control(&relayd->control_sock); + ret = relayd_quiescent_control(&relayd->control_sock, + stream->relayd_stream_id); } else { ret = relayd_data_pending(&relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 2ec6ef2db..c999aafbb 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -466,9 +466,11 @@ error: /* * Check on the relayd side for a quiescent state on the control socket. */ -int relayd_quiescent_control(struct lttcomm_sock *sock) +int relayd_quiescent_control(struct lttcomm_sock *sock, + uint64_t metadata_stream_id) { int ret; + struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ @@ -476,8 +478,10 @@ int relayd_quiescent_control(struct lttcomm_sock *sock) DBG("Relayd checking quiescent control state"); + msg.stream_id = htobe64(metadata_stream_id); + /* Send command */ - ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0); + ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0); if (ret < 0) { goto error; } diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 2a46cd3eb..fd0acfb35 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -38,7 +38,8 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock, struct lttcomm_relayd_data_hdr *hdr, size_t size); int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); -int relayd_quiescent_control(struct lttcomm_sock *sock); +int relayd_quiescent_control(struct lttcomm_sock *sock, + uint64_t metadata_stream_id); int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id); int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, unsigned int *is_data_inflight); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 7a89c3127..0a8f53b4a 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -133,4 +133,8 @@ struct lttcomm_relayd_end_data_pending { uint64_t session_id; } LTTNG_PACKED; +struct lttcomm_relayd_quiescent_control { + uint64_t stream_id; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ -- 2.34.1