diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala index 44d425bc13..6e406d1e56 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala @@ -84,11 +84,14 @@ class ElasticUnifiedLog(_logStartOffset: Long, rst } - def tryCheckpoint(): Unit = { + def tryCheckpoint(): Boolean = { if (checkpointIntervalBytes > 0) { checkpointIntervalBytes = 0 lastCheckpointTimestamp = time.milliseconds() checkpoint() + true + } else { + false } } @@ -306,7 +309,10 @@ object ElasticUnifiedLog extends Logging { DirtyBytes.reset() for (log <- Logs.values().asScala) { try { - log.tryCheckpoint() + if (log.tryCheckpoint()) { + // sleep a while to avoid too many checkpoint at the same time, which may cause high append latency + Thread.sleep(10) + } } catch { case e: Throwable => error("Error while checkpoint", e) }