Skip to content

Commit

Permalink
Initial commit of Iceberg integration. (#5277)
Browse files Browse the repository at this point in the history
* Initial commit of Iceberg integration.

* Tests simplified and passing.

* Exposing 'iceberg-aws' in gradle.

* Addressing PR comments.
  • Loading branch information
lbooker42 authored Jun 5, 2024
1 parent 65fca9c commit 6881afb
Show file tree
Hide file tree
Showing 60 changed files with 3,184 additions and 17 deletions.
45 changes: 44 additions & 1 deletion buildSrc/src/main/groovy/Classpaths.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class Classpaths {
static final String GUAVA_NAME = 'guava'
static final String GUAVA_VERSION = '33.2.0-jre'

static final String HADOOP_GROUP = 'org.apache.hadoop'
static final String HADOOP_VERSION = '3.4.0'

static final String ICEBERG_GROUP = 'org.apache.iceberg'
static final String ICEBERG_VERSION = '1.5.0'

static final String AWSSDK_GROUP = 'software.amazon.awssdk'
static final String AWSSDK_VERSION = '2.23.19'

static final String TESTCONTAINER_GROUP = 'org.testcontainers'
static final String TESTCONTAINER_VERSION = '1.19.4'

static boolean addDependency(Configuration conf, String group, String name, String version, Action<? super DefaultExternalModuleDependency> configure = Actions.doNothing()) {
if (!conf.dependencies.find { it.name == name && it.group == group}) {
DefaultExternalModuleDependency dep = dependency group, name, version
Expand Down Expand Up @@ -295,7 +307,7 @@ class Classpaths {
/** configName controls only the Configuration's classpath, all transitive dependencies are runtimeOnly */
static void inheritParquetHadoopConfiguration(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
Configuration config = p.configurations.getByName(configName)
addDependency(config, 'org.apache.hadoop', 'hadoop-common', '3.4.0') {
addDependency(config, HADOOP_GROUP, 'hadoop-common', HADOOP_VERSION) {
it.setTransitive(false)
// Do not take any extra dependencies of this project transitively. We just want a few classes for
// configuration and compression codecs. For any additional required dependencies, add them separately, as
Expand All @@ -314,4 +326,35 @@ class Classpaths {
it.because('hadoop-common required dependency for Configuration')
}
}

static void inheritIcebergHadoop(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
Configuration config = p.configurations.getByName(configName)
addDependency(config, HADOOP_GROUP, 'hadoop-common', HADOOP_VERSION)
addDependency(config, HADOOP_GROUP, 'hadoop-hdfs-client', HADOOP_VERSION)
}


static void inheritIcebergCore(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
Configuration config = p.configurations.getByName(configName)
addDependency(config, p.getDependencies().platform(ICEBERG_GROUP + ":iceberg-bom:" + ICEBERG_VERSION))

addDependency(config, ICEBERG_GROUP, 'iceberg-core', ICEBERG_VERSION)
addDependency(config, ICEBERG_GROUP, 'iceberg-bundled-guava', ICEBERG_VERSION)
}

static void inheritAWSSDK(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) {
Configuration config = p.configurations.getByName(configName)
addDependency(config, p.getDependencies().platform(AWSSDK_GROUP + ":bom:" + AWSSDK_VERSION))

addDependency(config, AWSSDK_GROUP, 's3', AWSSDK_VERSION)
addDependency(config, AWSSDK_GROUP, 'aws-crt-client', AWSSDK_VERSION)
}

static void inheritTestContainers(Project p, String configName = JavaPlugin.TEST_IMPLEMENTATION_CONFIGURATION_NAME) {
Configuration config = p.configurations.getByName(configName)
addDependency(config, TESTCONTAINER_GROUP, 'testcontainers', TESTCONTAINER_VERSION)
addDependency(config, TESTCONTAINER_GROUP, 'junit-jupiter', TESTCONTAINER_VERSION)
addDependency(config, TESTCONTAINER_GROUP, 'localstack', TESTCONTAINER_VERSION)
addDependency(config, TESTCONTAINER_GROUP, 'minio', TESTCONTAINER_VERSION)
}
}
39 changes: 39 additions & 0 deletions extensions/iceberg/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

description 'Iceberg: Support to read iceberg catalogs.'

dependencies {
api project(':engine-api')
api project(':engine-table')

implementation project(':engine-base')
implementation project(':log-factory')
implementation project(':Configuration')

Classpaths.inheritAutoService(project)
Classpaths.inheritImmutables(project)

Classpaths.inheritParquetHadoop(project)

implementation project(':extensions-parquet-base')
implementation project(':extensions-parquet-table')

Classpaths.inheritIcebergCore(project)
Classpaths.inheritIcebergHadoop(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)

testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

Classpaths.inheritTestContainers(project)

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}
1 change: 1 addition & 0 deletions extensions/iceberg/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
41 changes: 41 additions & 0 deletions extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

description 'Iceberg: Support to read iceberg catalogs.'

dependencies {
implementation project(':extensions-iceberg')

// Bring in the AWS / S3 extensions
Classpaths.inheritIcebergCore(project)

implementation project(':extensions-s3')
implementation "org.apache.iceberg:iceberg-aws"
runtimeOnly "org.apache.iceberg:iceberg-aws-bundle"
Classpaths.inheritAWSSDK(project)

Classpaths.inheritTestContainers(project)

testImplementation TestTools.projectDependency(project, 'extensions-s3')
testImplementation TestTools.projectDependency(project, 'extensions-iceberg')

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}

test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}
systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image')
systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image')
}
4 changes: 4 additions & 0 deletions extensions/iceberg/s3/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC

testcontainers.localstack.image=localstack/localstack:3.1.0
testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import com.google.common.base.Strings;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;

/**
* Tools for accessing tables in the Iceberg table format.
*/
@SuppressWarnings("unused")
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

public static IcebergCatalogAdapter createS3Rest(
@Nullable final String name,
@NotNull final String catalogURI,
@NotNull final String warehouseLocation,
@Nullable final String region,
@Nullable final String accessKeyId,
@Nullable final String secretAccessKey,
@Nullable final String endpointOverride) {

// Set up the properties map for the Iceberg catalog
final Map<String, String> properties = new HashMap<>();

final RESTCatalog catalog = new RESTCatalog();

properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

// Configure the properties map from the Iceberg instructions.
if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId);
properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey);
}
if (!Strings.isNullOrEmpty(region)) {
properties.put(AwsClientProperties.CLIENT_REGION, region);
}
if (!Strings.isNullOrEmpty(endpointOverride)) {
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

// TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeAll
static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.LocalStack.init();
}

@Override
public Builder s3Instructions(Builder builder) {
return SingletonContainers.LocalStack.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.LocalStack.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeAll
static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.MinIO.init();
}

@Override
public Builder s3Instructions(Builder builder) {
return SingletonContainers.MinIO.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.MinIO.s3AsyncClient();
}
}
Loading

0 comments on commit 6881afb

Please sign in to comment.