Fix: flag metadata stream on quiescent control cmd
authorDavid Goulet <dgoulet@efficios.com>
Tue, 18 Dec 2012 19:02:14 +0000 (14:02 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Tue, 18 Dec 2012 19:22:24 +0000 (14:22 -0500)
For the relayd, when doing a quiescent control command, we have to flag
the corresponding metadata stream or else it will simply stay alive
until a close stream and always returning that data is inflight at the
end data pending command.

Add a stream id to the relayd command so the relayd can identify which
stream to flag.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/main.c
src/common/consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h

index 009621a6f7b1c4701a0857848346f29c66b8511a..a3eef148810e3219d1978edeacfe1e2434d46690 100644 (file)
@@ -1460,19 +1460,51 @@ end_no_session:
  */
 static
 int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
-               struct relay_command *cmd)
+               struct relay_command *cmd, struct lttng_ht *streams_ht)
 {
        int ret;
+       uint64_t stream_id;
+       struct relay_stream *stream;
+       struct lttng_ht_iter iter;
+       struct lttcomm_relayd_quiescent_control msg;
        struct lttcomm_relayd_generic_reply reply;
 
        DBG("Checking quiescent state on control socket");
 
+       if (!cmd->session || cmd->version_check_done == 0) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+       if (ret < sizeof(msg)) {
+               ERR("Relay didn't receive valid begin data_pending struct size: %d",
+                               ret);
+               ret = -1;
+               goto end_no_session;
+       }
+
+       stream_id = be64toh(msg.stream_id);
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+               if (stream->stream_handle == stream_id) {
+                       stream->data_pending_check_done = 1;
+                       DBG("Relay quiescent control pending flag set to %" PRIu64,
+                                       stream_id);
+                       break;
+               }
+       }
+       rcu_read_unlock();
+
        reply.ret_code = htobe32(LTTNG_OK);
        ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
        if (ret < 0) {
                ERR("Relay data quiescent control ret code failed");
        }
 
+end_no_session:
        return ret;
 }
 
@@ -1643,7 +1675,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
                ret = relay_data_pending(recv_hdr, cmd, streams_ht);
                break;
        case RELAYD_QUIESCENT_CONTROL:
-               ret = relay_quiescent_control(recv_hdr, cmd);
+               ret = relay_quiescent_control(recv_hdr, cmd, streams_ht);
                break;
        case RELAYD_BEGIN_DATA_PENDING:
                ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
index af99333c8d2ae1bacc73f7f4af5fcd231e79e104..c74d1f7ae3cbded46415430ca20054c8686de416 100644 (file)
@@ -3006,7 +3006,8 @@ int consumer_data_pending(uint64_t id)
                if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
-                               ret = relayd_quiescent_control(&relayd->control_sock);
+                               ret = relayd_quiescent_control(&relayd->control_sock,
+                                               stream->relayd_stream_id);
                        } else {
                                ret = relayd_data_pending(&relayd->control_sock,
                                                stream->relayd_stream_id, stream->next_net_seq_num);
index 2ec6ef2db39aebac9451f3b560884bfb0bde5f66..c999aafbb712472d4df4ebccc1398abe0c861fc0 100644 (file)
@@ -466,9 +466,11 @@ error:
 /*
  * Check on the relayd side for a quiescent state on the control socket.
  */
-int relayd_quiescent_control(struct lttcomm_sock *sock)
+int relayd_quiescent_control(struct lttcomm_sock *sock,
+               uint64_t metadata_stream_id)
 {
        int ret;
+       struct lttcomm_relayd_quiescent_control msg;
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
@@ -476,8 +478,10 @@ int relayd_quiescent_control(struct lttcomm_sock *sock)
 
        DBG("Relayd checking quiescent control state");
 
+       msg.stream_id = htobe64(metadata_stream_id);
+
        /* Send command */
-       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
+       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
index 2a46cd3eb117f3b4db23f308260953dfe97943e0..fd0acfb3583d1c4b10b80f0e45649ca00271a7b3 100644 (file)
@@ -38,7 +38,8 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size);
 int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
                uint64_t last_net_seq_num);
-int relayd_quiescent_control(struct lttcomm_sock *sock);
+int relayd_quiescent_control(struct lttcomm_sock *sock,
+               uint64_t metadata_stream_id);
 int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id);
 int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
                unsigned int *is_data_inflight);
index 7a89c3127920e3eb7db12a187446ea3a9f3eccaa..0a8f53b4ac7d40c46215637403edda5daa5c1551 100644 (file)
@@ -133,4 +133,8 @@ struct lttcomm_relayd_end_data_pending {
        uint64_t session_id;
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_quiescent_control {
+       uint64_t stream_id;
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
This page took 0.043162 seconds and 4 git commands to generate.