From 2808826e3c69ab54a36860cc0fcc5afa6ca65f45 Mon Sep 17 00:00:00 2001
From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com>
Date: Fri, 22 Nov 2024 17:09:53 +0800
Subject: [PATCH] fix(stream): release `FetchResult`s if the subsequent fetch fails (#2172)

* fix(stream): release `FetchResult`s if the subsequent fetch fails

Signed-off-by: Ning Yu

* revert: "fix(stream): release `FetchResult`s if the subsequent fetch fails"

This reverts commit 5836a6afa09ff0139680f96231c49828293b3bc8.

* refactor: add the `FetchResult` into the list in order rather than in reverse order

Signed-off-by: Ning Yu

* fix: release `FetchResult`s if failed to fetch

Signed-off-by: Ning Yu

---------

Signed-off-by: Ning Yu
---
 .../streamaspect/ElasticLogFileRecords.java   | 33 +++++++++++++------
 1 file changed, 23 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
index 0a52159d01..71b095e0f6 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -137,17 +137,35 @@ private CompletableFuture<MemoryRecords> readAll0(FetchContext context, long startOffs
         if (nextFetchOffset >= endOffset) {
             return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
         }
-        return fetch0(context, nextFetchOffset, endOffset, maxSize)
-            .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
+        List<FetchResult> results = new LinkedList<>();
+        return fetch0(context, nextFetchOffset, endOffset, maxSize, results)
+            .whenComplete((nil, e) -> {
+                if (e != null) {
+                    results.forEach(FetchResult::free);
+                    results.clear();
+                }
+            })
+            .thenApply(nil -> PooledMemoryRecords.of(baseOffset, results, context.readOptions().pooledBuf()));
     }
 
-    private CompletableFuture<List<FetchResult>> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) {
+    /**
+     * Fetch records from the {@link ElasticStreamSlice}
+     *
+     * @param context fetch context
+     * @param startOffset start offset
+     * @param endOffset end offset
+     * @param maxSize max size of the fetched records
+     * @param results result list to be filled
+     * @return a future that completes when reaching the end offset or the max size
+     */
+    private CompletableFuture<Void> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize, List<FetchResult> results) {
         if (startOffset >= endOffset || maxSize <= 0) {
-            return CompletableFuture.completedFuture(new LinkedList<>());
+            return CompletableFuture.completedFuture(null);
         }
         int adjustedMaxSize = Math.min(maxSize, 1024 * 1024);
         return streamSlice.fetch(context, startOffset, endOffset, adjustedMaxSize)
             .thenCompose(rst -> {
+                results.add(rst);
                 long nextFetchOffset = startOffset;
                 int readSize = 0;
                 for (RecordBatchWithContext recordBatchWithContext : rst.recordBatchList()) {
@@ -160,12 +178,7 @@ private CompletableFuture<List<FetchResult>> fetch0(FetchContext context,
                     }
                     readSize += recordBatchWithContext.rawPayload().remaining();
                 }
-                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize)
-                    .thenApply(rstList -> {
-                        // add to first since we need to reverse the order.
-                        rstList.addFirst(rst);
-                        return rstList;
-                    });
+                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results);
             });
     }
 
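
Below is a minimal, self-contained sketch of the pattern this patch applies: each successfully fetched pooled result is recorded in a caller-owned list *before* the next fetch is chained, and the caller frees the whole list in `whenComplete` if any stage fails. The `Pooled`, `fetchOnce`, `fetchAll`, and `readAll` names are hypothetical stand-ins for `FetchResult`, `ElasticStreamSlice#fetch`, `fetch0`, and `readAll0`; this is an illustration under those assumptions, not the real AutoMQ API.

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnFailureSketch {

    // Hypothetical stand-in for a pooled, manually released fetch result.
    interface Pooled {
        void free();
    }

    // Hypothetical stand-in for a single asynchronous fetch that may fail.
    static CompletableFuture<Pooled> fetchOnce(long offset) {
        if (offset >= 3) {
            // Simulate a failure partway through the fetch loop.
            return CompletableFuture.failedFuture(new RuntimeException("fetch failed"));
        }
        Pooled result = () -> System.out.println("freed result for offset " + offset);
        return CompletableFuture.completedFuture(result);
    }

    // Recursive fetch loop: every fetched result is appended to the
    // caller-owned `results` list before recursing, so the caller always
    // sees everything that must be freed, even if a later fetch fails.
    static CompletableFuture<Void> fetchAll(long offset, long endOffset, List<Pooled> results) {
        if (offset >= endOffset) {
            return CompletableFuture.completedFuture(null);
        }
        return fetchOnce(offset).thenCompose(rst -> {
            results.add(rst); // record ownership first, in fetch order
            return fetchAll(offset + 1, endOffset, results);
        });
    }

    static CompletableFuture<Void> readAll(long endOffset) {
        List<Pooled> results = new LinkedList<>();
        return fetchAll(0, endOffset, results)
            .whenComplete((nil, e) -> {
                if (e != null) {
                    // A later fetch failed: free everything fetched so far
                    // instead of leaking the pooled buffers.
                    results.forEach(Pooled::free);
                    results.clear();
                }
            });
    }

    public static void main(String[] args) {
        // Fetches for offsets 0-2 succeed, offset 3 fails: all three
        // already-fetched results are freed before the error propagates.
        readAll(5).exceptionally(e -> {
            System.out.println("read failed: " + e.getMessage());
            return null;
        }).join();
    }
}

This also shows why the patch switches from the reverse-order `addFirst` approach to filling a shared list in order: the old code only attached `rst` to a list after the recursive call succeeded, so a failed subsequent fetch left earlier results unreachable and unfreed.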