From 618bd6a7ff5cff4e648856809cc0ca74320914e3 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 14 May 2024 13:22:30 -0500 Subject: [PATCH] Merged with Devin's change --- extensions/parquet/table/gradle.properties | 2 +- ...ava => ParquetS3SimpleLocalStackTest.java} | 2 +- ...est.java => ParquetS3SimpleMinIOTest.java} | 2 +- ...Base.java => ParquetS3SimpleTestBase.java} | 10 +- ...3SeekableChannelSimpleLocalStackTest.java} | 2 +- ... => S3SeekableChannelSimpleMinIOTest.java} | 2 +- ...a => S3SeekableChannelSimpleTestBase.java} | 56 ++----- .../s3/S3SeekableChannelTestBase.java | 157 ------------------ .../testlib/S3SeekableChannelTestSetup.java | 92 ++++++++++ 9 files changed, 119 insertions(+), 206 deletions(-) rename extensions/parquet/table/src/test/java/io/deephaven/parquet/table/{ParquetLocalStackS3Test.java => ParquetS3SimpleLocalStackTest.java} (91%) rename extensions/parquet/table/src/test/java/io/deephaven/parquet/table/{ParquetMinIOS3Test.java => ParquetS3SimpleMinIOTest.java} (93%) rename extensions/parquet/table/src/test/java/io/deephaven/parquet/table/{ParquetS3TestBase.java => ParquetS3SimpleTestBase.java} (97%) rename extensions/s3/src/test/java/io/deephaven/extensions/s3/{S3SeekableChannelLocalStackTest.java => S3SeekableChannelSimpleLocalStackTest.java} (89%) rename extensions/s3/src/test/java/io/deephaven/extensions/s3/{S3SeekableChannelMinIOTest.java => S3SeekableChannelSimpleMinIOTest.java} (92%) rename extensions/s3/src/test/java/io/deephaven/extensions/s3/{S3SeekableChannelCommonTests.java => S3SeekableChannelSimpleTestBase.java} (62%) delete mode 100644 extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java create mode 100644 extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java diff --git a/extensions/parquet/table/gradle.properties b/extensions/parquet/table/gradle.properties index ad20d83282d..e170c13d14b 100644 --- a/extensions/parquet/table/gradle.properties +++ b/extensions/parquet/table/gradle.properties @@ -2,4 +2,4 @@ io.deephaven.project.ProjectType=JAVA_PUBLIC # TODO(deephaven-core#5115): EPIC: Dependency management testcontainers.localstack.image=localstack/localstack:3.1.0 -testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z \ No newline at end of file +testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetLocalStackS3Test.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleLocalStackTest.java similarity index 91% rename from extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetLocalStackS3Test.java rename to extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleLocalStackTest.java index dccf4bb38a2..c4a5ac6b5d7 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetLocalStackS3Test.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleLocalStackTest.java @@ -9,7 +9,7 @@ import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; -public class ParquetLocalStackS3Test extends ParquetS3TestBase { +public class ParquetS3SimpleLocalStackTest extends ParquetS3SimpleTestBase { @BeforeClass public static void initContainer() { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetMinIOS3Test.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleMinIOTest.java similarity index 93% rename from extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetMinIOS3Test.java rename to extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleMinIOTest.java index 60e6c573989..30bdcb0eda7 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetMinIOS3Test.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleMinIOTest.java @@ -12,7 +12,7 @@ import static org.junit.Assert.assertFalse; -public class ParquetMinIOS3Test extends ParquetS3TestBase { +public class ParquetS3SimpleMinIOTest extends ParquetS3SimpleTestBase { @BeforeClass public static void initContainer() { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3TestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleTestBase.java similarity index 97% rename from extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3TestBase.java rename to extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleTestBase.java index 0659b2e7e78..e78978c69c9 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3TestBase.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetS3SimpleTestBase.java @@ -12,7 +12,7 @@ import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.s3.Credentials; import io.deephaven.extensions.s3.S3Instructions; -import io.deephaven.extensions.s3.S3SeekableChannelTestBase; +import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.extensions.s3.testlib.S3Helper; import io.deephaven.test.types.OutOfBandTest; import org.junit.After; @@ -37,9 +37,9 @@ import static org.junit.Assert.assertTrue; @Category(OutOfBandTest.class) -abstract class ParquetS3TestBase extends S3SeekableChannelTestBase { +abstract class ParquetS3SimpleTestBase extends S3SeekableChannelTestSetup { - private static final File rootDir = new File(ParquetS3TestBase.class.getName() + "_root"); + private static final File rootDir = new File(ParquetS3SimpleTestBase.class.getName() + "_root"); // The following tests are disabled by default, as they are verifying against a remote system private static final boolean ENABLE_REMOTE_S3_TESTING = false; @@ -49,7 +49,7 @@ abstract class ParquetS3TestBase extends S3SeekableChannelTestBase { @Before public void setUp() throws ExecutionException, InterruptedException, TimeoutException { - super.setUp(); + super.doSetUp(); if (rootDir.exists()) { FileUtils.deleteRecursively(rootDir); } @@ -59,7 +59,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce @After public void tearDown() throws ExecutionException, InterruptedException, TimeoutException { - super.tearDown(); + super.doTearDown(); FileUtils.deleteRecursively(rootDir); } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java similarity index 89% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java index e77a6e2b203..b654ef60a91 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java @@ -10,7 +10,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") -public class S3SeekableChannelLocalStackTest extends S3SeekableChannelCommonTests { +public class S3SeekableChannelSimpleLocalStackTest extends S3SeekableChannelSimpleTestBase { @BeforeAll static void initContainer() { diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java similarity index 92% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java index f8eb998c6ff..1a91cd0c860 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java @@ -12,7 +12,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") -public class S3SeekableChannelMinIOTest extends S3SeekableChannelCommonTests { +public class S3SeekableChannelSimpleMinIOTest extends S3SeekableChannelSimpleTestBase { @BeforeAll static void initContainer() { diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelCommonTests.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java similarity index 62% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelCommonTests.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java index b622a3c53d3..601b41812ce 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelCommonTests.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java @@ -3,11 +3,12 @@ // package io.deephaven.extensions.s3; -import io.deephaven.extensions.s3.testlib.S3Helper; +import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.util.channel.CachedChannelProvider; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; -import io.deephaven.util.channel.SeekableChannelsProviderPlugin; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.async.AsyncRequestBody; import java.io.IOException; @@ -15,17 +16,24 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; -abstract class S3SeekableChannelCommonTests extends S3SeekableChannelTestBase { +abstract class S3SeekableChannelSimpleTestBase extends S3SeekableChannelTestSetup { + + @BeforeEach + void setUp() throws ExecutionException, InterruptedException, TimeoutException { + doSetUp(); + } + + @AfterEach + void tearDown() throws ExecutionException, InterruptedException, TimeoutException { + doTearDown(); + } @Test void readSimpleFiles() @@ -36,7 +44,7 @@ void readSimpleFiles() final ByteBuffer buffer = ByteBuffer.allocate(1); try ( final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { assertThat(readChannel.read(buffer)).isEqualTo(-1); @@ -46,7 +54,7 @@ void readSimpleFiles() final URI uri = uri("hello/world.txt"); try ( final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { final ByteBuffer bytes = readAll(readChannel, 32); @@ -68,7 +76,7 @@ public int read() { final ByteBuffer buffer = ByteBuffer.allocate(1); try ( final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { for (long p = 0; p < numBytes; ++p) { @@ -79,34 +87,4 @@ public int read() { assertThat(readChannel.read(buffer)).isEqualTo(-1); } } - - private void uploadDirectory(String resourceDir) - throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { - S3Helper.uploadDirectory( - asyncClient, - Path.of(io.deephaven.extensions.s3.S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()), - bucket, - null, - Duration.ofSeconds(5)); - } - - private SeekableChannelsProvider providerImpl(URI uri) { - final SeekableChannelsProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); - final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); - return plugin.createProvider(uri, instructions); - } - - private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { - final ByteBuffer dst = ByteBuffer.allocate(maxBytes); - while (dst.remaining() > 0 && channel.read(dst) != -1) { - // continue - } - if (dst.remaining() == 0) { - if (channel.read(ByteBuffer.allocate(1)) != -1) { - throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); - } - } - dst.flip(); - return dst; - } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java deleted file mode 100644 index 0075568581b..00000000000 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java +++ /dev/null @@ -1,157 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.extensions.s3; - - -import io.deephaven.extensions.s3.testlib.S3Helper; -import io.deephaven.util.channel.CachedChannelProvider; -import io.deephaven.util.channel.SeekableChannelContext; -import io.deephaven.util.channel.SeekableChannelsProvider; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.assertj.core.api.Assertions.assertThat; - -public abstract class S3SeekableChannelTestBase { - - public abstract S3AsyncClient s3AsyncClient(); - - public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); - - private ExecutorService executor; - private S3AsyncClient asyncClient; - private String bucket; - - @BeforeEach - void setUp() throws ExecutionException, InterruptedException, TimeoutException { - executor = Executors.newCachedThreadPool(); - bucket = UUID.randomUUID().toString(); - asyncClient = s3AsyncClient(); - asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); - } - - @AfterEach - void tearDown() throws ExecutionException, InterruptedException, TimeoutException { - S3Helper.deleteAllKeys(asyncClient, bucket); - asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); - asyncClient.close(); - executor.shutdownNow(); - } - - @Test - void readSimpleFiles() - throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException { - uploadDirectory("readSimpleFiles"); - { - final URI uri = uri("empty.txt"); - final ByteBuffer buffer = ByteBuffer.allocate(1); - try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); - final SeekableChannelContext context = provider.makeContext(); - final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { - assertThat(readChannel.read(buffer)).isEqualTo(-1); - } - } - { - final URI uri = uri("hello/world.txt"); - try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); - final SeekableChannelContext context = provider.makeContext(); - final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { - final ByteBuffer bytes = readAll(readChannel, 32); - assertThat(bytes).isEqualTo(ByteBuffer.wrap("Hello, world!".getBytes(StandardCharsets.UTF_8))); - } - } - } - - @Test - void read32MiB() throws IOException, ExecutionException, InterruptedException, TimeoutException { - final int numBytes = 33554432; - putObject("32MiB.bin", AsyncRequestBody.fromInputStream(new InputStream() { - @Override - public int read() { - return 42; - } - }, (long) numBytes, executor)); - final URI uri = uri("32MiB.bin"); - final ByteBuffer buffer = ByteBuffer.allocate(1); - try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); - final SeekableChannelContext context = provider.makeContext(); - final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { - for (long p = 0; p < numBytes; ++p) { - assertThat(readChannel.read(buffer)).isEqualTo(1); - assertThat(buffer.get(0)).isEqualTo((byte) 42); - buffer.clear(); - } - assertThat(readChannel.read(buffer)).isEqualTo(-1); - } - } - - private void uploadDirectory(String resourceDir) - throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { - S3Helper.uploadDirectory( - asyncClient, - Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()), - bucket, - null, - Duration.ofSeconds(5)); - } - - private URI uri(String key) { - return URI.create(String.format("s3://%s/%s", bucket, key)); - } - - private void putObject(String key, AsyncRequestBody body) - throws ExecutionException, InterruptedException, TimeoutException { - asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, - TimeUnit.SECONDS); - } - - private SeekableChannelsProvider providerImpl(URI uri) { - final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); - final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); - return plugin.createProvider(uri, instructions); - } - - private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { - final ByteBuffer dst = ByteBuffer.allocate(maxBytes); - while (dst.remaining() > 0 && channel.read(dst) != -1) { - // continue - } - if (dst.remaining() == 0) { - if (channel.read(ByteBuffer.allocate(1)) != -1) { - throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); - } - } - dst.flip(); - return dst; - } -} diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java new file mode 100644 index 00000000000..78e306cc782 --- /dev/null +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java @@ -0,0 +1,92 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3.testlib; + +import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin; +import io.deephaven.util.channel.SeekableChannelsProvider; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Path; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class S3SeekableChannelTestSetup { + + protected ExecutorService executor; + protected S3AsyncClient asyncClient; + protected String bucket; + + protected abstract S3AsyncClient s3AsyncClient(); + + protected abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); + + protected final void doSetUp() throws ExecutionException, InterruptedException, TimeoutException { + executor = Executors.newCachedThreadPool(); + bucket = UUID.randomUUID().toString(); + asyncClient = s3AsyncClient(); + asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + } + + protected final void doTearDown() throws ExecutionException, InterruptedException, TimeoutException { + S3Helper.deleteAllKeys(asyncClient, bucket); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + asyncClient.close(); + executor.shutdownNow(); + } + + protected final void uploadDirectory(String resourceDir) + throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { + S3Helper.uploadDirectory( + asyncClient, + Path.of(S3SeekableChannelTestSetup.class.getResource(resourceDir).toURI()), + bucket, + null, + Duration.ofSeconds(5)); + } + + protected final URI uri(String key) { + return URI.create(String.format("s3://%s/%s", bucket, key)); + } + + protected final void putObject(String key, AsyncRequestBody body) + throws ExecutionException, InterruptedException, TimeoutException { + asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, + TimeUnit.SECONDS); + } + + protected final SeekableChannelsProvider providerImpl(URI uri) { + final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); + final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); + return plugin.createProvider(uri, instructions); + } + + protected static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { + final ByteBuffer dst = ByteBuffer.allocate(maxBytes); + while (dst.remaining() > 0 && channel.read(dst) != -1) { + // continue + } + if (dst.remaining() == 0) { + if (channel.read(ByteBuffer.allocate(1)) != -1) { + throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); + } + } + dst.flip(); + return dst; + } +}