X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=45b7de8c657f9b43a426057fc07f7a299689dc75;hp=e8283f037e43a3a9429b57ec0e606a44097192f2;hb=4f5fb4c3d8752aae822ed0066784cc77e6f0f508;hpb=d3ecc5503007bc81faa8049fac945f163b6356f3 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e8283f037..45b7de8c6 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2789,6 +2789,112 @@ end_no_reply: return ret; } +/* + * Check if all the streams in the session have completed the last rotation. + * The chunk_id value is used to distinguish the cases where a stream was + * closed on the consumerd before the rotation started but it still active on + * the relayd, and the case where a stream appeared on the consumerd/relayd + * after the last rotation started (in that case, it is already writing in the + * new chunk folder). + */ +static +int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + struct relay_session *session = conn->session; + struct lttcomm_relayd_rotate_pending msg; + struct lttcomm_relayd_generic_reply reply; + struct lttng_ht_iter iter; + struct relay_stream *stream; + int ret = 0; + ssize_t network_ret; + uint64_t chunk_id; + bool rotate_pending = false; + + DBG("Rotate pending command received"); + + if (!session || !conn->version_check_done) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_reply; + } + + if (session->major == 2 && session->minor < 11) { + ERR("Unsupported feature before 2.11"); + ret = -1; + goto end_no_reply; + } + + network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0); + if (network_ret < (ssize_t) sizeof(msg)) { + if (network_ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Relay didn't receive valid rotate_pending struct size : %zi", + network_ret); + } + ret = -1; + goto end_no_reply; + } + + chunk_id = be64toh(msg.chunk_id); + DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + + /* + * Iterate over all the streams in the session and check if they are + * still waiting for data to perform their rotation. + */ + rcu_read_lock(); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + node.node) { + if (!stream_get(stream)) { + continue; + } + if (stream->trace->session != session) { + stream_put(stream); + continue; + } + pthread_mutex_lock(&stream->lock); + if (stream->rotate_at_seq_num != -1ULL) { + /* We have not yet performed the rotation. */ + rotate_pending = true; + DBG("Stream %" PRIu64 " is still rotating", + stream->stream_handle); + } else if (stream->chunk_id < chunk_id) { + /* + * Stream closed on the consumer but still active on the + * relay. + */ + rotate_pending = true; + DBG("Stream %" PRIu64 " did not exist on the consumer " + "when the last rotation started, but is" + "still waiting for data before getting" + "closed", + stream->stream_handle); + } + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + if (rotate_pending) { + goto send_reply; + } + } + +send_reply: + rcu_read_unlock(); + memset(&reply, 0, sizeof(reply)); + reply.ret_code = htobe32(rotate_pending ? 1 : 0); + network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(reply), 0); + if (network_ret < (ssize_t) sizeof(reply)) { + ERR("Relay rotate pending ret code failed"); + ret = -1; + } + +end_no_reply: + return ret; +} + /* * Process the commands received on the control socket */ @@ -2843,6 +2949,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_ROTATE_RENAME: ret = relay_rotate_rename(recv_hdr, conn); break; + case RELAYD_ROTATE_PENDING: + ret = relay_rotate_pending(recv_hdr, conn); + break; case RELAYD_MKDIR: ret = relay_mkdir(recv_hdr, conn); break;