From e2acc8d25bc0073554a2f3e742e070b7034ec1d0 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 14 Dec 2017 11:00:30 -0500 Subject: [PATCH] Implement the RELAYD_ROTATE_PENDING relay daemon command MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This command allows the sessiond to check if a rotation is complete from the relayd point of view. There can be a significant delay between the time the consumer has finished extracting the data from the buffers and the time the relay has finished writing them on disk, and we can only inform the user that the rotation is complete when all the data is on disk. So the RELAYD_ROTATE_PENDING command is used to poll the relayd after the consumer has finished extracting the data until everything is on the relayd disk. This command also takes care of streams that did not exist on the consumer when the rotation started, or streams that appeared after the last rotation started. The chunk_id field is used to distinguish those cases. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/main.c | 109 +++++++++++++++++++++++ src/common/sessiond-comm/relayd.h | 4 + src/common/sessiond-comm/sessiond-comm.h | 2 + 3 files changed, 115 insertions(+) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e8283f037..a57b1d4b0 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2789,6 +2789,112 @@ end_no_reply: return ret; } +/* + * Check if all the streams in the session have completed the last rotation. + * The chunk_id value is used to distinguish the cases where a stream was + * closed on the consumerd before the rotation started but it still active on + * the relayd, and the case where a stream appeared on the consumerd/relayd + * after the last rotation started (in that case, it is already writing in the + * new chunk folder). + */ +static +int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + struct relay_session *session = conn->session; + struct lttcomm_relayd_rotate_pending msg; + struct lttcomm_relayd_generic_reply reply; + struct lttng_ht_iter iter; + struct relay_stream *stream; + int ret; + ssize_t network_ret; + uint64_t chunk_id; + bool rotate_pending = false; + + DBG("Rotate pending command received"); + + if (!session || !conn->version_check_done) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Unsupported feature before 2.11"); + ret = -1; + goto end_no_reply; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); + if (network_ret < (ssize_t) sizeof(msg)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Relay didn't receive valid rotate_pending struct size : %zi", + network_ret); + } + ret = -1; + goto end_no_reply; + } + + chunk_id = be64toh(msg.chunk_id); + DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + + /* + * Iterate over all the streams in the session and check if they are + * still waiting for data to perform their rotation. + */ + rcu_read_lock(); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + node.node) { + if (!stream_get(stream)) { + continue; + } + if (stream->trace->session != session) { + stream_put(stream); + continue; + } + pthread_mutex_lock(&stream->lock); + if (stream->rotate_at_seq_num != -1ULL) { + /* We have not yet performed the rotation. */ + rotate_pending = true; + DBG("Stream %" PRIu64 " is still rotating", + stream->stream_handle); + } else if (stream->chunk_id < chunk_id) { + /* + * Stream closed on the consumer but still active on the + * relay. + */ + rotate_pending = true; + DBG("Stream %" PRIu64 " did not exist on the consumer " + "when the last rotation started, but is" + "still waiting for data before getting" + "closed", + stream->stream_handle); + } + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + if (rotate_pending) { + goto send_reply; + } + } + +send_reply: + rcu_read_unlock(); + memset(&reply, 0, sizeof(reply)); + reply.ret_code = htobe32(rotate_pending ? 1 : 0); + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(reply), 0); + if (network_ret < (ssize_t) sizeof(reply)) { + ERR("Relay rotate pending ret code failed"); + ret = -1; + } + +end_no_reply: + return ret; +} + /* * Process the commands received on the control socket */ @@ -2843,6 +2949,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_ROTATE_RENAME: ret = relay_rotate_rename(recv_hdr, conn); break; + case RELAYD_ROTATE_PENDING: + ret = relay_rotate_pending(recv_hdr, conn); + break; case RELAYD_MKDIR: ret = relay_mkdir(recv_hdr, conn); break; diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 444c43af0..9a733df1e 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -212,6 +212,10 @@ struct lttcomm_relayd_rotate_rename { char paths[]; } LTTNG_PACKED; +struct lttcomm_relayd_rotate_pending { + uint64_t chunk_id; +} LTTNG_PACKED; + struct lttcomm_relayd_mkdir { /* Includes trailing NULL */ uint32_t length; diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 78c7f5e2b..22d04770f 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -127,6 +127,8 @@ enum lttcomm_relayd_command { RELAYD_ROTATE_STREAM = 18, /* Rename a chunk after the rotation is completed (2.11+) */ RELAYD_ROTATE_RENAME = 19, + /* Check if a chunk has data pending (2.11+) */ + RELAYD_ROTATE_PENDING = 20, /* Create a folder on the relayd FS (2.11+) */ RELAYD_MKDIR = 21, }; -- 2.34.1