diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/AsyncAWSResponseTransformer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/AsyncAWSResponseTransformer.java new file mode 100644 index 00000000000..7fb26be8c0e --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/AsyncAWSResponseTransformer.java @@ -0,0 +1,97 @@ +package io.deephaven.parquet.table.util; + +import io.deephaven.base.verify.Assert; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * An {@link AsyncResponseTransformer} that transforms a response into a {@link ByteBuffer}. + * + * @param POJO response type. + */ +public final class AsyncAWSResponseTransformer implements AsyncResponseTransformer { + + private volatile CompletableFuture cf; + private ResponseT response; + private final ByteBuffer byteBuffer; + + AsyncAWSResponseTransformer(final int bufferSize) { + // TODO Can be improved with a buffer pool + byteBuffer = ByteBuffer.allocate(bufferSize); + } + + @Override + public CompletableFuture prepare() { + cf = new CompletableFuture<>(); + return cf; + } + + @Override + public void onResponse(ResponseT response) { + this.response = response; + } + + /** + * @return the unmarshalled response object from the service. + */ + public ResponseT response() { + return response; + } + + @Override + public void onStream(SdkPublisher publisher) { + publisher.subscribe(new ByteBuferSubscriber(cf, byteBuffer)); + } + + @Override + public void exceptionOccurred(Throwable throwable) { + cf.completeExceptionally(throwable); + } + + final static class ByteBuferSubscriber implements Subscriber { + private final CompletableFuture resultFuture; + + private Subscription subscription; + + private final ByteBuffer byteBuffer; + + ByteBuferSubscriber(CompletableFuture resultFuture, ByteBuffer byteBuffer) { + this.resultFuture = resultFuture; + this.byteBuffer = byteBuffer; + } + + @Override + public void onSubscribe(final Subscription s) { + if (subscription != null) { + s.cancel(); + return; + } + subscription = s; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(final ByteBuffer responseBytes) { + // Assuming responseBytes will fit in the buffer + Assert.assertion(responseBytes.remaining() <= byteBuffer.remaining(), + "responseBytes.remaining() <= byteBuffer.remaining()"); + byteBuffer.put(responseBytes); + subscription.request(1); + } + + @Override + public void onError(final Throwable throwable) { + resultFuture.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + resultFuture.complete(byteBuffer); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java index c60c5658d9d..cdeef998496 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/util/S3SeekableByteChannel.java @@ -5,8 +5,6 @@ import io.deephaven.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.BytesWrapper; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.io.IOException; @@ -160,8 +158,7 @@ private CompletableFuture computeFragmentFuture(final int fragmentIn .bucket(bucket) .key(key) .range(range), - AsyncResponseTransformer.toBytes()) - .thenApply(BytesWrapper::asByteBuffer); + new AsyncAWSResponseTransformer<>(maxFragmentSize)); } /**