From cab9e191da0a18677ea5b4a0a5f47f430a2637b2 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:10:38 +0800 Subject: [PATCH] perf(log): avoid too many checkpoint at the same time (#2129) Signed-off-by: Ning Yu --- .../kafka/log/streamaspect/ElasticUnifiedLog.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala index 44d425bc13..6e406d1e56 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala @@ -84,11 +84,14 @@ class ElasticUnifiedLog(_logStartOffset: Long, rst } - def tryCheckpoint(): Unit = { + def tryCheckpoint(): Boolean = { if (checkpointIntervalBytes > 0) { checkpointIntervalBytes = 0 lastCheckpointTimestamp = time.milliseconds() checkpoint() + true + } else { + false } } @@ -306,7 +309,10 @@ object ElasticUnifiedLog extends Logging { DirtyBytes.reset() for (log <- Logs.values().asScala) { try { - log.tryCheckpoint() + if (log.tryCheckpoint()) { + // sleep a while to avoid too many checkpoint at the same time, which may cause high append latency + Thread.sleep(10) + } } catch { case e: Throwable => error("Error while checkpoint", e) }