Commit

fix(stream): release `FetchResult`s if the subsequent fetch fails (#2172)

* fix(stream): release `FetchResult`s if the subsequent fetch fails

Signed-off-by: Ning Yu <[email protected]>

* revert: "fix(stream): release `FetchResult`s if the subsequent fetch fails"

This reverts commit 5836a6a.

* refactor: add the `FetchResult` into the list in order rather than in reverse order

Signed-off-by: Ning Yu <[email protected]>

* fix: release `FetchResult`s if failed to fetch

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Nov 22, 2024
1 parent 1e217ee commit e9bbf7a
Showing 1 changed file with 23 additions and 10 deletions.
@@ -140,17 +140,35 @@ private CompletableFuture<Records> readAll0(FetchContext context, long startOffs
         if (nextFetchOffset >= endOffset) {
             return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
         }
-        return fetch0(context, nextFetchOffset, endOffset, maxSize)
-            .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
+        List<FetchResult> results = new LinkedList<>();
+        return fetch0(context, nextFetchOffset, endOffset, maxSize, results)
+            .whenComplete((nil, e) -> {
+                if (e != null) {
+                    results.forEach(FetchResult::free);
+                    results.clear();
+                }
+            })
+            .thenApply(nil -> PooledMemoryRecords.of(baseOffset, results, context.readOptions().pooledBuf()));
     }
 
-    private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) {
+    /**
+     * Fetch records from the {@link ElasticStreamSlice}
+     *
+     * @param context fetch context
+     * @param startOffset start offset
+     * @param endOffset end offset
+     * @param maxSize max size of the fetched records
+     * @param results result list to be filled
+     * @return a future that completes when reaching the end offset or the max size
+     */
+    private CompletableFuture<Void> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize, List<FetchResult> results) {
         if (startOffset >= endOffset || maxSize <= 0) {
-            return CompletableFuture.completedFuture(new LinkedList<>());
+            return CompletableFuture.completedFuture(null);
         }
         int adjustedMaxSize = Math.min(maxSize, 1024 * 1024);
         return streamSlice.fetch(context, startOffset, endOffset, adjustedMaxSize)
             .thenCompose(rst -> {
+                results.add(rst);
                 long nextFetchOffset = startOffset;
                 int readSize = 0;
                 for (RecordBatchWithContext recordBatchWithContext : rst.recordBatchList()) {
@@ -163,12 +181,7 @@ private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context,
                     }
                     readSize += recordBatchWithContext.rawPayload().remaining();
                 }
-                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize)
-                    .thenApply(rstList -> {
-                        // add to first since we need to reverse the order.
-                        rstList.addFirst(rst);
-                        return rstList;
-                    });
+                return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results);
             });
     }
 
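For context, here is the pattern the change applies, reduced to a standalone sketch: recursive `CompletableFuture` fetches append each pooled result to a caller-supplied list in order, and a single `whenComplete` at the top level frees everything collected so far if any later fetch fails. The `Result`, `fetchOnce`, `fetchAll`, and `readAll` names below are hypothetical stand-ins, not the project's API.

// Hypothetical, self-contained sketch of the release-on-failure pattern (not the project's real code).
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnFailureSketch {

    /** Stand-in for a pooled fetch result that must be freed exactly once. */
    interface Result {
        int size();
        void free();
    }

    /** Stand-in for a single slice fetch returning one pooled result. */
    static CompletableFuture<Result> fetchOnce(long offset, int maxSize) {
        return CompletableFuture.completedFuture(new Result() {
            @Override public int size() { return 1; }
            @Override public void free() { /* return the buffer to the pool */ }
        });
    }

    /** Recursively fetch until endOffset or maxSize is reached, appending results in order. */
    static CompletableFuture<Void> fetchAll(long startOffset, long endOffset, int maxSize, List<Result> results) {
        if (startOffset >= endOffset || maxSize <= 0) {
            return CompletableFuture.completedFuture(null);
        }
        return fetchOnce(startOffset, maxSize)
            .thenCompose(rst -> {
                // Add immediately so this result is covered by the cleanup below,
                // even if a later recursive fetch fails.
                results.add(rst);
                return fetchAll(startOffset + 1, endOffset, maxSize - rst.size(), results);
            });
    }

    static CompletableFuture<List<Result>> readAll(long startOffset, long endOffset, int maxSize) {
        List<Result> results = new LinkedList<>();
        return fetchAll(startOffset, endOffset, maxSize, results)
            .whenComplete((nil, e) -> {
                if (e != null) {
                    // A later fetch failed: free everything collected so far to avoid leaking pooled buffers.
                    results.forEach(Result::free);
                    results.clear();
                }
            })
            .thenApply(nil -> results);
    }
}

Appending each result to the shared list before recursing (instead of prepending via `addFirst` on the way back, as the old code did) keeps the list in fetch order and ensures a partially built list is always visible to the cleanup callback.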
