diff --git a/extensions/parquet/table/build.gradle b/extensions/parquet/table/build.gradle index aede6e888b5..93b770e35c2 100644 --- a/extensions/parquet/table/build.gradle +++ b/extensions/parquet/table/build.gradle @@ -44,15 +44,19 @@ dependencies { testImplementation project(':base-test-utils') testImplementation project(':engine-test-utils') + testImplementation TestTools.projectDependency(project, 'extensions-s3') Classpaths.inheritJUnitClassic(project, 'testImplementation') + testImplementation "org.testcontainers:testcontainers:1.19.4" + testImplementation "org.testcontainers:localstack:1.19.4" + testImplementation "org.testcontainers:minio:1.19.4" + testRuntimeOnly project(':log-to-slf4j'), project(path: ':configs'), project(path: ':test-configs') Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly') runtimeOnly project(':extensions-trackedfile') - testImplementation project(':extensions-s3') brotliTestImplementation project(':extensions-parquet-table') brotliTestImplementation('com.github.rdblue:brotli-codec:0.1.1') @@ -70,3 +74,6 @@ if (Architecture.fromHost() == Architecture.AMD64) { } TestTools.addEngineOutOfBandTest(project) + +testOutOfBand.systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image') +testOutOfBand.systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image') diff --git a/extensions/parquet/table/gradle.properties b/extensions/parquet/table/gradle.properties index c186bbfdde1..e170c13d14b 100644 --- a/extensions/parquet/table/gradle.properties +++ b/extensions/parquet/table/gradle.properties @@ -1 +1,5 @@ io.deephaven.project.ProjectType=JAVA_PUBLIC + +# TODO(deephaven-core#5115): EPIC: Dependency management +testcontainers.localstack.image=localstack/localstack:3.1.0 +testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index e7e773e1bfe..3b28644c691 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -224,6 +224,12 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par */ public abstract ParquetInstructions withTableDefinition(final TableDefinition tableDefinition); + /** + * Creates a new {@link ParquetInstructions} object with the same properties as the current object but layout set as + * the provided {@link ParquetFileLayout}. + */ + public abstract ParquetInstructions withLayout(final ParquetFileLayout fileLayout); + /** * Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition * and layout set as the provided values. 
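Note on the new API: the withLayout(...) override added above lets callers take an existing ParquetInstructions and swap only the file layout, which is how the S3 tests later in this change reuse one set of read instructions across FLAT_PARTITIONED and KV_PARTITIONED reads. A minimal usage sketch, not part of this diff (the directory paths are placeholders and default builder settings are assumed):

import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetInstructions.ParquetFileLayout;
import io.deephaven.parquet.table.ParquetTools;

public final class WithLayoutSketch {
    public static void main(final String[] args) {
        // Build the instructions once; defaults are fine for a local read.
        final ParquetInstructions instructions = ParquetInstructions.builder().build();
        // Reuse the same instructions, overriding only how the source directory is laid out.
        final Table flat = ParquetTools.readTable(
                "/path/to/flatPartitionedDataDir", // placeholder path
                instructions.withLayout(ParquetFileLayout.FLAT_PARTITIONED));
        final Table kv = ParquetTools.readTable(
                "/path/to/keyValuePartitionedDataDir", // placeholder path
                instructions.withLayout(ParquetFileLayout.KV_PARTITIONED));
        System.out.println(flat.size() + " rows (flat), " + kv.size() + " rows (kv)");
    }
}

For S3 URIs the same pattern applies, but the instructions must also carry an S3Instructions object via setSpecialInstructions(...), as the new tests below do.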
@@ -350,18 +356,23 @@ public Optional>> getIndexColumns() { } @Override - public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) { - return withTableDefinitionAndLayout(tableDefinition, null); + public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) { + return withTableDefinitionAndLayout(useDefinition, null); + } + + @Override + public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) { + return withTableDefinitionAndLayout(null, useLayout); } @Override public ParquetInstructions withTableDefinitionAndLayout( - @Nullable final TableDefinition tableDefinition, - @Nullable final ParquetFileLayout fileLayout) { + @Nullable final TableDefinition useDefinition, + @Nullable final ParquetFileLayout useLayout) { return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(), - fileLayout, tableDefinition, null); + useLayout, useDefinition, null); } @Override @@ -598,7 +609,12 @@ public Optional>> getIndexColumns() { @Override public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) { - return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null)); + return withTableDefinitionAndLayout(useDefinition, fileLayout); + } + + @Override + public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) { + return withTableDefinitionAndLayout(tableDefinition, useLayout); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index a5adb0915c5..23500d714a5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -150,7 +150,7 @@ public static Table readTable( return readTableFromFileUri(sourceURI, readInstructions); } if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) { - throw new UncheckedDeephavenException("We currently do not support reading parquet metadata files " + + throw new UnsupportedOperationException("We currently do not support reading parquet metadata files " + "from non local storage"); } if (!isDirectory) { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 69e5f2b6825..bd17da37691 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -9,7 +9,6 @@ import io.deephaven.api.SortColumn; import io.deephaven.base.FileUtils; import io.deephaven.base.verify.Assert; -import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.primitive.function.ByteConsumer; import io.deephaven.engine.primitive.function.CharConsumer; diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java new file 
mode 100644 index 00000000000..796776eda1b --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java @@ -0,0 +1,29 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import io.deephaven.extensions.s3.S3Instructions.Builder; +import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; +import io.deephaven.extensions.s3.testlib.SingletonContainers; +import org.junit.BeforeClass; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +public class S3ParquetLocalStackTest extends S3ParquetTestBase { + + @BeforeClass + public static void initContainer() { + // ensure container is started so container startup time isn't associated with a specific test + LocalStack.init(); + } + + @Override + public Builder s3Instructions(Builder builder) { + return LocalStack.s3Instructions(builder); + } + + @Override + public S3AsyncClient s3AsyncClient() { + return SingletonContainers.LocalStack.s3AsyncClient(); + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java new file mode 100644 index 00000000000..2b3cb60e609 --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java @@ -0,0 +1,33 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import io.deephaven.extensions.s3.S3Instructions.Builder; +import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; +import io.deephaven.extensions.s3.testlib.SingletonContainers; +import io.deephaven.stats.util.OSUtil; +import org.junit.Assume; +import org.junit.BeforeClass; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +public class S3ParquetMinIOTest extends S3ParquetTestBase { + + @BeforeClass + public static void initContainer() { + // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X + Assume.assumeFalse("OSUtil.runningMacOS()", OSUtil.runningMacOS()); + // ensure container is started so container startup time isn't associated with a specific test + MinIO.init(); + } + + @Override + public Builder s3Instructions(final Builder builder) { + return MinIO.s3Instructions(builder); + } + + @Override + public S3AsyncClient s3AsyncClient() { + return SingletonContainers.MinIO.s3AsyncClient(); + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java new file mode 100644 index 00000000000..70df7cf5534 --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java @@ -0,0 +1,89 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.extensions.s3.Credentials; +import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.test.types.OutOfBandTest; +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.time.Duration; + +import static org.junit.Assert.assertEquals; + +/** + 
* These tests verify the behavior of the Parquet implementation when reading against remote S3 servers. + **/ +@Category(OutOfBandTest.class) +public class S3ParquetRemoteTest { + + // The following tests are disabled by default, and should be run manually. + private static final boolean ENABLE_REMOTE_S3_TESTING = false; + + @Rule + public final EngineCleanup framework = new EngineCleanup(); + + @Test + public void readSampleParquetFilesFromPublicS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-2") + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) + .build(); + final TableDefinition tableDefinition = TableDefinition.of( + ColumnDefinition.ofString("hash"), + ColumnDefinition.ofLong("version"), + ColumnDefinition.ofLong("size"), + ColumnDefinition.ofString("block_hash"), + ColumnDefinition.ofLong("block_number"), + ColumnDefinition.ofLong("index"), + ColumnDefinition.ofLong("virtual_size"), + ColumnDefinition.ofLong("lock_time"), + ColumnDefinition.ofLong("input_count"), + ColumnDefinition.ofLong("output_count"), + ColumnDefinition.ofBoolean("isCoinbase"), + ColumnDefinition.ofDouble("output_value"), + ColumnDefinition.ofTime("last_modified"), + ColumnDefinition.ofDouble("input_value")); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .setTableDefinition(tableDefinition) + .build(); + ParquetTools.readTable( + "s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet", + readInstructions).head(10).select(); + + ParquetTools.readTable( + "s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet", + readInstructions).head(10).select(); + } + + @Test + public void readKeyValuePartitionedParquetFromPublicS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) + .build(); + final TableDefinition ookla_table_definition = TableDefinition.of( + ColumnDefinition.ofInt("quarter").withPartitioning(), + ColumnDefinition.ofString("quadkey")); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .setTableDefinition(ookla_table_definition) + .build(); + final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023", + readInstructions).head(10).select(); + assertEquals(2, table.numColumns()); + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java new file mode 100644 index 00000000000..1aa86dc9b5d --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java @@ -0,0 +1,220 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.QueryTable; +import
io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.engine.util.TableTools; +import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; +import io.deephaven.test.types.OutOfBandTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import software.amazon.awssdk.core.async.AsyncRequestBody; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.engine.util.TableTools.merge; +import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable; +import static org.junit.Assert.assertTrue; + +@Category(OutOfBandTest.class) +abstract class S3ParquetTestBase extends S3SeekableChannelTestSetup { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Rule + public final EngineCleanup framework = new EngineCleanup(); + + @Before + public void setUp() throws ExecutionException, InterruptedException, TimeoutException { + super.doSetUp(); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException, TimeoutException { + super.doTearDown(); + } + + private static Table getTable(final int numRows) { + return TableTools.emptyTable(numRows).update( + "someIntColumn = (int) i", + "someDoubleColumn = (double) i", + "someStringColumn = String.valueOf(i)", + "someBooleanColumn = i % 2 == 0", + "someCharColumn = (char) (i % 26 + 'a')"); + } + + @Test + public final void readSingleParquetFile() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + final Table table = getTable(500_000); + final File dest = new File(folder.newFolder(), "table.parquet"); + ParquetTools.writeTable(table, dest.getAbsolutePath()); + putObject("table.parquet", AsyncRequestBody.fromFile(dest)); + + final URI uri = uri("table.parquet"); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + final Table fromS3 = ParquetTools.readTable(uri.toString(), readInstructions); + assertTableEquals(table, fromS3); + } + + @Test + public final void readFlatPartitionedParquetData() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + final Table table = getTable(100_000); + final String destDirName = "flatPartitionedDataDir"; + final File destDir = new File(folder.newFolder(), destDirName); + for (int i = 0; i < 3; ++i) { + final File dest = new File(destDir, "table" + i + ".parquet"); + ParquetTools.writeTable(table, dest.getAbsolutePath()); + } + final File pqFileToBeIgnored = new File(destDir, "temp/table.parquet"); + ParquetTools.writeTable(TableTools.emptyTable(100).select("someIntColumn = (int) i"), + pqFileToBeIgnored.getAbsolutePath()); + uploadDirectory(destDir.toPath(), destDirName); + final URI uri = uri(destDirName); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + + final Table expected = merge(table, table, 
table); + final Table fromS3AsFlat = ParquetTools.readTable(uri.toString(), + readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)); + assertTableEquals(expected, fromS3AsFlat); + } + + @Test + public final void readFlatPartitionedParquetDataAsKVPartitioned() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + final Table table = getTable(100_000); + final String destDirName = "flatPartitionedDataDir"; + final File destDir = new File(folder.newFolder(), destDirName); + for (int i = 0; i < 3; ++i) { + final File dest = new File(destDir, "table" + i + ".parquet"); + ParquetTools.writeTable(table, dest.getAbsolutePath()); + } + uploadDirectory(destDir.toPath(), destDirName); + final URI uri = uri(destDirName); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + + final Table expected = merge(table, table, table); + final Table fromS3AsFlat = ParquetTools.readTable(uri.toString(), + readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)); + assertTableEquals(expected, fromS3AsFlat); + + final Table fromS3AsKV = ParquetTools.readTable(uri.toString(), + readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)); + assertTableEquals(expected, fromS3AsKV); + } + + @Test + public void readKeyValuePartitionedParquetData() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final String destDirName = "keyValuePartitionedDataDir"; + final File destDir = new File(folder.newFolder(), destDirName); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setBaseNameForPartitionedParquetData("data") + .build(); + writeKeyValuePartitionedTable(table, destDir.getPath(), writeInstructions); + uploadDirectory(destDir.toPath(), destDirName); + final URI uri = uri(destDirName); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .setTableDefinition(definition) + .build(); + final Table fromS3 = ParquetTools.readTable(uri.toString(), readInstructions); + assertTrue(fromS3.getDefinition().getColumn("PC1").isPartitioning()); + assertTrue(fromS3.getDefinition().getColumn("PC2").isPartitioning()); + assertTableEquals(table.sort("PC1", "PC2"), fromS3); + } + + @Test + public void readMetadataPartitionedParquetData() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = 
(int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final String destDirName = "metadataPartitionedDataDir"; + final File destDir = new File(folder.newFolder(), destDirName); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setBaseNameForPartitionedParquetData("data") + .setGenerateMetadataFiles(true) + .build(); + writeKeyValuePartitionedTable(table, destDir.getPath(), writeInstructions); + assertTrue(new File(destDir, "_metadata").exists()); + assertTrue(new File(destDir, "_common_metadata").exists()); + uploadDirectory(destDir.toPath(), destDirName); + final URI metadataFileURI = uri(destDirName + "/_metadata"); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .setTableDefinition(definition) + .build(); + try { + ParquetTools.readTable(metadataFileURI.toString(), readInstructions); + Assert.fail("Exception expected for unsupported metadata file read from S3"); + } catch (UnsupportedOperationException e) { + } + final URI directoryURI = uri(destDirName); + try { + ParquetTools.readTable(directoryURI.toString(), + readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + Assert.fail("Exception expected for unsupported metadata file read from S3"); + } catch (UnsupportedOperationException e) { + } + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetS3.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetS3.java deleted file mode 100644 index 6181f8120fc..00000000000 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetS3.java +++ /dev/null @@ -1,190 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.parquet.table; - -import io.deephaven.configuration.Configuration; -import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.*; -import io.deephaven.engine.testutil.junit4.EngineCleanup; -import io.deephaven.engine.util.TableTools; -import io.deephaven.extensions.s3.Credentials; -import io.deephaven.extensions.s3.S3Instructions; -import io.deephaven.test.types.OutOfBandTest; - -import java.time.Duration; -import java.util.List; - -import org.junit.*; -import org.junit.experimental.categories.Category; - -import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; -import static io.deephaven.engine.util.TableTools.emptyTable; -import static org.junit.Assert.*; - -@Category(OutOfBandTest.class) -public final class TestParquetS3 { - - // TODO(deephaven-core#5064): Add support for local S3 testing - // The following tests are disabled by default, as they are verifying against a remote system - private static final boolean ENABLE_S3_TESTING = - Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false); - - @Rule - public final EngineCleanup framework = new EngineCleanup(); - - - @Test - public void readSampleParquetFilesFromDeephavenS3Bucket() { - Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-1") - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.defaultCredentials()) - .build(); - final ParquetInstructions readInstructions = new 
ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .build(); - final Table fromAws1 = - ParquetTools.readTable("s3://dh-s3-parquet-test1/multiColFile.parquet", readInstructions).select(); - final Table dhTable1 = TableTools.emptyTable(1_000_000).update("A=(int)i", "B=(double)(i+1)"); - assertTableEquals(fromAws1, dhTable1); - - final Table fromAws2 = - ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions).select(); - final Table dhTable2 = TableTools.emptyTable(5).update("A=(int)i"); - assertTableEquals(fromAws2, dhTable2); - - final Table fromAws3 = ParquetTools - .readTable("s3://dh-s3-parquet-test1/single%20col%20file%20with%20spaces%20in%20name.parquet", - readInstructions) - .select(); - assertTableEquals(fromAws3, dhTable2); - - final Table fromAws4 = - ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions) - .select().sumBy(); - final Table dhTable4 = TableTools.emptyTable(5).update("A=(int)i").sumBy(); - assertTableEquals(fromAws4, dhTable4); - } - - @Test - public void readSampleParquetFilesFromPublicS3() { - Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-2") - .connectionTimeout(Duration.ofSeconds(1)) - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.anonymous()) - .build(); - final TableDefinition tableDefinition = TableDefinition.of( - ColumnDefinition.ofString("hash"), - ColumnDefinition.ofLong("version"), - ColumnDefinition.ofLong("size"), - ColumnDefinition.ofString("block_hash"), - ColumnDefinition.ofLong("block_number"), - ColumnDefinition.ofLong("index"), - ColumnDefinition.ofLong("virtual_size"), - ColumnDefinition.ofLong("lock_time"), - ColumnDefinition.ofLong("input_count"), - ColumnDefinition.ofLong("output_count"), - ColumnDefinition.ofBoolean("isCoinbase"), - ColumnDefinition.ofDouble("output_value"), - ColumnDefinition.ofTime("last_modified"), - ColumnDefinition.ofDouble("input_value")); - final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .setTableDefinition(tableDefinition) - .build(); - ParquetTools.readTable( - "s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet", - readInstructions).head(10).select(); - - ParquetTools.readTable( - "s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet", - readInstructions).head(10).select(); - } - - @Test - public void readFlatPartitionedParquetFromS3() { - Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-1") - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.defaultCredentials()) - .build(); - final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .setFileLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED) - .build(); - final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/", - readInstructions); - final Table expected = emptyTable(30).update("A = (int)i % 10"); - assertTableEquals(expected, table); - } - - @Test - public void readFlatPartitionedDataAsKeyValuePartitionedParquetFromS3() { - 
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-1") - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.defaultCredentials()) - .build(); - final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED) - .build(); - final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet3/", - readInstructions); - final Table expected = emptyTable(30).update("A = (int)i % 10"); - assertTableEquals(expected, table); - } - - @Test - public void readKeyValuePartitionedParquetFromS3() { - Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-1") - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.defaultCredentials()) - .build(); - final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED) - .build(); - final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/KeyValuePartitionedData/", - readInstructions); - final List> partitioningColumns = table.getDefinition().getPartitioningColumns(); - assertEquals(3, partitioningColumns.size()); - assertEquals("PC1", partitioningColumns.get(0).getName()); - assertEquals("PC2", partitioningColumns.get(1).getName()); - assertEquals("PC3", partitioningColumns.get(2).getName()); - assertEquals(100, table.size()); - assertEquals(3, table.selectDistinct("PC1").size()); - assertEquals(2, table.selectDistinct("PC2").size()); - assertEquals(2, table.selectDistinct("PC3").size()); - assertEquals(100, table.selectDistinct("I").size()); - assertEquals(1, table.selectDistinct("J").size()); - } - - @Test - public void readKeyValuePartitionedParquetFromPublicS3() { - Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING); - final S3Instructions s3Instructions = S3Instructions.builder() - .regionName("us-east-1") - .readTimeout(Duration.ofSeconds(60)) - .credentials(Credentials.anonymous()) - .build(); - final TableDefinition ookla_table_definition = TableDefinition.of( - ColumnDefinition.ofInt("quarter").withPartitioning(), - ColumnDefinition.ofString("quadkey")); - final ParquetInstructions readInstructions = new ParquetInstructions.Builder() - .setSpecialInstructions(s3Instructions) - .setTableDefinition(ookla_table_definition) - .build(); - final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023", - readInstructions).head(10).select(); - assertEquals(2, table.numColumns()); - } -} diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java similarity index 89% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java index 41daca3e3a6..b654ef60a91 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java +++ 
b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleLocalStackTest.java @@ -3,7 +3,6 @@ // package io.deephaven.extensions.s3; - import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; import org.junit.jupiter.api.BeforeAll; @@ -11,7 +10,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") -public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase { +public class S3SeekableChannelSimpleLocalStackTest extends S3SeekableChannelSimpleTestBase { @BeforeAll static void initContainer() { diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java similarity index 87% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java index e427fd3e64e..1a91cd0c860 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleMinIOTest.java @@ -3,7 +3,6 @@ // package io.deephaven.extensions.s3; - import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; import io.deephaven.stats.util.OSUtil; @@ -13,7 +12,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") -public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase { +public class S3SeekableChannelSimpleMinIOTest extends S3SeekableChannelSimpleTestBase { @BeforeAll static void initContainer() { @@ -24,7 +23,7 @@ static void initContainer() { } @Override - public Builder s3Instructions(Builder builder) { + public Builder s3Instructions(final Builder builder) { return MinIO.s3Instructions(builder); } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java similarity index 55% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java index 0075568581b..68f6a9042c7 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java @@ -3,8 +3,7 @@ // package io.deephaven.extensions.s3; - -import io.deephaven.extensions.s3.testlib.S3Helper; +import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.util.channel.CachedChannelProvider; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; @@ -12,60 +11,35 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; - import java.io.IOException; import java.io.InputStream; import java.net.URI; import 
java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.time.Duration; -import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; -public abstract class S3SeekableChannelTestBase { - - public abstract S3AsyncClient s3AsyncClient(); - - public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); - - private ExecutorService executor; - private S3AsyncClient asyncClient; - private String bucket; +abstract class S3SeekableChannelSimpleTestBase extends S3SeekableChannelTestSetup { @BeforeEach void setUp() throws ExecutionException, InterruptedException, TimeoutException { - executor = Executors.newCachedThreadPool(); - bucket = UUID.randomUUID().toString(); - asyncClient = s3AsyncClient(); - asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + doSetUp(); } @AfterEach void tearDown() throws ExecutionException, InterruptedException, TimeoutException { - S3Helper.deleteAllKeys(asyncClient, bucket); - asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); - asyncClient.close(); - executor.shutdownNow(); + doTearDown(); } @Test void readSimpleFiles() throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException { - uploadDirectory("readSimpleFiles"); + uploadDirectory(Path.of(S3SeekableChannelSimpleTestBase.class.getResource("readSimpleFiles").toURI()), null); { final URI uri = uri("empty.txt"); final ByteBuffer buffer = ByteBuffer.allocate(1); @@ -114,44 +88,4 @@ public int read() { assertThat(readChannel.read(buffer)).isEqualTo(-1); } } - - private void uploadDirectory(String resourceDir) - throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { - S3Helper.uploadDirectory( - asyncClient, - Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()), - bucket, - null, - Duration.ofSeconds(5)); - } - - private URI uri(String key) { - return URI.create(String.format("s3://%s/%s", bucket, key)); - } - - private void putObject(String key, AsyncRequestBody body) - throws ExecutionException, InterruptedException, TimeoutException { - asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, - TimeUnit.SECONDS); - } - - private SeekableChannelsProvider providerImpl(URI uri) { - final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); - final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); - return plugin.createProvider(uri, instructions); - } - - private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { - final ByteBuffer dst = ByteBuffer.allocate(maxBytes); - while (dst.remaining() > 0 && channel.read(dst) != -1) { - // continue - } - if (dst.remaining() == 0) { - if (channel.read(ByteBuffer.allocate(1)) != -1) { - throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); - } - } - dst.flip(); - return dst; - } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java 
b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java index 6d2b839f471..933c63bd288 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java @@ -37,7 +37,7 @@ public static void uploadDirectory( } } - public static void uploadDirectory( + private static void uploadDirectory( S3TransferManager transferManager, Path dir, String bucket, @@ -67,6 +67,9 @@ public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket) .map(S3Object::key) .map(S3Helper::objectId) .collect(Collectors.toList()); + if (deletes.isEmpty()) { + break; + } futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder() .bucket(bucket) .delete(Delete.builder().objects(deletes).build()) @@ -79,6 +82,9 @@ public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket) ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build()) .get(5, TimeUnit.SECONDS); } + if (futures.isEmpty()) { + return; + } CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS); } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java new file mode 100644 index 00000000000..9d4df0a5744 --- /dev/null +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java @@ -0,0 +1,86 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3.testlib; + +import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin; +import io.deephaven.util.channel.SeekableChannelsProvider; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Path; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class S3SeekableChannelTestSetup { + + protected ExecutorService executor; + protected S3AsyncClient asyncClient; + protected String bucket; + + protected abstract S3AsyncClient s3AsyncClient(); + + protected abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); + + protected final void doSetUp() throws ExecutionException, InterruptedException, TimeoutException { + executor = Executors.newCachedThreadPool(); + bucket = UUID.randomUUID().toString(); + asyncClient = s3AsyncClient(); + asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + } + + protected final void doTearDown() throws ExecutionException, InterruptedException, TimeoutException { + S3Helper.deleteAllKeys(asyncClient, bucket); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + 
asyncClient.close(); + executor.shutdownNow(); + } + + protected void uploadDirectory(final Path directory, final String prefix) + throws ExecutionException, InterruptedException, TimeoutException { + S3Helper.uploadDirectory(asyncClient, directory, bucket, prefix, Duration.ofSeconds(5)); + } + + protected final URI uri(String key) { + return URI.create(String.format("s3://%s/%s", bucket, key)); + } + + protected final void putObject(String key, AsyncRequestBody body) + throws ExecutionException, InterruptedException, TimeoutException { + asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, + TimeUnit.SECONDS); + } + + protected final SeekableChannelsProvider providerImpl(URI uri) { + final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); + final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); + return plugin.createProvider(uri, instructions); + } + + protected static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { + final ByteBuffer dst = ByteBuffer.allocate(maxBytes); + while (dst.remaining() > 0 && channel.read(dst) != -1) { + // continue + } + if (dst.remaining() == 0) { + if (channel.read(ByteBuffer.allocate(1)) != -1) { + throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); + } + } + dst.flip(); + return dst; + } +} diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index 4887cf7a78b..055b2a783d6 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -577,7 +577,6 @@ def test_read_parquet_from_s3(self): # Fails because we don't have the right credentials with self.assertRaises(Exception): read("s3://dh-s3-parquet-test1/multiColFile.parquet", special_instructions=s3_instructions).select() - # TODO(deephaven-core#5064): Add support for local S3 testing def verify_index_files(self, index_dir_path, expected_num_index_files=1): self.assertTrue(os.path.exists(index_dir_path))
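Run note for the new container-backed tests: the LocalStack and MinIO image tags are pinned in extensions/parquet/table/gradle.properties and forwarded to the out-of-band test JVM as the testcontainers.localstack.image and testcontainers.minio.image system properties (see the build.gradle change above). Assuming the project path and testOutOfBand task name used by this build file, the pinned images could also be supplied or changed on the command line, for example:

./gradlew :extensions-parquet-table:testOutOfBand -Ptestcontainers.localstack.image=localstack/localstack:3.1.0 -Ptestcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z

The MinIO-backed tests are additionally skipped on macOS via the OSUtil.runningMacOS() assumption in S3ParquetMinIOTest (see deephaven-core#5116), and the remote-bucket tests in S3ParquetRemoteTest remain disabled by default and must be enabled manually.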