Skip to content

Commit

Permalink
Unit tests for S3 + parquet (deephaven#5441)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored May 16, 2024
1 parent af861b7 commit fb70418
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 277 deletions.
9 changes: 8 additions & 1 deletion extensions/parquet/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ dependencies {
testImplementation project(':base-test-utils')
testImplementation project(':engine-test-utils')

testImplementation TestTools.projectDependency(project, 'extensions-s3')
Classpaths.inheritJUnitClassic(project, 'testImplementation')

testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
testImplementation "org.testcontainers:minio:1.19.4"

testRuntimeOnly project(':log-to-slf4j'),
project(path: ':configs'),
project(path: ':test-configs')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')

runtimeOnly project(':extensions-trackedfile')
testImplementation project(':extensions-s3')

brotliTestImplementation project(':extensions-parquet-table')
brotliTestImplementation('com.github.rdblue:brotli-codec:0.1.1')
Expand All @@ -70,3 +74,6 @@ if (Architecture.fromHost() == Architecture.AMD64) {
}

TestTools.addEngineOutOfBandTest(project)

testOutOfBand.systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image')
testOutOfBand.systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image')
4 changes: 4 additions & 0 deletions extensions/parquet/table/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC

# TODO(deephaven-core#5115): EPIC: Dependency management
testcontainers.localstack.image=localstack/localstack:3.1.0
testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract ParquetInstructions withTableDefinition(final TableDefinition tableDefinition);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but layout set as
* the provided {@link ParquetFileLayout}.
*/
public abstract ParquetInstructions withLayout(final ParquetFileLayout fileLayout);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* and layout set as the provided values.
Expand Down Expand Up @@ -350,18 +356,23 @@ public Optional<Collection<List<String>>> getIndexColumns() {
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
return withTableDefinitionAndLayout(tableDefinition, null);
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, null);
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(null, useLayout);
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
@Nullable final TableDefinition tableDefinition,
@Nullable final ParquetFileLayout fileLayout) {
@Nullable final TableDefinition useDefinition,
@Nullable final ParquetFileLayout useLayout) {
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
fileLayout, tableDefinition, null);
useLayout, useDefinition, null);
}

@Override
Expand Down Expand Up @@ -598,7 +609,12 @@ public Optional<Collection<List<String>>> getIndexColumns() {

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
return withTableDefinitionAndLayout(useDefinition, fileLayout);
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(tableDefinition, useLayout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public static Table readTable(
return readTableFromFileUri(sourceURI, readInstructions);
}
if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
throw new UncheckedDeephavenException("We currently do not support reading parquet metadata files " +
throw new UnsupportedOperationException("We currently do not support reading parquet metadata files " +
"from non local storage");
}
if (!isDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.api.SortColumn;
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.primitive.function.ByteConsumer;
import io.deephaven.engine.primitive.function.CharConsumer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class S3ParquetLocalStackTest extends S3ParquetTestBase {

@BeforeClass
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
LocalStack.init();
}

@Override
public Builder s3Instructions(Builder builder) {
return LocalStack.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.LocalStack.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.stats.util.OSUtil;
import org.junit.Assume;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class S3ParquetMinIOTest extends S3ParquetTestBase {

@BeforeClass
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assume.assumeFalse("OSUtil.runningMacOS()", OSUtil.runningMacOS());
// ensure container is started so container startup time isn't associated with a specific test
MinIO.init();
}

@Override
public Builder s3Instructions(final Builder builder) {
return MinIO.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.MinIO.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.test.types.OutOfBandTest;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.time.Duration;

import static org.junit.Assert.assertEquals;

/**
* These tests verify the behavior of Parquet implementation when reading against remote S3 servers.
**/
@Category(OutOfBandTest.class)
public class S3ParquetRemoteTest {

// The following tests are disabled by default, and should be run manually.
private static final boolean ENABLE_REMOTE_S3_TESTING = false;

@Rule
public final EngineCleanup framework = new EngineCleanup();

@Test
public void readSampleParquetFilesFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-2")
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition tableDefinition = TableDefinition.of(
ColumnDefinition.ofString("hash"),
ColumnDefinition.ofLong("version"),
ColumnDefinition.ofLong("size"),
ColumnDefinition.ofString("block_hash"),
ColumnDefinition.ofLong("block_number"),
ColumnDefinition.ofLong("index"),
ColumnDefinition.ofLong("virtual_size"),
ColumnDefinition.ofLong("lock_time"),
ColumnDefinition.ofLong("input_count"),
ColumnDefinition.ofLong("output_count"),
ColumnDefinition.ofBoolean("isCoinbase"),
ColumnDefinition.ofDouble("output_value"),
ColumnDefinition.ofTime("last_modified"),
ColumnDefinition.ofDouble("input_value"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(tableDefinition)
.build();
ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet",
readInstructions).head(10).select();

ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet",
readInstructions).head(10).select();
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition ookla_table_definition = TableDefinition.of(
ColumnDefinition.ofInt("quarter").withPartitioning(),
ColumnDefinition.ofString("quadkey"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(ookla_table_definition)
.build();
final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023",
readInstructions).head(10).select();
assertEquals(2, table.numColumns());
}
}
Loading

0 comments on commit fb70418

Please sign in to comment.