From 5330d7a7c7581c71eeaf775bcb3ae3cc29dfbfce Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 22 Nov 2024 16:56:11 +0800 Subject: [PATCH] fix: release `FetchResult`s if failed to fetch Signed-off-by: Ning Yu --- .../scala/kafka/log/streamaspect/ElasticLogFileRecords.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index 020b928863..0d3d93e3a2 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -142,6 +142,12 @@ private CompletableFuture readAll0(FetchContext context, long startOffs } List results = new LinkedList<>(); return fetch0(context, nextFetchOffset, endOffset, maxSize, results) + .whenComplete((nil, e) -> { + if (e != null) { + results.forEach(FetchResult::free); + results.clear(); + } + }) .thenApply(nil -> PooledMemoryRecords.of(baseOffset, results, context.readOptions().pooledBuf())); }