diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java index 4d18c9f59d..0fc015172f 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/AutoMQIdentityReplicationPolicy.java @@ -33,7 +33,6 @@ public String offsetSyncsTopic(String clusterAlias) { if (offsetSyncsTopic == null) { return super.offsetSyncsTopic(clusterAlias); } - log.info("Using offset syncs topic: {}", offsetSyncsTopic); return offsetSyncsTopic; } @@ -43,7 +42,6 @@ public String checkpointsTopic(String clusterAlias) { if (checkpointsTopic == null) { return super.checkpointsTopic(clusterAlias); } - log.info("Using checkpoints topic: {}", checkpointsTopic); return checkpointsTopic; } @@ -53,7 +51,22 @@ public String heartbeatsTopic() { if (heartbeatsTopic == null) { return super.heartbeatsTopic(); } - log.info("Using heartbeats topic: {}", heartbeatsTopic); return heartbeatsTopic; } + + @Override + public boolean isCheckpointsTopic(String topic) { + String checkpointsTopic = System.getenv(CHECKPOINTS_TOPIC_ENV_KEY); + return super.isCheckpointsTopic(topic) || topic.equals(checkpointsTopic); + } + + @Override + public boolean isHeartbeatsTopic(String topic) { + return super.isHeartbeatsTopic(topic) || topic.equals(heartbeatsTopic()); + } + + @Override + public boolean isMM2InternalTopic(String topic) { + return super.isMM2InternalTopic(topic) || isHeartbeatsTopic(topic) || isCheckpointsTopic(topic); + } }