Skip to content

Commit

Permalink
Added an async response transfer to save on copies
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Dec 22, 2023
1 parent d3e9da8 commit de239f8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.deephaven.parquet.table.util;

import io.deephaven.base.verify.Assert;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* An {@link AsyncResponseTransformer} that transforms a response into a {@link ByteBuffer}.
*
* @param <ResponseT> POJO response type.
*/
public final class AsyncAWSResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ByteBuffer> {

private volatile CompletableFuture<ByteBuffer> cf;
private ResponseT response;
private final ByteBuffer byteBuffer;

AsyncAWSResponseTransformer(final int bufferSize) {
// TODO Can be improved with a buffer pool
byteBuffer = ByteBuffer.allocate(bufferSize);
}

@Override
public CompletableFuture<ByteBuffer> prepare() {
cf = new CompletableFuture<>();
return cf;
}

@Override
public void onResponse(ResponseT response) {
this.response = response;
}

/**
* @return the unmarshalled response object from the service.
*/
public ResponseT response() {
return response;
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new ByteBuferSubscriber(cf, byteBuffer));
}

@Override
public void exceptionOccurred(Throwable throwable) {
cf.completeExceptionally(throwable);
}

final static class ByteBuferSubscriber implements Subscriber<ByteBuffer> {
private final CompletableFuture<ByteBuffer> resultFuture;

private Subscription subscription;

private final ByteBuffer byteBuffer;

ByteBuferSubscriber(CompletableFuture<ByteBuffer> resultFuture, ByteBuffer byteBuffer) {
this.resultFuture = resultFuture;
this.byteBuffer = byteBuffer;
}

@Override
public void onSubscribe(final Subscription s) {
if (subscription != null) {
s.cancel();
return;
}
subscription = s;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(final ByteBuffer responseBytes) {
// Assuming responseBytes will fit in the buffer
Assert.assertion(responseBytes.remaining() <= byteBuffer.remaining(),
"responseBytes.remaining() <= byteBuffer.remaining()");
byteBuffer.put(responseBytes);
subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
resultFuture.completeExceptionally(throwable);
}

@Override
public void onComplete() {
resultFuture.complete(byteBuffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import io.deephaven.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.io.IOException;
Expand Down Expand Up @@ -160,8 +158,7 @@ private CompletableFuture<ByteBuffer> computeFragmentFuture(final int fragmentIn
.bucket(bucket)
.key(key)
.range(range),
AsyncResponseTransformer.toBytes())
.thenApply(BytesWrapper::asByteBuffer);
new AsyncAWSResponseTransformer<>(maxFragmentSize));
}

/**
Expand Down

0 comments on commit de239f8

Please sign in to comment.