diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy index a09da2269ad..caf4f5769d1 100644 --- a/buildSrc/src/main/groovy/Classpaths.groovy +++ b/buildSrc/src/main/groovy/Classpaths.groovy @@ -123,6 +123,18 @@ class Classpaths { static final String GUAVA_NAME = 'guava' static final String GUAVA_VERSION = '33.2.0-jre' + static final String HADOOP_GROUP = 'org.apache.hadoop' + static final String HADOOP_VERSION = '3.4.0' + + static final String ICEBERG_GROUP = 'org.apache.iceberg' + static final String ICEBERG_VERSION = '1.5.0' + + static final String AWSSDK_GROUP = 'software.amazon.awssdk' + static final String AWSSDK_VERSION = '2.23.19' + + static final String TESTCONTAINER_GROUP = 'org.testcontainers' + static final String TESTCONTAINER_VERSION = '1.19.4' + static boolean addDependency(Configuration conf, String group, String name, String version, Action<? super DefaultExternalModuleDependency> configure = Actions.doNothing()) { if (!conf.dependencies.find { it.name == name && it.group == group}) { DefaultExternalModuleDependency dep = dependency group, name, version @@ -295,7 +307,7 @@ class Classpaths { /** configName controls only the Configuration's classpath, all transitive dependencies are runtimeOnly */ static void inheritParquetHadoopConfiguration(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) { Configuration config = p.configurations.getByName(configName) - addDependency(config, 'org.apache.hadoop', 'hadoop-common', '3.4.0') { + addDependency(config, HADOOP_GROUP, 'hadoop-common', HADOOP_VERSION) { it.setTransitive(false) // Do not take any extra dependencies of this project transitively. We just want a few classes for // configuration and compression codecs. For any additional required dependencies, add them separately, as @@ -314,4 +326,35 @@ class Classpaths { it.because('hadoop-common required dependency for Configuration') } } + + static void inheritIcebergHadoop(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) { + Configuration config = p.configurations.getByName(configName) + addDependency(config, HADOOP_GROUP, 'hadoop-common', HADOOP_VERSION) + addDependency(config, HADOOP_GROUP, 'hadoop-hdfs-client', HADOOP_VERSION) + } + + + static void inheritIcebergCore(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) { + Configuration config = p.configurations.getByName(configName) + addDependency(config, p.getDependencies().platform(ICEBERG_GROUP + ":iceberg-bom:" + ICEBERG_VERSION)) + + addDependency(config, ICEBERG_GROUP, 'iceberg-core', ICEBERG_VERSION) + addDependency(config, ICEBERG_GROUP, 'iceberg-bundled-guava', ICEBERG_VERSION) + } + + static void inheritAWSSDK(Project p, String configName = JavaPlugin.IMPLEMENTATION_CONFIGURATION_NAME) { + Configuration config = p.configurations.getByName(configName) + addDependency(config, p.getDependencies().platform(AWSSDK_GROUP + ":bom:" + AWSSDK_VERSION)) + + addDependency(config, AWSSDK_GROUP, 's3', AWSSDK_VERSION) + addDependency(config, AWSSDK_GROUP, 'aws-crt-client', AWSSDK_VERSION) + } + + static void inheritTestContainers(Project p, String configName = JavaPlugin.TEST_IMPLEMENTATION_CONFIGURATION_NAME) { + Configuration config = p.configurations.getByName(configName) + addDependency(config, TESTCONTAINER_GROUP, 'testcontainers', TESTCONTAINER_VERSION) + addDependency(config, TESTCONTAINER_GROUP, 'junit-jupiter', TESTCONTAINER_VERSION) + addDependency(config, TESTCONTAINER_GROUP, 'localstack', TESTCONTAINER_VERSION) + addDependency(config, TESTCONTAINER_GROUP, 'minio', TESTCONTAINER_VERSION) + } }
diff --git a/extensions/iceberg/build.gradle b/extensions/iceberg/build.gradle new file mode 100644 index 00000000000..2eba0158fb1 --- /dev/null +++ b/extensions/iceberg/build.gradle @@ -0,0 +1,39 @@ +plugins { + id 'java-library' + id 'io.deephaven.project.register' +} + +description 'Iceberg: Support to read Iceberg catalogs.' + +dependencies { + api project(':engine-api') + api project(':engine-table') + + implementation project(':engine-base') + implementation project(':log-factory') + implementation project(':Configuration') + + Classpaths.inheritAutoService(project) + Classpaths.inheritImmutables(project) + + Classpaths.inheritParquetHadoop(project) + + implementation project(':extensions-parquet-base') + implementation project(':extensions-parquet-table') + + Classpaths.inheritIcebergCore(project) + Classpaths.inheritIcebergHadoop(project) + + Classpaths.inheritJUnitPlatform(project) + Classpaths.inheritAssertJ(project) + + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + Classpaths.inheritTestContainers(project) + + testRuntimeOnly project(':test-configs') + testRuntimeOnly project(':log-to-slf4j') + Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly') +} diff --git a/extensions/iceberg/gradle.properties b/extensions/iceberg/gradle.properties new file mode 100644 index 00000000000..c186bbfdde1 --- /dev/null +++ b/extensions/iceberg/gradle.properties @@ -0,0 +1 @@ +io.deephaven.project.ProjectType=JAVA_PUBLIC diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle new file mode 100644 index 00000000000..c07df457cf7 --- /dev/null +++ b/extensions/iceberg/s3/build.gradle @@ -0,0 +1,41 @@ +plugins { + id 'java-library' + id 'io.deephaven.project.register' +} + +description 'Iceberg: Support to read Iceberg catalogs from S3.'
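+// Layered on top of the AWS-agnostic :extensions-iceberg module; everything AWS/S3-specific lives here.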
+ +dependencies { + implementation project(':extensions-iceberg') + + // Bring in the AWS / S3 extensions + Classpaths.inheritIcebergCore(project) + + implementation project(':extensions-s3') + implementation "org.apache.iceberg:iceberg-aws" + runtimeOnly "org.apache.iceberg:iceberg-aws-bundle" + Classpaths.inheritAWSSDK(project) + + Classpaths.inheritTestContainers(project) + + testImplementation TestTools.projectDependency(project, 'extensions-s3') + testImplementation TestTools.projectDependency(project, 'extensions-iceberg') + + testRuntimeOnly project(':test-configs') + testRuntimeOnly project(':log-to-slf4j') + Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly') +} + +test { + useJUnitPlatform { + excludeTags("testcontainers") + } +} + +tasks.register('testOutOfBand', Test) { + useJUnitPlatform { + includeTags("testcontainers") + } + systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image') + systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image') +} diff --git a/extensions/iceberg/s3/gradle.properties b/extensions/iceberg/s3/gradle.properties new file mode 100644 index 00000000000..cfd384f094a --- /dev/null +++ b/extensions/iceberg/s3/gradle.properties @@ -0,0 +1,4 @@ +io.deephaven.project.ProjectType=JAVA_PUBLIC + +testcontainers.localstack.image=localstack/localstack:3.1.0 +testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java new file mode 100644 index 00000000000..6f7845c43eb --- /dev/null +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java @@ -0,0 +1,65 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import com.google.common.base.Strings; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.rest.RESTCatalog; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tools for accessing tables in the Iceberg table format. + */ +@SuppressWarnings("unused") +public class IcebergToolsS3 extends IcebergTools { + private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO"; + + public static IcebergCatalogAdapter createS3Rest( + @Nullable final String name, + @NotNull final String catalogURI, + @NotNull final String warehouseLocation, + @Nullable final String region, + @Nullable final String accessKeyId, + @Nullable final String secretAccessKey, + @Nullable final String endpointOverride) { + + // Set up the properties map for the Iceberg catalog + final Map<String, String> properties = new HashMap<>(); + + final RESTCatalog catalog = new RESTCatalog(); + + properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName()); + properties.put(CatalogProperties.URI, catalogURI); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); + + // Configure the properties map from the Iceberg instructions.
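+ // All of the S3 settings below are optional. When the key pair, region, or endpoint is not supplied, + // S3FileIO should fall back to the AWS SDK default credential/region chain (an assumption based on + // Iceberg's S3FileIOProperties handling); the endpoint override is what lets the LocalStack and MinIO + // tests point the catalog at a local S3 substitute.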
+ if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) { + properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId); + properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey); + } + if (!Strings.isNullOrEmpty(region)) { + properties.put(AwsClientProperties.CLIENT_REGION, region); + } + if (!Strings.isNullOrEmpty(endpointOverride)) { + properties.put(S3FileIOProperties.ENDPOINT, endpointOverride); + } + + // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider + final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null); + + final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI; + catalog.initialize(catalogName, properties); + + return new IcebergCatalogAdapter(catalog, fileIO); + } + +}
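A minimal usage sketch for the new entry point, assuming a local REST catalog and an S3-compatible store: every connection value below is a placeholder, the regionName option on S3Instructions.builder() is assumed from the extensions-s3 module, and the IcebergInstructions wiring mirrors IcebergToolsTest.setUp() below.

import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergToolsS3;
import org.apache.iceberg.catalog.TableIdentifier;

public class IcebergS3Example {
    public static void main(String[] args) {
        // Placeholder endpoints and credentials for a local REST catalog plus S3 store.
        final IcebergCatalogAdapter adapter = IcebergToolsS3.createS3Rest(
                "example-catalog",        // name; when null, "IcebergCatalog-" + catalogURI is used
                "http://localhost:8181",  // catalogURI
                "s3://warehouse",         // warehouseLocation
                "us-east-1",              // region
                "access-key",             // accessKeyId
                "secret-key",             // secretAccessKey
                "http://localhost:9000"); // endpointOverride

        // Data instructions flow through to the Deephaven S3 read path, as in the tests.
        final IcebergInstructions instructions = IcebergInstructions.builder()
                .dataInstructions(S3Instructions.builder().regionName("us-east-1").build())
                .build();

        final Table sales = adapter.readTable(TableIdentifier.parse("sales.sales_multi"), instructions);
        System.out.println("rows: " + sales.size());
    }
}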
diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java new file mode 100644 index 00000000000..578e358985e --- /dev/null +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java @@ -0,0 +1,31 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + + +import io.deephaven.extensions.s3.S3Instructions.Builder; +import io.deephaven.extensions.s3.testlib.SingletonContainers; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +@Tag("testcontainers") +public class IcebergLocalStackTest extends IcebergToolsTest { + + @BeforeAll + static void initContainer() { + // ensure container is started so container startup time isn't associated with a specific test + SingletonContainers.LocalStack.init(); + } + + @Override + public Builder s3Instructions(Builder builder) { + return SingletonContainers.LocalStack.s3Instructions(builder); + } + + @Override + public S3AsyncClient s3AsyncClient() { + return SingletonContainers.LocalStack.s3AsyncClient(); + } +} diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java new file mode 100644 index 00000000000..804d2d01746 --- /dev/null +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java @@ -0,0 +1,35 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + + +import io.deephaven.extensions.s3.S3Instructions.Builder; +import io.deephaven.extensions.s3.testlib.SingletonContainers; +import io.deephaven.stats.util.OSUtil; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +@Tag("testcontainers") +public class IcebergMinIOTest extends IcebergToolsTest { + + @BeforeAll + static void initContainer() { + // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X + Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()"); + // ensure container is started so container startup time isn't associated with a specific test + SingletonContainers.MinIO.init(); + } + + @Override + public Builder s3Instructions(Builder builder) { + return SingletonContainers.MinIO.s3Instructions(builder); + } + + @Override + public S3AsyncClient s3AsyncClient() { + return SingletonContainers.MinIO.s3AsyncClient(); + } }
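Both concrete classes supply only container wiring, so the shared suite below can be reused for any other S3-compatible backend. A hypothetical sketch (class name, endpoint, and the S3Instructions builder options are assumptions mirroring what the SingletonContainers fixtures presumably set):

package io.deephaven.iceberg.util;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.net.URI;

// Hypothetical third backend: any S3-compatible endpoint can drive the whole suite.
@Tag("testcontainers")
public class IcebergCustomStoreTest extends IcebergToolsTest {
    private static final URI ENDPOINT = URI.create("http://localhost:9000"); // placeholder

    @Override
    public Builder s3Instructions(final Builder builder) {
        // Assumed S3Instructions options; the real fixtures configure the equivalent values.
        return builder.regionName(Region.US_EAST_1.id()).endpointOverride(ENDPOINT);
    }

    @Override
    public S3AsyncClient s3AsyncClient() {
        return S3AsyncClient.builder()
                .region(Region.US_EAST_1)
                .endpointOverride(ENDPOINT)
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("access-key", "secret-key")))
                .build();
    }
}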
diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java new file mode 100644 index 00000000000..0fd3b3fcf7e --- /dev/null +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -0,0 +1,594 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import gnu.trove.list.array.TLongArrayList; +import io.deephaven.base.verify.Assert; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.util.TableTools; +import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog; +import io.deephaven.iceberg.TestCatalog.IcebergTestFileIO; +import io.deephaven.time.DateTimeUtils; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.*; + +import java.io.File; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public abstract class IcebergToolsTest { + IcebergInstructions instructions; + + public abstract S3AsyncClient s3AsyncClient(); + + public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); + + private S3AsyncClient asyncClient; + private String bucket; + + private final List<String> keys = new ArrayList<>(); + + private String warehousePath; + private Catalog resourceCatalog; + private FileIO resourceFileIO; + + @BeforeEach + void setUp() throws ExecutionException, InterruptedException { + bucket = "warehouse"; + asyncClient = s3AsyncClient(); + asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(); + + warehousePath = IcebergToolsTest.class.getResource("/warehouse").getPath(); + resourceFileIO = new IcebergTestFileIO("s3://warehouse", warehousePath); + + // Create the test catalog for the tests + resourceCatalog = IcebergTestCatalog.create(warehousePath, resourceFileIO); + + final S3Instructions s3Instructions = s3Instructions(S3Instructions.builder()).build(); + + instructions = IcebergInstructions.builder() + .dataInstructions(s3Instructions) + .build(); + } + + private void uploadParquetFiles(final File root, final String prefixToRemove) + throws ExecutionException, InterruptedException, TimeoutException { + for (final File file : root.listFiles()) { + if (file.isDirectory()) { + uploadParquetFiles(file, prefixToRemove); + } else if (file.getName().endsWith(".parquet")) { + final String key = file.getPath().substring(prefixToRemove.length() + 1); + + keys.add(key); + final CompletableFuture<PutObjectResponse> future = asyncClient.putObject( + PutObjectRequest.builder().bucket(bucket).key(key).build(), + AsyncRequestBody.fromFile(file)); + + final PutObjectResponse response = future.get(10, TimeUnit.SECONDS); + if (!response.sdkHttpResponse().isSuccessful()) { + Assert.statementNeverExecuted("Failed to upload file: " + file.getPath()); + } + } + } + } + + @AfterEach + public void tearDown() throws ExecutionException, InterruptedException { + for (String key : keys) { + asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get(); + } + keys.clear(); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(); + asyncClient.close(); + } + + @Test + public void testListNamespaces() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Collection<Namespace> namespaces = adapter.listNamespaces(); + final Collection<String> namespaceNames = + namespaces.stream().map(Namespace::toString).collect(Collectors.toList()); + + Assert.eq(namespaceNames.size(), "namespaceNames.size()", 2, "2 namespaces in the catalog"); + Assert.eqTrue(namespaceNames.contains("sales"), "namespaceNames.contains(sales)"); + Assert.eqTrue(namespaceNames.contains("sample"), "namespaceNames.contains(sample)"); + + final Table table = adapter.listNamespacesAsTable(); + Assert.eq(table.size(), "table.size()", 2, "2 namespaces in the catalog"); + Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type"); + Assert.eqTrue(table.getColumnSource("namespace_object").getType().equals(Namespace.class), + "namespace_object column type"); + } + + @Test + public void testListTables() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + + final Collection<TableIdentifier> tables = adapter.listTables(ns); + Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")), + "tables.contains(sales_partitioned)"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)"); + + final Table table = adapter.listTablesAsTable(ns); + Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace"); + Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type"); + Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type"); + Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class), + "table_identifier_object column type"); + } + + @Test + public void testListSnapshots() { + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final TLongArrayList snapshotIds = new TLongArrayList(); + final TableIdentifier tableIdentifier = TableIdentifier.of("sales", "sales_multi"); + adapter.listSnapshots(tableIdentifier) + .forEach(snapshot -> snapshotIds.add(snapshot.snapshotId())); + + Assert.eq(snapshotIds.size(), "snapshots.size()", 4, "4 snapshots for sales/sales_multi"); + + Assert.eqTrue(snapshotIds.contains(2001582482032951248L), "snapshots.contains(2001582482032951248L)"); + Assert.eqTrue(snapshotIds.contains(8325605756612719366L), "snapshots.contains(8325605756612719366L)"); + 
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)"); + Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)"); + + final Table table = adapter.listSnapshotsAsTable(tableIdentifier); + Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); + Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type"); + Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type"); + Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type"); + Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type"); + Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class), + "snapshot_object column type"); + } + + @Test + public void testOpenTableA() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableB() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableC() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_single").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableS3Only() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + // Verify we retrieved all the rows. 
+ Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("year").withPartitioning(), + ColumnDefinition.ofInt("month").withPartitioning(), + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTablePartitionTypeException() { + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofLong("year").withPartitioning(), + ColumnDefinition.ofInt("month").withPartitioning(), + ColumnDefinition.ofLong("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofDouble("Units_Sold"), + ColumnDefinition.ofLong("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + try { + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + TableTools.showWithRowSet(table, 100, DateTimeUtils.timeZone(), System.out); + Assert.statementNeverExecuted("Expected an exception for incompatible column types"); + } catch (final TableDefinition.IncompatibleTableDefinitionException e) { + Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + } + } + + @Test + public void testOpenTableDefinitionRename() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("__year").withPartitioning(), + ColumnDefinition.ofInt("__month").withPartitioning(), + ColumnDefinition.ofString("RegionName"), + ColumnDefinition.ofString("ItemType"), + ColumnDefinition.ofInt("UnitsSold"), + ColumnDefinition.ofDouble("UnitPrice"), + ColumnDefinition.fromGenericType("OrderDate", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .putColumnRenames("Region", "RegionName") + 
.putColumnRenames("Item_Type", "ItemType") + .putColumnRenames("Units_Sold", "UnitsSold") + .putColumnRenames("Unit_Price", "UnitPrice") + .putColumnRenames("Order_Date", "OrderDate") + .putColumnRenames("year", "__year") + .putColumnRenames("month", "__month") + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testSkippedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("year").withPartitioning(), + // Omitting month partitioning column + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testReorderedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("month").withPartitioning(), + ColumnDefinition.ofInt("year").withPartitioning(), + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. 
+ Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofString("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testIncorrectPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("month").withPartitioning(), + ColumnDefinition.ofInt("year").withPartitioning(), + ColumnDefinition.ofString("Region").withPartitioning(), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofInt("Units_Sold"), + ColumnDefinition.ofDouble("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + + try { + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + Assert.statementNeverExecuted("Expected an exception for incorrect partitioning columns"); + } catch (final TableDataException e) { + Assert.eqTrue(e.getMessage().startsWith("The following columns are not partitioned"), "Exception message"); + } + } + + @Test + public void testMissingPartitioningColumns() { + final TableDefinition tableDef = TableDefinition.of( + ColumnDefinition.ofInt("__year").withPartitioning(), // Incorrect name + ColumnDefinition.ofInt("__month").withPartitioning(), // Incorrect name + ColumnDefinition.ofLong("Region"), + ColumnDefinition.ofString("Item_Type"), + ColumnDefinition.ofDouble("Units_Sold"), + ColumnDefinition.ofLong("Unit_Price"), + ColumnDefinition.fromGenericType("Order_Date", Instant.class)); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .tableDefinition(tableDef) + .dataInstructions(instructions.dataInstructions().get()) + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + try { + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, 
localInstructions); + Assert.statementNeverExecuted("Expected an exception for missing columns"); + } catch (final TableDefinition.IncompatibleTableDefinitionException e) { + Assert.eqTrue(e.getMessage().startsWith("Table definition incompatibilities"), "Exception message"); + } + } + + @Test + public void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .dataInstructions(instructions.dataInstructions().get()) + .putColumnRenames("RegionName", "Region") + .putColumnRenames("ItemType", "Item_Type") + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableColumnRenamePartitioningColumns() + throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_partitioned").getPath()), + warehousePath); + + final IcebergInstructions localInstructions = IcebergInstructions.builder() + .dataInstructions(instructions.dataInstructions().get()) + .putColumnRenames("VendorID", "vendor_id") + .putColumnRenames("month", "__month") + .putColumnRenames("year", "__year") + .build(); + + final IcebergCatalogAdapter adapter = + IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableSnapshot() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); + final List<Snapshot> snapshots = adapter.listSnapshots(tableId); + + // Verify the row counts for each snapshot. 
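+ // Expected sizes track the appended record counts in the sales_multi metadata: + // 18266, +36107 = 54373, +18230 = 72603, +27397 = 100_000.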
+ final io.deephaven.engine.table.Table table0 = + adapter.readTable(tableId, snapshots.get(0).snapshotId(), instructions); + Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + + final io.deephaven.engine.table.Table table1 = + adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions); + Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + + final io.deephaven.engine.table.Table table2 = + adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions); + Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + + final io.deephaven.engine.table.Table table3 = + adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions); + Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_multi").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sales"); + final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); + final List<Snapshot> snapshots = adapter.listSnapshots(tableId); + + // Verify the row counts for each snapshot. + final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions); + Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + + final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions); + Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + + final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions); + Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + + final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions); + Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + } + + @Test + public void testOpenAllTypesTable() throws ExecutionException, InterruptedException, TimeoutException { + uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sample/all_types").getPath()), + warehousePath); + + final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); + + final Namespace ns = Namespace.of("sample"); + final TableIdentifier tableId = TableIdentifier.of(ns, "all_types"); + final List<Snapshot> snapshots = adapter.listSnapshots(tableId); + + // Verify we retrieved all the rows. 
+ final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + Assert.eq(table.size(), "table.size()", 10, "10 rows in the table"); + } +} diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-3-57e22ac8-7e7d-4830-aedb-14dab6fabdd6-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-3-57e22ac8-7e7d-4830-aedb-14dab6fabdd6-0-00001.parquet new file mode 100644 index 00000000000..04d259decea --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-3-57e22ac8-7e7d-4830-aedb-14dab6fabdd6-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9749ae2922aa9d21b7e779142d6c2476d0444c2c24f7e93397e6750147180724 +size 176970 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-4-3aacc74b-7f10-4e08-89ae-f1b1b578ce63-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-4-3aacc74b-7f10-4e08-89ae-f1b1b578ce63-0-00001.parquet new file mode 100644 index 00000000000..206e1f84a6c --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-4-3aacc74b-7f10-4e08-89ae-f1b1b578ce63-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d5007c70d0e8be33ef012c48d7f067b47812b22747c145cfa4bab4bef944fb0f +size 331675 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-5-47debf3c-b256-4a04-9248-d7c69bec4881-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-5-47debf3c-b256-4a04-9248-d7c69bec4881-0-00001.parquet new file mode 100644 index 00000000000..6d552bf0e9a --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-5-47debf3c-b256-4a04-9248-d7c69bec4881-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c02909448106787f93e8a2550e9db360aedd9ab25c925e8a536b18096aa8ed91 +size 176482 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-6-7860a7c9-b3cb-4b2a-a69b-0f1e5c068512-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-6-7860a7c9-b3cb-4b2a-a69b-0f1e5c068512-0-00001.parquet new file mode 100644 index 00000000000..540e5bc3392 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/data/00000-6-7860a7c9-b3cb-4b2a-a69b-0f1e5c068512-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c872a3defa86826496fbc76fa4235c858d45edb5441e196c2c812f4c8a1166e7 +size 256855 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json new file mode 100644 index 00000000000..53c1afcba7b --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json @@ -0,0 +1,91 @@ +{ + "format-version" : 2, + "table-uuid" : "49ca5b8c-6402-40f9-96b0-4c09f9f1b512", + "location" : "s3://warehouse/sales/sales_multi", + "last-sequence-number" : 1, + "last-updated-ms" : 1716234545155, + "last-column-id" : 5, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 
0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:04.669537174Z", + "write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 2001582482032951248, + "refs" : { + "main" : { + "snapshot-id" : 2001582482032951248, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545155, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18266", + "added-files-size" : "176970", + "changed-partition-count" : "1", + "total-records" : "18266", + "total-files-size" : "176970", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234545155, + "snapshot-id" : 2001582482032951248 + } ], + "metadata-log" : [ ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00001-30972e0f-9c8a-4fa9-911d-e356b2b04061.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00001-30972e0f-9c8a-4fa9-911d-e356b2b04061.metadata.json new file mode 100644 index 00000000000..5aa1996ce92 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00001-30972e0f-9c8a-4fa9-911d-e356b2b04061.metadata.json @@ -0,0 +1,118 @@ +{ + "format-version" : 2, + "table-uuid" : "49ca5b8c-6402-40f9-96b0-4c09f9f1b512", + "location" : "s3://warehouse/sales/sales_multi", + "last-sequence-number" : 2, + "last-updated-ms" : 1716234545529, + "last-column-id" : 5, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:04.669537174Z", + "write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + 
"current-snapshot-id" : 8325605756612719366, + "refs" : { + "main" : { + "snapshot-id" : 8325605756612719366, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545155, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18266", + "added-files-size" : "176970", + "changed-partition-count" : "1", + "total-records" : "18266", + "total-files-size" : "176970", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro", + "schema-id" : 0 + }, { + "sequence-number" : 2, + "snapshot-id" : 8325605756612719366, + "parent-snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545529, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "36107", + "added-files-size" : "331675", + "changed-partition-count" : "1", + "total-records" : "54373", + "total-files-size" : "508645", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234545155, + "snapshot-id" : 2001582482032951248 + }, { + "timestamp-ms" : 1716234545529, + "snapshot-id" : 8325605756612719366 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1716234545155, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json" + } ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00002-559f7323-a010-4afe-8461-f5261787aae9.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00002-559f7323-a010-4afe-8461-f5261787aae9.metadata.json new file mode 100644 index 00000000000..1be00194c4e --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00002-559f7323-a010-4afe-8461-f5261787aae9.metadata.json @@ -0,0 +1,145 @@ +{ + "format-version" : 2, + "table-uuid" : "49ca5b8c-6402-40f9-96b0-4c09f9f1b512", + "location" : "s3://warehouse/sales/sales_multi", + "last-sequence-number" : 3, + "last-updated-ms" : 1716234545865, + "last-column-id" : 5, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:04.669537174Z", + 
"write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 3247344357341484163, + "refs" : { + "main" : { + "snapshot-id" : 3247344357341484163, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545155, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18266", + "added-files-size" : "176970", + "changed-partition-count" : "1", + "total-records" : "18266", + "total-files-size" : "176970", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro", + "schema-id" : 0 + }, { + "sequence-number" : 2, + "snapshot-id" : 8325605756612719366, + "parent-snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545529, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "36107", + "added-files-size" : "331675", + "changed-partition-count" : "1", + "total-records" : "54373", + "total-files-size" : "508645", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro", + "schema-id" : 0 + }, { + "sequence-number" : 3, + "snapshot-id" : 3247344357341484163, + "parent-snapshot-id" : 8325605756612719366, + "timestamp-ms" : 1716234545865, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18230", + "added-files-size" : "176482", + "changed-partition-count" : "1", + "total-records" : "72603", + "total-files-size" : "685127", + "total-data-files" : "3", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-3247344357341484163-1-77bfad1c-123c-452c-814d-298a1483a99f.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234545155, + "snapshot-id" : 2001582482032951248 + }, { + "timestamp-ms" : 1716234545529, + "snapshot-id" : 8325605756612719366 + }, { + "timestamp-ms" : 1716234545865, + "snapshot-id" : 3247344357341484163 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1716234545155, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json" + }, { + "timestamp-ms" : 1716234545529, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00001-30972e0f-9c8a-4fa9-911d-e356b2b04061.metadata.json" + } ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00003-366e4cb2-48c8-4f6d-bd6b-9b452de74dc3.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00003-366e4cb2-48c8-4f6d-bd6b-9b452de74dc3.metadata.json new file mode 100644 index 00000000000..16cca0f3f8d --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/00003-366e4cb2-48c8-4f6d-bd6b-9b452de74dc3.metadata.json @@ -0,0 +1,172 @@ +{ + "format-version" : 2, + 
"table-uuid" : "49ca5b8c-6402-40f9-96b0-4c09f9f1b512", + "location" : "s3://warehouse/sales/sales_multi", + "last-sequence-number" : 4, + "last-updated-ms" : 1716234546189, + "last-column-id" : 5, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:04.669537174Z", + "write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 1792185872197984875, + "refs" : { + "main" : { + "snapshot-id" : 1792185872197984875, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545155, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18266", + "added-files-size" : "176970", + "changed-partition-count" : "1", + "total-records" : "18266", + "total-files-size" : "176970", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro", + "schema-id" : 0 + }, { + "sequence-number" : 2, + "snapshot-id" : 8325605756612719366, + "parent-snapshot-id" : 2001582482032951248, + "timestamp-ms" : 1716234545529, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "36107", + "added-files-size" : "331675", + "changed-partition-count" : "1", + "total-records" : "54373", + "total-files-size" : "508645", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro", + "schema-id" : 0 + }, { + "sequence-number" : 3, + "snapshot-id" : 3247344357341484163, + "parent-snapshot-id" : 8325605756612719366, + "timestamp-ms" : 1716234545865, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "18230", + "added-files-size" : "176482", + "changed-partition-count" : "1", + "total-records" : "72603", + "total-files-size" : "685127", + "total-data-files" : "3", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-3247344357341484163-1-77bfad1c-123c-452c-814d-298a1483a99f.avro", + "schema-id" : 0 + }, { + "sequence-number" : 4, + "snapshot-id" : 1792185872197984875, + "parent-snapshot-id" : 3247344357341484163, + "timestamp-ms" : 1716234546189, + "summary" : { + "operation" : "append", + 
"spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "27397", + "added-files-size" : "256855", + "changed-partition-count" : "1", + "total-records" : "100000", + "total-files-size" : "941982", + "total-data-files" : "4", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_multi/metadata/snap-1792185872197984875-1-51be90a2-7294-4ebb-8eee-20d512fff8b0.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234545155, + "snapshot-id" : 2001582482032951248 + }, { + "timestamp-ms" : 1716234545529, + "snapshot-id" : 8325605756612719366 + }, { + "timestamp-ms" : 1716234545865, + "snapshot-id" : 3247344357341484163 + }, { + "timestamp-ms" : 1716234546189, + "snapshot-id" : 1792185872197984875 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1716234545155, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00000-8bdf65c1-a414-468b-b7b2-78558b2e8c1f.metadata.json" + }, { + "timestamp-ms" : 1716234545529, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00001-30972e0f-9c8a-4fa9-911d-e356b2b04061.metadata.json" + }, { + "timestamp-ms" : 1716234545865, + "metadata-file" : "s3://warehouse/sales/sales_multi/metadata/00002-559f7323-a010-4afe-8461-f5261787aae9.metadata.json" + } ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/02f91282-fecb-4479-bd3b-20a8a3aaa795-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/02f91282-fecb-4479-bd3b-20a8a3aaa795-m0.avro new file mode 100644 index 00000000000..d871fbb9fd0 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/02f91282-fecb-4479-bd3b-20a8a3aaa795-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/51be90a2-7294-4ebb-8eee-20d512fff8b0-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/51be90a2-7294-4ebb-8eee-20d512fff8b0-m0.avro new file mode 100644 index 00000000000..21c52f9fe1c Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/51be90a2-7294-4ebb-8eee-20d512fff8b0-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/77bfad1c-123c-452c-814d-298a1483a99f-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/77bfad1c-123c-452c-814d-298a1483a99f-m0.avro new file mode 100644 index 00000000000..9501a6ce37e Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/77bfad1c-123c-452c-814d-298a1483a99f-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/a61cde47-0ced-4b74-b91a-9e32dd59b212-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/a61cde47-0ced-4b74-b91a-9e32dd59b212-m0.avro new file mode 100644 index 00000000000..462894acbde Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/a61cde47-0ced-4b74-b91a-9e32dd59b212-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-1792185872197984875-1-51be90a2-7294-4ebb-8eee-20d512fff8b0.avro 
b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-1792185872197984875-1-51be90a2-7294-4ebb-8eee-20d512fff8b0.avro new file mode 100644 index 00000000000..c6e3e85459b Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-1792185872197984875-1-51be90a2-7294-4ebb-8eee-20d512fff8b0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro new file mode 100644 index 00000000000..28d439215a0 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-2001582482032951248-1-a61cde47-0ced-4b74-b91a-9e32dd59b212.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-3247344357341484163-1-77bfad1c-123c-452c-814d-298a1483a99f.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-3247344357341484163-1-77bfad1c-123c-452c-814d-298a1483a99f.avro new file mode 100644 index 00000000000..8774b62b1e7 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-3247344357341484163-1-77bfad1c-123c-452c-814d-298a1483a99f.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro new file mode 100644 index 00000000000..b5659a5c9b9 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_multi/metadata/snap-8325605756612719366-1-02f91282-fecb-4479-bd3b-20a8a3aaa795.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=11/00000-8-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=11/00000-8-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet new file mode 100644 index 00000000000..d9750a49699 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=11/00000-8-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:589f536f9f9f513f4472136fe0af3ab5d7a4b38597a8eb3ac421dee99dfa5287 +size 470840 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=12/00001-9-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=12/00001-9-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet new file mode 100644 index 00000000000..7bc15e32c9f --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/data/year=2022/month=12/00001-9-981b110e-f66f-4474-a2a5-ad5e580e8f54-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:bb248908f3f23a6922e6d9ac97288154d1ccabf741666ade9fc8ae6a054822fa +size 285118 diff --git 
a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/00000-9635bc47-14fb-4a05-b8af-2c9f5986dfe6.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/00000-9635bc47-14fb-4a05-b8af-2c9f5986dfe6.metadata.json new file mode 100644 index 00000000000..8010b7d780f --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/00000-9635bc47-14fb-4a05-b8af-2c9f5986dfe6.metadata.json @@ -0,0 +1,111 @@ +{ + "format-version" : 2, + "table-uuid" : "27a1c139-084c-430b-9d0e-c08f18f89b42", + "location" : "s3://warehouse/sales/sales_partitioned", + "last-sequence-number" : 1, + "last-updated-ms" : 1716234546921, + "last-column-id" : 7, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + }, { + "id" : 6, + "name" : "year", + "required" : false, + "type" : "int" + }, { + "id" : 7, + "name" : "month", + "required" : false, + "type" : "int" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ { + "name" : "year", + "transform" : "identity", + "source-id" : 6, + "field-id" : 1000 + }, { + "name" : "month", + "transform" : "identity", + "source-id" : 7, + "field-id" : 1001 + } ] + } ], + "last-partition-id" : 1001, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:06.309740092Z", + "write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 5230665555732911945, + "refs" : { + "main" : { + "snapshot-id" : 5230665555732911945, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 5230665555732911945, + "timestamp-ms" : 1716234546921, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "2", + "added-records" : "100000", + "added-files-size" : "755958", + "changed-partition-count" : "2", + "total-records" : "100000", + "total-files-size" : "755958", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sales/sales_partitioned/metadata/snap-5230665555732911945-1-4c305b6c-008f-407a-8e3b-ea1187f6421e.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234546921, + "snapshot-id" : 5230665555732911945 + } ], + "metadata-log" : [ ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/4c305b6c-008f-407a-8e3b-ea1187f6421e-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/4c305b6c-008f-407a-8e3b-ea1187f6421e-m0.avro new file mode 100644 index 00000000000..3b11294bc10 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/4c305b6c-008f-407a-8e3b-ea1187f6421e-m0.avro differ diff 
--git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/snap-5230665555732911945-1-4c305b6c-008f-407a-8e3b-ea1187f6421e.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/snap-5230665555732911945-1-4c305b6c-008f-407a-8e3b-ea1187f6421e.avro new file mode 100644 index 00000000000..17af22ffd9f Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_partitioned/metadata/snap-5230665555732911945-1-4c305b6c-008f-407a-8e3b-ea1187f6421e.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/data/00000-2-1b3d5352-57ce-4968-b83d-30b13e1e7151-0-00001.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/data/00000-2-1b3d5352-57ce-4968-b83d-30b13e1e7151-0-00001.parquet new file mode 100644 index 00000000000..6394c55b6b2 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/data/00000-2-1b3d5352-57ce-4968-b83d-30b13e1e7151-0-00001.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:bd1139eaf58ad06a1aaaf2d9d8505225d4dd7ac9e7a40f8f4b324ec364e76de5 +size 729342 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/00000-3a7beb71-ae4e-42dd-ab04-884f6fd52372.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/00000-3a7beb71-ae4e-42dd-ab04-884f6fd52372.metadata.json new file mode 100644 index 00000000000..0e56acb0f0d --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/00000-3a7beb71-ae4e-42dd-ab04-884f6fd52372.metadata.json @@ -0,0 +1,91 @@ +{ + "format-version" : 2, + "table-uuid" : "f1d2074d-58b0-4087-8edb-8d85f1472553", + "location" : "s3://warehouse/sales/sales_single", + "last-sequence-number" : 1, + "last-updated-ms" : 1716234544074, + "last-column-id" : 5, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "Region", + "required" : false, + "type" : "string" + }, { + "id" : 2, + "name" : "Item_Type", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "Units_Sold", + "required" : false, + "type" : "int" + }, { + "id" : 4, + "name" : "Unit_Price", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "Order_Date", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root", + "created-at" : "2024-05-20T19:49:02.681248048Z", + "write.format.default" : "parquet", + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 5481576066981634597, + "refs" : { + "main" : { + "snapshot-id" : 5481576066981634597, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 5481576066981634597, + "timestamp-ms" : 1716234544074, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1716234462547", + "added-data-files" : "1", + "added-records" : "100000", + "added-files-size" : "729342", + "changed-partition-count" : "1", + "total-records" : "100000", + "total-files-size" : "729342", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : 
"s3://warehouse/sales/sales_single/metadata/snap-5481576066981634597-1-3395af41-65d8-41f0-8674-13624d6129c6.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1716234544074, + "snapshot-id" : 5481576066981634597 + } ], + "metadata-log" : [ ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/3395af41-65d8-41f0-8674-13624d6129c6-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/3395af41-65d8-41f0-8674-13624d6129c6-m0.avro new file mode 100644 index 00000000000..3a70db60f6c Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/3395af41-65d8-41f0-8674-13624d6129c6-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/snap-5481576066981634597-1-3395af41-65d8-41f0-8674-13624d6129c6.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/snap-5481576066981634597-1-3395af41-65d8-41f0-8674-13624d6129c6.avro new file mode 100644 index 00000000000..a7720ef0bd7 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sales/sales_single/metadata/snap-5481576066981634597-1-3395af41-65d8-41f0-8674-13624d6129c6.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/ac528ac1-dea2-472b-b72b-48c4fdccc8bd.parquet b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/ac528ac1-dea2-472b-b72b-48c4fdccc8bd.parquet new file mode 100644 index 00000000000..6bcedd0c6c5 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/ac528ac1-dea2-472b-b72b-48c4fdccc8bd.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1d28097102526b4f6ecbcac552da69348448a28f8909c155527a6b1faccffef6 +size 4239 diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00000-bbb283f2-4659-434a-b422-227b8b15dfb9.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00000-bbb283f2-4659-434a-b422-227b8b15dfb9.metadata.json new file mode 100644 index 00000000000..ec2c53dc065 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00000-bbb283f2-4659-434a-b422-227b8b15dfb9.metadata.json @@ -0,0 +1,100 @@ +{ + "format-version" : 2, + "table-uuid" : "059cfc0e-7f89-4be3-913d-b1d451736783", + "location" : "s3://warehouse/sample/all_types", + "last-sequence-number" : 0, + "last-updated-ms" : 1717446052895, + "last-column-id" : 13, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "booleanField", + "required" : false, + "type" : "boolean" + }, { + "id" : 2, + "name" : "integerField", + "required" : false, + "type" : "int" + }, { + "id" : 3, + "name" : "longField", + "required" : false, + "type" : "long" + }, { + "id" : 4, + "name" : "floatField", + "required" : false, + "type" : "float" + }, { + "id" : 5, + "name" : "doubleField", + "required" : false, + "type" : "double" + }, { + "id" : 6, + "name" : "stringField", + "required" : false, + "type" : "string" + }, { + "id" : 7, + "name" : "dateField", + "required" : false, + "type" : "date" + }, { + "id" : 8, + "name" : "timeField", + "required" : false, + "type" : "time" + }, { + "id" : 9, + "name" : "timestampField", + "required" : false, + "type" : "timestamp" + }, 
{ + "id" : 10, + "name" : "decimalField", + "required" : false, + "type" : "decimal(9, 4)" + }, { + "id" : 11, + "name" : "fixedField", + "required" : false, + "type" : "fixed[10]" + }, { + "id" : 12, + "name" : "binaryField", + "required" : false, + "type" : "binary" + }, { + "id" : 13, + "name" : "instantField", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : -1, + "refs" : { }, + "snapshots" : [ ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ ], + "metadata-log" : [ ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00001-15b8eb1c-5cf6-4266-9f0c-5ae723cd77ba.metadata.json b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00001-15b8eb1c-5cf6-4266-9f0c-5ae723cd77ba.metadata.json new file mode 100644 index 00000000000..c9e2ea56748 --- /dev/null +++ b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/00001-15b8eb1c-5cf6-4266-9f0c-5ae723cd77ba.metadata.json @@ -0,0 +1,130 @@ +{ + "format-version" : 2, + "table-uuid" : "059cfc0e-7f89-4be3-913d-b1d451736783", + "location" : "s3://warehouse/sample/all_types", + "last-sequence-number" : 1, + "last-updated-ms" : 1717446059625, + "last-column-id" : 13, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "booleanField", + "required" : false, + "type" : "boolean" + }, { + "id" : 2, + "name" : "integerField", + "required" : false, + "type" : "int" + }, { + "id" : 3, + "name" : "longField", + "required" : false, + "type" : "long" + }, { + "id" : 4, + "name" : "floatField", + "required" : false, + "type" : "float" + }, { + "id" : 5, + "name" : "doubleField", + "required" : false, + "type" : "double" + }, { + "id" : 6, + "name" : "stringField", + "required" : false, + "type" : "string" + }, { + "id" : 7, + "name" : "dateField", + "required" : false, + "type" : "date" + }, { + "id" : 8, + "name" : "timeField", + "required" : false, + "type" : "time" + }, { + "id" : 9, + "name" : "timestampField", + "required" : false, + "type" : "timestamp" + }, { + "id" : 10, + "name" : "decimalField", + "required" : false, + "type" : "decimal(9, 4)" + }, { + "id" : 11, + "name" : "fixedField", + "required" : false, + "type" : "fixed[10]" + }, { + "id" : 12, + "name" : "binaryField", + "required" : false, + "type" : "binary" + }, { + "id" : 13, + "name" : "instantField", + "required" : false, + "type" : "timestamptz" + } ] + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ ] + } ], + "last-partition-id" : 999, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "write.parquet.compression-codec" : "zstd" + }, + "current-snapshot-id" : 6186754175552482648, + "refs" : { + "main" : { + "snapshot-id" : 6186754175552482648, + "type" : "branch" + } + }, + "snapshots" : [ { + "sequence-number" : 1, + "snapshot-id" : 6186754175552482648, + "timestamp-ms" : 1717446059625, + "summary" : { + "operation" : "append", + "added-data-files" : "1", + "added-records" : "10", + "added-files-size" : "4239", + "changed-partition-count" : "1", + 
"total-records" : "10", + "total-files-size" : "4239", + "total-data-files" : "1", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "s3://warehouse/sample/all_types/metadata/snap-6186754175552482648-1-388d67e5-5760-40c1-a460-c8c4ee5b8629.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "partition-statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1717446059625, + "snapshot-id" : 6186754175552482648 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1717446052895, + "metadata-file" : "s3://warehouse/sample/all_types/metadata/00000-bbb283f2-4659-434a-b422-227b8b15dfb9.metadata.json" + } ] +} \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/388d67e5-5760-40c1-a460-c8c4ee5b8629-m0.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/388d67e5-5760-40c1-a460-c8c4ee5b8629-m0.avro new file mode 100644 index 00000000000..ebe516d1779 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/388d67e5-5760-40c1-a460-c8c4ee5b8629-m0.avro differ diff --git a/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/snap-6186754175552482648-1-388d67e5-5760-40c1-a460-c8c4ee5b8629.avro b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/snap-6186754175552482648-1-388d67e5-5760-40c1-a460-c8c4ee5b8629.avro new file mode 100644 index 00000000000..cc27bc3f4f2 Binary files /dev/null and b/extensions/iceberg/s3/src/test/resources/warehouse/sample/all_types/metadata/snap-6186754175552482648-1-388d67e5-5760-40c1-a460-c8c4ee5b8629.avro differ diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java new file mode 100644 index 00000000000..f5334cf866c --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -0,0 +1,145 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.layout; + +import io.deephaven.base.FileUtils; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import io.deephaven.iceberg.location.IcebergTableLocationKey; +import io.deephaven.iceberg.location.IcebergTableParquetLocationKey; +import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.parquet.table.ParquetInstructions; +import org.apache.iceberg.*; +import org.apache.iceberg.io.FileIO; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public abstract class IcebergBaseLayout implements TableLocationKeyFinder { + /** + * The {@link TableDefinition} that will be used for the table. + */ + final TableDefinition tableDef; + + /** + * The Iceberg {@link Table} to discover locations for. + */ + final Table table; + + /** + * The {@link Snapshot} to discover locations for. + */ + final Snapshot snapshot; + + /** + * The {@link FileIO} to use for passing to the catalog reading manifest data files. + */ + final FileIO fileIO; + + /** + * The instructions for customizations while reading. 
+ */ + final IcebergInstructions instructions; + + /** + * A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent. + */ + final Map cache; + + /** + * The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only + * accessed while synchronized on {@code this}. + */ + ParquetInstructions parquetInstructions; + + protected IcebergTableLocationKey locationKey( + final org.apache.iceberg.FileFormat format, + final URI fileUri, + @Nullable final Map> partitions) { + + if (format == org.apache.iceberg.FileFormat.PARQUET) { + if (parquetInstructions == null) { + // Build the Parquet instructions from the user-supplied Iceberg instructions. + final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); + + // Add the table definition. + builder.setTableDefinition(tableDef); + + // Add any column rename mappings. + if (!instructions.columnRenames().isEmpty()) { + for (Map.Entry entry : instructions.columnRenames().entrySet()) { + builder.addColumnNameMapping(entry.getKey(), entry.getValue()); + } + } + + // Add the data instructions. + instructions.dataInstructions().ifPresent(builder::setSpecialInstructions); + + parquetInstructions = builder.build(); + } + return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions); + } + throw new UnsupportedOperationException(String.format("%s:%d - unsupported file format %s for URI '%s'", + table, snapshot.snapshotId(), format, fileUri)); + } + + /** + * @param tableDef The {@link TableDefinition} that will be used for the table. + * @param table The {@link Table} to discover locations for. + * @param tableSnapshot The {@link Snapshot} from which to discover data files. + * @param fileIO The file IO to use for reading manifest data files. + * @param instructions The instructions for customizations while reading. + */ + public IcebergBaseLayout( + @NotNull final TableDefinition tableDef, + @NotNull final Table table, + @NotNull final Snapshot tableSnapshot, + @NotNull final FileIO fileIO, + @NotNull final IcebergInstructions instructions) { + this.tableDef = tableDef; + this.table = table; + this.snapshot = tableSnapshot; + this.fileIO = fileIO; + this.instructions = instructions; + + this.cache = new HashMap<>(); + } + + abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri); + + @Override + public synchronized void findKeys(@NotNull final Consumer locationKeyObserver) { + try { + // Retrieve the manifest files from the snapshot. + final List manifestFiles = snapshot.allManifests(fileIO); + for (final ManifestFile manifestFile : manifestFiles) { + // Currently, we can only process manifest files with the DATA content type.
+ if (manifestFile.content() != ManifestContent.DATA) { + throw new TableDataException( + String.format("%s:%d - only DATA manifest files are currently supported, encountered %s", + table, snapshot.snapshotId(), manifestFile.content())); + } + try (final ManifestReader reader = ManifestFiles.read(manifestFile, fileIO)) { + for (DataFile df : reader) { + final URI fileUri = FileUtils.convertToURI(df.path().toString(), false); + final IcebergTableLocationKey locationKey = + cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri)); + if (locationKey != null) { + locationKeyObserver.accept(locationKey); + } + } + } + } + } catch (final Exception e) { + throw new TableDataException( + String.format("%s:%d - error finding Iceberg locations", table, snapshot.snapshotId()), e); + } + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java new file mode 100644 index 00000000000..ac4c19283f9 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java @@ -0,0 +1,46 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.layout; + +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import io.deephaven.iceberg.location.IcebergTableLocationKey; +import io.deephaven.iceberg.util.IcebergInstructions; +import org.apache.iceberg.*; +import org.apache.iceberg.io.FileIO; +import org.jetbrains.annotations.NotNull; + +import java.net.URI; + +/** + * Iceberg {@link TableLocationKeyFinder location finder} for tables without partitions that will discover data files + * from a {@link Snapshot} + */ +public final class IcebergFlatLayout extends IcebergBaseLayout { + /** + * @param tableDef The {@link TableDefinition} that will be used for the table. + * @param table The {@link Table} to discover locations for. + * @param tableSnapshot The {@link Snapshot} from which to discover data files. + * @param fileIO The file IO to use for reading manifest data files. + * @param instructions The instructions for customizations while reading. 
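For reference, the manifest walk that findKeys() performs above can be reproduced with public Iceberg APIs alone. A minimal sketch, assuming snapshot and io come from an already-loaded org.apache.iceberg.Table; unlike the production code it prints the files instead of building location keys:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.FileIO;

public class ManifestWalkSketch {
    // Enumerate every data file reachable from a snapshot, mirroring findKeys() above.
    static void printDataFiles(final Snapshot snapshot, final FileIO io) throws Exception {
        for (final ManifestFile manifest : snapshot.allManifests(io)) {
            if (manifest.content() != ManifestContent.DATA) {
                continue; // the layout above rejects non-DATA manifests instead
            }
            try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io)) {
                for (final DataFile df : reader) {
                    System.out.println(df.path() + " (" + df.format() + ", " + df.recordCount() + " rows)");
                }
            }
        }
    }
}
```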
+ */ + public IcebergFlatLayout( + @NotNull final TableDefinition tableDef, + @NotNull final Table table, + @NotNull final Snapshot tableSnapshot, + @NotNull final FileIO fileIO, + @NotNull final IcebergInstructions instructions) { + super(tableDef, table, tableSnapshot, fileIO, instructions); + } + + @Override + public String toString() { + return IcebergFlatLayout.class.getSimpleName() + '[' + table.name() + ']'; + } + + @Override + IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) { + return locationKey(df.format(), fileUri, null); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java new file mode 100644 index 00000000000..47ec05dfd74 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -0,0 +1,104 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.layout; + +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import io.deephaven.iceberg.location.IcebergTableLocationKey; +import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.util.type.TypeUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.iceberg.*; +import org.apache.iceberg.io.FileIO; +import org.jetbrains.annotations.NotNull; + +import java.net.URI; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from + * a {@link Snapshot} + */ +public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout { + private class ColumnData { + final String name; + final Class type; + final int index; + + public ColumnData(String name, Class type, int index) { + this.name = name; + this.type = type; + this.index = index; + } + } + + private final List outputPartitioningColumns; + + /** + * @param tableDef The {@link TableDefinition} that will be used for the table. + * @param table The {@link Table} to discover locations for. + * @param tableSnapshot The {@link Snapshot} from which to discover data files. + * @param fileIO The file IO to use for reading manifest data files. + * @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table. + * @param instructions The instructions for customizations while reading. + */ + public IcebergKeyValuePartitionedLayout( + @NotNull final TableDefinition tableDef, + @NotNull final org.apache.iceberg.Table table, + @NotNull final org.apache.iceberg.Snapshot tableSnapshot, + @NotNull final FileIO fileIO, + @NotNull final PartitionSpec partitionSpec, + @NotNull final IcebergInstructions instructions) { + super(tableDef, table, tableSnapshot, fileIO, instructions); + + // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included + // in the output definition, so we can ignore duplicates. 
+ final MutableInt icebergIndex = new MutableInt(0); + final Map availablePartitioningColumns = partitionSpec.fields().stream() + .map(PartitionField::name) + .map(name -> instructions.columnRenames().getOrDefault(name, name)) + .collect(Collectors.toMap( + name -> name, + name -> icebergIndex.getAndIncrement(), + (v1, v2) -> v1, + LinkedHashMap::new)); + + outputPartitioningColumns = tableDef.getColumnStream() + .map((final ColumnDefinition columnDef) -> { + final Integer index = availablePartitioningColumns.get(columnDef.getName()); + if (index == null) { + return null; + } + return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + @Override + public String toString() { + return IcebergKeyValuePartitionedLayout.class.getSimpleName() + '[' + table.name() + ']'; + } + + @Override + IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) { + final Map> partitions = new LinkedHashMap<>(); + + final PartitionData partitionData = (PartitionData) df.partition(); + for (final ColumnData colData : outputPartitioningColumns) { + final String colName = colData.name; + final Object colValue = partitionData.get(colData.index); + if (colValue != null && !colData.type.isAssignableFrom(colValue.getClass())) { + throw new TableDataException("Partitioning column " + colName + + " has type " + colValue.getClass().getName() + + " but expected " + colData.type.getName()); + } + partitions.put(colName, (Comparable) colValue); + } + return locationKey(df.format(), fileUri, partitions); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java new file mode 100644 index 00000000000..dc91d1c45fd --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java @@ -0,0 +1,33 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.location; + +import io.deephaven.engine.table.impl.locations.TableKey; +import io.deephaven.engine.table.impl.locations.TableLocation; +import io.deephaven.engine.table.impl.locations.impl.TableLocationFactory; +import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.location.ParquetTableLocation; +import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * {@link TableLocationFactory} for Iceberg {@link TableLocation}s. 
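For identity partition transforms (the only kind exercised by the sales_partitioned fixture above), the value stored in the manifest's partition struct is the column value itself, which is what keyFromDataFile reads back out by index. A standalone sketch of that extraction, assuming spec and df come from a table scan:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

final class PartitionValueSketch {
    static void printPartitionValues(final PartitionSpec spec, final DataFile df) {
        final StructLike partition = df.partition();
        for (int i = 0; i < spec.fields().size(); i++) {
            final PartitionField field = spec.fields().get(i);
            // For identity transforms this is the raw column value (e.g. year=2022, month=11).
            System.out.println(field.name() + " = " + partition.get(i, Object.class));
        }
    }
}
```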
+ */ +public final class IcebergTableLocationFactory implements TableLocationFactory { + public IcebergTableLocationFactory() {} + + @Override + @NotNull + public TableLocation makeLocation(@NotNull final TableKey tableKey, + @NotNull final IcebergTableLocationKey locationKey, + @Nullable final TableDataRefreshService refreshService) { + if (locationKey instanceof IcebergTableParquetLocationKey) { + return new ParquetTableLocation(tableKey, (ParquetTableLocationKey) locationKey, + (ParquetInstructions) locationKey.readInstructions()); + } + throw new UnsupportedOperationException("Unsupported location key type: " + locationKey.getClass()); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationKey.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationKey.java new file mode 100644 index 00000000000..d6d5d4fb514 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationKey.java @@ -0,0 +1,18 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.location; + +import io.deephaven.engine.table.impl.locations.TableLocationKey; + +/** + * {@link TableLocationKey} implementation for use with data stored in Iceberg tables. + */ +public interface IcebergTableLocationKey extends TableLocationKey { + /** + * Get the read instructions for the location. + * + * @return the read instructions + */ + Object readInstructions(); +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java new file mode 100644 index 00000000000..e356d0ecb92 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.location; + +import io.deephaven.engine.table.impl.locations.TableLocationKey; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.Map; + +/** + * {@link TableLocationKey} implementation for use with data stored in Iceberg tables in the parquet format. + */ +public class IcebergTableParquetLocationKey extends ParquetTableLocationKey implements IcebergTableLocationKey { + private static final String IMPLEMENTATION_NAME = IcebergTableParquetLocationKey.class.getSimpleName(); + + private final ParquetInstructions readInstructions; + + /** + * Construct a new IcebergTableParquetLocationKey for the supplied {@code fileUri} and {@code partitions}. + * + * @param fileUri The file that backs the keyed location + * @param order Explicit ordering index, taking precedence over other fields + * @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this + * parameter is {@code null}, the location will be a member of no partitions. 
An ordered copy of the map will + * be made, so the calling code is free to mutate the map after this call + * @param readInstructions the instructions for customizations while reading + */ + public IcebergTableParquetLocationKey( + @NotNull final URI fileUri, + final int order, + @Nullable final Map> partitions, + @NotNull final ParquetInstructions readInstructions) { + super(fileUri, order, partitions, readInstructions); + this.readInstructions = readInstructions; + } + + @Override + public String getImplementationName() { + return IMPLEMENTATION_NAME; + } + + @Override + public ParquetInstructions readInstructions() { + return readInstructions; + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java new file mode 100644 index 00000000000..c379c715c6d --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -0,0 +1,462 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.*; +import io.deephaven.engine.table.impl.PartitionAwareSourceTable; +import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider; +import io.deephaven.engine.table.impl.locations.impl.PollingTableLocationProvider; +import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; +import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; +import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService; +import io.deephaven.engine.table.impl.sources.InMemoryColumnSource; +import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl; +import io.deephaven.engine.updategraph.UpdateSourceRegistrar; +import io.deephaven.iceberg.layout.IcebergFlatLayout; +import io.deephaven.iceberg.layout.IcebergKeyValuePartitionedLayout; +import io.deephaven.iceberg.location.IcebergTableLocationFactory; +import io.deephaven.iceberg.location.IcebergTableLocationKey; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +public class IcebergCatalogAdapter { + private final Catalog catalog; + private final FileIO fileIO; + + /** + * Construct an IcebergCatalogAdapter from a catalog and file IO. + */ + IcebergCatalogAdapter( + @NotNull final Catalog catalog, + @NotNull final FileIO fileIO) { + this.catalog = catalog; + this.fileIO = fileIO; + } + + /** + * Create a single {@link TableDefinition} from a given Schema, PartitionSpec, and TableDefinition. Takes into + * account {@link Map column rename instructions} + * + * @param schema The schema of the table. 
+ * @param partitionSpec The partition specification of the table. + * @param tableDefinition The table definition. + * @param columnRename The map for renaming columns. + * @return The generated TableDefinition. + */ + private static TableDefinition fromSchema( + @NotNull final Schema schema, + @NotNull final PartitionSpec partitionSpec, + @Nullable final TableDefinition tableDefinition, + @NotNull final Map columnRename) { + + final Set columnNames = tableDefinition != null + ? tableDefinition.getColumnNameSet() + : null; + + final Set partitionNames = + partitionSpec.fields().stream() + .map(PartitionField::name) + .map(colName -> columnRename.getOrDefault(colName, colName)) + .collect(Collectors.toSet()); + + final List> columns = new ArrayList<>(); + + for (final Types.NestedField field : schema.columns()) { + final String name = columnRename.getOrDefault(field.name(), field.name()); + // Skip columns that are not in the provided table definition. + if (columnNames != null && !columnNames.contains(name)) { + continue; + } + final Type type = field.type(); + final io.deephaven.qst.type.Type qstType = convertPrimitiveType(type); + final ColumnDefinition column; + if (partitionNames.contains(name)) { + column = ColumnDefinition.of(name, qstType).withPartitioning(); + } else { + column = ColumnDefinition.of(name, qstType); + } + columns.add(column); + } + + return TableDefinition.of(columns); + } + + /** + * Convert an Iceberg data type to a Deephaven type. + * + * @param icebergType The Iceberg data type to be converted. + * @return The converted Deephaven type. + */ + static io.deephaven.qst.type.Type convertPrimitiveType(@NotNull final Type icebergType) { + final Type.TypeID typeId = icebergType.typeId(); + switch (typeId) { + case BOOLEAN: + return io.deephaven.qst.type.Type.booleanType().boxedType(); + case DOUBLE: + return io.deephaven.qst.type.Type.doubleType(); + case FLOAT: + return io.deephaven.qst.type.Type.floatType(); + case INTEGER: + return io.deephaven.qst.type.Type.intType(); + case LONG: + return io.deephaven.qst.type.Type.longType(); + case STRING: + return io.deephaven.qst.type.Type.stringType(); + case TIMESTAMP: + final Types.TimestampType timestampType = (Types.TimestampType) icebergType; + return timestampType.shouldAdjustToUTC() + ? io.deephaven.qst.type.Type.find(Instant.class) + : io.deephaven.qst.type.Type.find(LocalDateTime.class); + case DATE: + return io.deephaven.qst.type.Type.find(java.time.LocalDate.class); + case TIME: + return io.deephaven.qst.type.Type.find(java.time.LocalTime.class); + case DECIMAL: + return io.deephaven.qst.type.Type.find(java.math.BigDecimal.class); + case FIXED: // Fall through + case BINARY: + return io.deephaven.qst.type.Type.find(byte[].class); + case UUID: // Fall through + case STRUCT: // Fall through + case LIST: // Fall through + case MAP: // Fall through + default: + throw new TableDataException("Unsupported iceberg column type " + typeId.name()); + } + } + + /** + * List all {@link Namespace namespaces} in the catalog. This method is only supported if the catalog implements + * {@link SupportsNamespaces} for namespace discovery. See {@link SupportsNamespaces#listNamespaces(Namespace)}. + * + * @return A list of all namespaces. + */ + public List listNamespaces() { + return listNamespaces(Namespace.empty()); + } + + /** + * List all {@link Namespace namespaces} in a given namespace. This method is only supported if the catalog + * implements {@link SupportsNamespaces} for namespace discovery. 
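A few concrete instances of the mapping, shown as a sketch; convertPrimitiveType is package-private, so this assumes a caller placed in io.deephaven.iceberg.util:

```java
package io.deephaven.iceberg.util;

import org.apache.iceberg.types.Types;

public class TypeMappingSketch {
    public static void main(String[] args) {
        // Iceberg long -> Deephaven long
        System.out.println(IcebergCatalogAdapter.convertPrimitiveType(Types.LongType.get()));
        // Iceberg timestamptz -> java.time.Instant
        System.out.println(IcebergCatalogAdapter.convertPrimitiveType(Types.TimestampType.withZone()));
        // Iceberg decimal(9, 4) -> java.math.BigDecimal
        System.out.println(IcebergCatalogAdapter.convertPrimitiveType(Types.DecimalType.of(9, 4)));
    }
}
```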
See + * {@link SupportsNamespaces#listNamespaces(Namespace)}. + * + * @param namespace The namespace to list namespaces in. + * @return A list of all namespaces in the given namespace. + */ + public List listNamespaces(@NotNull final Namespace namespace) { + if (catalog instanceof SupportsNamespaces) { + final SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + return nsCatalog.listNamespaces(namespace); + } + throw new UnsupportedOperationException(String.format( + "%s does not implement org.apache.iceberg.catalog.SupportsNamespaces", catalog.getClass().getName())); + } + + /** + * List all {@link Namespace namespaces} in the catalog as a Deephaven {@link Table table}. The resulting table will + * be static and contain the same information as {@link #listNamespaces()}. + * + * @return A {@link Table table} of all namespaces. + */ + public Table listNamespacesAsTable() { + return listNamespacesAsTable(Namespace.empty()); + } + + /** + * List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. The resulting + * table will be static and contain the same information as {@link #listNamespaces(Namespace)}. + * + * @return A {@link Table table} of all namespaces. + */ + public Table listNamespacesAsTable(@NotNull final Namespace namespace) { + final List namespaces = listNamespaces(namespace); + final long size = namespaces.size(); + + // Create and return a table containing the namespaces as strings + final Map> columnSourceMap = new LinkedHashMap<>(); + + // Create the column source(s) + final String[] namespaceArr = new String[(int) size]; + columnSourceMap.put("namespace", + InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null)); + + final Namespace[] namespaceObjectArr = new Namespace[(int) size]; + columnSourceMap.put("namespace_object", + InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceObjectArr, Namespace.class, null)); + + // Populate the column source arrays + for (int i = 0; i < size; i++) { + final Namespace ns = namespaces.get(i); + namespaceArr[i] = ns.toString(); + namespaceObjectArr[i] = ns; + } + + // Create and return the table + return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + } + + /** + * List all Iceberg {@link TableIdentifier tables} in a given namespace. + * + * @param namespace The namespace to list tables in. + * @return A list of all tables in the given namespace. + */ + public List listTables(@NotNull final Namespace namespace) { + return catalog.listTables(namespace); + } + + /** + * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The + * resulting table will be static and contain the same information as {@link #listTables(Namespace)}. + * + * @param namespace The namespace from which to gather the tables + * @return A list of all tables in the given namespace. 
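Together, the listing methods support a small discovery workflow. A hedged sketch, assuming the adapter comes from IcebergTools.createAdapter and using the "sales" namespace from the fixtures above:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import org.apache.iceberg.catalog.Namespace;

public class DiscoverySketch {
    static void discover(final IcebergCatalogAdapter adapter) {
        // Columns: namespace, namespace_object (as built in listNamespacesAsTable above).
        final Table namespaces = adapter.listNamespacesAsTable();
        // Columns: namespace, table_name, table_identifier_object.
        final Table tables = adapter.listTablesAsTable(Namespace.of("sales"));
    }
}
```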
+ */ + public Table listTablesAsTable(@NotNull final Namespace namespace) { + final List tableIdentifiers = listTables(namespace); + final long size = tableIdentifiers.size(); + + // Create the column sources for the table listing + final Map> columnSourceMap = new LinkedHashMap<>(); + + // Create the column source(s) + final String[] namespaceArr = new String[(int) size]; + columnSourceMap.put("namespace", + InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null)); + + final String[] tableNameArr = new String[(int) size]; + columnSourceMap.put("table_name", + InMemoryColumnSource.getImmutableMemoryColumnSource(tableNameArr, String.class, null)); + + final TableIdentifier[] tableIdentifierArr = new TableIdentifier[(int) size]; + columnSourceMap.put("table_identifier_object", + InMemoryColumnSource.getImmutableMemoryColumnSource(tableIdentifierArr, TableIdentifier.class, null)); + + // Populate the column source arrays + for (int i = 0; i < size; i++) { + final TableIdentifier tableIdentifier = tableIdentifiers.get(i); + namespaceArr[i] = tableIdentifier.namespace().toString(); + tableNameArr[i] = tableIdentifier.name(); + tableIdentifierArr[i] = tableIdentifier; + } + + // Create and return the table + return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + } + + /** + * List all {@link Snapshot snapshots} of a given Iceberg table. + * + * @param tableIdentifier The identifier of the table from which to gather snapshots. + * @return A list of all snapshots of the given table. + */ + public List listSnapshots(@NotNull final TableIdentifier tableIdentifier) { + final List snapshots = new ArrayList<>(); + catalog.loadTable(tableIdentifier).snapshots().forEach(snapshots::add); + return snapshots; + } + + /** + * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting + * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}. + * + * @param tableIdentifier The identifier of the table from which to gather snapshots. + * @return A {@link Table table} of all snapshots of the given table.
+ */ + public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier) { + final List snapshots = listSnapshots(tableIdentifier); + final long size = snapshots.size(); + + // Create the column sources for the snapshot listing + final Map> columnSourceMap = new LinkedHashMap<>(); + + // Create the column source(s) + final long[] idArr = new long[(int) size]; + columnSourceMap.put("id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null)); + + final long[] timestampArr = new long[(int) size]; + columnSourceMap.put("timestamp_ms", + InMemoryColumnSource.getImmutableMemoryColumnSource(timestampArr, long.class, null)); + + final String[] operatorArr = new String[(int) size]; + columnSourceMap.put("operation", + InMemoryColumnSource.getImmutableMemoryColumnSource(operatorArr, String.class, null)); + + final Map[] summaryArr = new Map[(int) size]; + columnSourceMap.put("summary", + InMemoryColumnSource.getImmutableMemoryColumnSource(summaryArr, Map.class, null)); + + final Snapshot[] snapshotArr = new Snapshot[(int) size]; + columnSourceMap.put("snapshot_object", + InMemoryColumnSource.getImmutableMemoryColumnSource(snapshotArr, Snapshot.class, null)); + + // Populate the column source(s) + for (int i = 0; i < size; i++) { + final Snapshot snapshot = snapshots.get(i); + idArr[i] = snapshot.snapshotId(); + timestampArr[i] = snapshot.timestampMillis(); + operatorArr[i] = snapshot.operation(); + summaryArr[i] = snapshot.summary(); + snapshotArr[i] = snapshot; + } + + // Create and return the table + return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); + } + + /** + * Read the latest static snapshot of an Iceberg table from the Iceberg catalog. + * + * @param tableIdentifier The table identifier to load + * @param instructions The instructions for customizations while reading + * @return The loaded table + */ + @SuppressWarnings("unused") + public Table readTable( + @NotNull final TableIdentifier tableIdentifier, + @NotNull final IcebergInstructions instructions) { + return readTableInternal(tableIdentifier, null, instructions); + } + + /** + * Read a static snapshot of an Iceberg table from the Iceberg catalog. + * + * @param tableIdentifier The table identifier to load + * @param tableSnapshotId The snapshot id to load + * @param instructions The instructions for customizations while reading + * @return The loaded table + */ + @SuppressWarnings("unused") + public Table readTable( + @NotNull final TableIdentifier tableIdentifier, + final long tableSnapshotId, + @NotNull final IcebergInstructions instructions) { + + // Find the snapshot with the given snapshot id + final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream() + .filter(snapshot -> snapshot.snapshotId() == tableSnapshotId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Snapshot with id " + tableSnapshotId + " not found")); + + return readTableInternal(tableIdentifier, tableSnapshot, instructions); + } + + /** + * Read a static snapshot of an Iceberg table from the Iceberg catalog.
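A hedged end-to-end sketch of the readTable entry points. The Catalog and FileIO are assumed to be supplied by the deployment, and the snapshot id is the first append recorded in the sales_multi fixture earlier in this diff:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergTools;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;

public class ReadTableSketch {
    static void demo(final Catalog catalog, final FileIO fileIO) {
        final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(catalog, fileIO);
        final IcebergInstructions instructions = IcebergInstructions.builder().build();
        final TableIdentifier tableId = TableIdentifier.of("sales", "sales_multi");

        // Latest snapshot.
        final Table latest = adapter.readTable(tableId, instructions);

        // A specific snapshot by id (the first append in the sales_multi fixture).
        final Table first = adapter.readTable(tableId, 2001582482032951248L, instructions);
    }
}
```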
+ * + * @param tableIdentifier The table identifier to load + * @param tableSnapshot The {@link Snapshot snapshot} to load + * @param instructions The instructions for customizations while reading + * @return The loaded table + */ + @SuppressWarnings("unused") + public Table readTable( + @NotNull final TableIdentifier tableIdentifier, + @NotNull final Snapshot tableSnapshot, + @NotNull final IcebergInstructions instructions) { + return readTableInternal(tableIdentifier, tableSnapshot, instructions); + } + + private Table readTableInternal( + @NotNull final TableIdentifier tableIdentifier, + @Nullable final Snapshot tableSnapshot, + @NotNull final IcebergInstructions instructions) { + + // Load the table from the catalog + final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); + + // Do we want the latest or a specific snapshot? + final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot(); + final Schema schema = table.schemas().get(snapshot.schemaId()); + + // Load the partitioning schema + final org.apache.iceberg.PartitionSpec partitionSpec = table.spec(); + + // Get the user supplied table definition. + final TableDefinition userTableDef = instructions.tableDefinition().orElse(null); + + // Get the table definition from the schema (potentially limited by the user supplied table definition and + // applying column renames). + final TableDefinition icebergTableDef = + fromSchema(schema, partitionSpec, userTableDef, instructions.columnRenames()); + + // If the user supplied a table definition, make sure it's fully compatible. + final TableDefinition tableDef; + if (userTableDef != null) { + tableDef = icebergTableDef.checkCompatibility(userTableDef); + + // Ensure that the user has not marked non-partitioned columns as partitioned. + final Set userPartitionColumns = userTableDef.getPartitioningColumns().stream() + .map(ColumnDefinition::getName) + .collect(Collectors.toSet()); + final Set partitionColumns = tableDef.getPartitioningColumns().stream() + .map(ColumnDefinition::getName) + .collect(Collectors.toSet()); + + // The working partitioning column set must be a super-set of the user-supplied set. + if (!partitionColumns.containsAll(userPartitionColumns)) { + final Set invalidColumns = new HashSet<>(userPartitionColumns); + invalidColumns.removeAll(partitionColumns); + + throw new TableDataException("The following columns are not partitioned in the Iceberg table: " + + invalidColumns); + } + } else { + // Use the snapshot schema as the table definition. 
+ tableDef = icebergTableDef; + } + + final String description; + final TableLocationKeyFinder keyFinder; + final TableDataRefreshService refreshService; + final UpdateSourceRegistrar updateSourceRegistrar; + + if (partitionSpec.isUnpartitioned()) { + // Create the flat layout location key finder + keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, instructions); + } else { + // Create the partitioning column location key finder + keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec, + instructions); + } + + refreshService = null; + updateSourceRegistrar = null; + description = "Read static iceberg table with " + keyFinder; + + final AbstractTableLocationProvider locationProvider = new PollingTableLocationProvider<>( + StandaloneTableKey.getInstance(), + keyFinder, + new IcebergTableLocationFactory(), + refreshService); + + final PartitionAwareSourceTable result = new PartitionAwareSourceTable( + tableDef, + description, + RegionedTableComponentFactoryImpl.INSTANCE, + locationProvider, + updateSourceRegistrar); + + return result; + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java new file mode 100644 index 00000000000..4788e0e8714 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java @@ -0,0 +1,56 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.engine.table.TableDefinition; +import org.immutables.value.Value.Immutable; + +import java.util.Map; +import java.util.Optional; + +/** + * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in + * this class may change in the future. As such, callers may wish to explicitly set the values. + */ +@Immutable +@BuildableStyle +public abstract class IcebergInstructions { + public static Builder builder() { + return ImmutableIcebergInstructions.builder(); + } + + /** + * The {@link TableDefinition} to use when reading Iceberg data files. + */ + public abstract Optional tableDefinition(); + + /** + * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). + */ + public abstract Optional dataInstructions(); + + /** + * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg + * data files. 
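As a usage note for the builder defined below: a sketch of instructions that pin a table definition and rename two of the fixture columns. The s3Instructions argument is a stand-in for any provider-specific data-instructions object:

```java
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.iceberg.util.IcebergInstructions;

public class InstructionsSketch {
    static IcebergInstructions salesInstructions(final Object s3Instructions) {
        // The renamed columns must appear under their Deephaven names in the definition.
        final TableDefinition def = TableDefinition.of(
                ColumnDefinition.ofString("RegionName"),
                ColumnDefinition.ofInt("UnitsSold"));
        return IcebergInstructions.builder()
                .tableDefinition(def)
                .dataInstructions(s3Instructions)
                .putColumnRenames("Region", "RegionName")
                .putColumnRenames("Units_Sold", "UnitsSold")
                .build();
    }
}
```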
+ */ + public abstract Map columnRenames(); + + public interface Builder { + @SuppressWarnings("unused") + Builder tableDefinition(TableDefinition tableDefinition); + + @SuppressWarnings("unused") + Builder dataInstructions(Object s3Instructions); + + @SuppressWarnings("unused") + Builder putColumnRenames(String key, String value); + + @SuppressWarnings("unused") + Builder putAllColumnRenames(Map entries); + + IcebergInstructions build(); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java new file mode 100644 index 00000000000..bcdda326dca --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java @@ -0,0 +1,19 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.io.FileIO; + +/** + * Tools for accessing tables in the Iceberg table format. + */ +public abstract class IcebergTools { + @SuppressWarnings("unused") + public static IcebergCatalogAdapter createAdapter( + final Catalog catalog, + final FileIO fileIO) { + return new IcebergCatalogAdapter(catalog, fileIO); + } +} diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java new file mode 100644 index 00000000000..e62fbd282e0 --- /dev/null +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java @@ -0,0 +1,106 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.TestCatalog; + +import org.apache.iceberg.*; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.io.FileIO; + +import java.io.File; +import java.util.*; + +public class IcebergTestCatalog implements Catalog, SupportsNamespaces { + private final Map> namespaceTableMap; + private final Map tableMap; + + private IcebergTestCatalog(final String path, final FileIO fileIO) { + namespaceTableMap = new HashMap<>(); + tableMap = new HashMap<>(); + + // Assume first level is namespace. + final File root = new File(path); + for (final File namespaceFile : root.listFiles()) { + if (namespaceFile.isDirectory()) { + final Namespace namespace = Namespace.of(namespaceFile.getName()); + namespaceTableMap.putIfAbsent(namespace, new HashMap<>()); + for (final File tableFile : namespaceFile.listFiles()) { + if (tableFile.isDirectory()) { + // Second level is table name. + final TableIdentifier tableId = TableIdentifier.of(namespace, tableFile.getName()); + final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), fileIO); + + // Add it to the maps. 
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java
new file mode 100644
index 00000000000..e62fbd282e0
--- /dev/null
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java
@@ -0,0 +1,106 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.iceberg.TestCatalog;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.FileIO;
+
+import java.io.File;
+import java.util.*;
+
+public class IcebergTestCatalog implements Catalog, SupportsNamespaces {
+    private final Map<Namespace, Map<TableIdentifier, Table>> namespaceTableMap;
+    private final Map<TableIdentifier, Table> tableMap;
+
+    private IcebergTestCatalog(final String path, final FileIO fileIO) {
+        namespaceTableMap = new HashMap<>();
+        tableMap = new HashMap<>();
+
+        // Assume first level is namespace.
+        final File root = new File(path);
+        for (final File namespaceFile : root.listFiles()) {
+            if (namespaceFile.isDirectory()) {
+                final Namespace namespace = Namespace.of(namespaceFile.getName());
+                namespaceTableMap.putIfAbsent(namespace, new HashMap<>());
+                for (final File tableFile : namespaceFile.listFiles()) {
+                    if (tableFile.isDirectory()) {
+                        // Second level is table name.
+                        final TableIdentifier tableId = TableIdentifier.of(namespace, tableFile.getName());
+                        final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), fileIO);
+
+                        // Add it to the maps.
+                        namespaceTableMap.get(namespace).put(tableId, table);
+                        tableMap.put(tableId, table);
+                    }
+                }
+            }
+        }
+    }
+
+    public static IcebergTestCatalog create(final String path, final FileIO fileIO) {
+        return new IcebergTestCatalog(path, fileIO);
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        if (namespaceTableMap.containsKey(namespace)) {
+            return new ArrayList<>(namespaceTableMap.get(namespace).keySet());
+        }
+        return List.of();
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier tableIdentifier, boolean b) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void renameTable(TableIdentifier tableIdentifier, TableIdentifier tableIdentifier1) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public Table loadTable(TableIdentifier tableIdentifier) {
+        if (tableMap.containsKey(tableIdentifier)) {
+            return tableMap.get(tableIdentifier);
+        }
+        return null;
+    }
+
+    @Override
+    public void createNamespace(Namespace namespace, Map<String, String> map) {
+        // No-op; the test catalog is fully populated at construction time.
+    }
+
+    @Override
+    public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+        return new ArrayList<>(namespaceTableMap.keySet());
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+        return Map.of();
+    }
+
+    @Override
+    public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+        return false;
+    }
+
+    @Override
+    public boolean setProperties(Namespace namespace, Map<String, String> map) throws NoSuchNamespaceException {
+        return false;
+    }
+
+    @Override
+    public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException {
+        return false;
+    }
+}
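For context, a sketch of how this test catalog might be stood up in a test; the root path and FileIO wiring are placeholders. The catalog simply mirrors a two-level namespace/table directory tree discovered at construction time. Note that listNamespaces ignores its argument and returns every top-level namespace, which is sufficient for a single-level test layout.

    import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.io.FileIO;

    public class TestCatalogExample {
        public static void listEverything(final FileIO fileIO) {
            final IcebergTestCatalog catalog = IcebergTestCatalog.create("/path/to/catalog-root", fileIO);
            for (final Namespace ns : catalog.listNamespaces(Namespace.empty())) {
                for (final TableIdentifier id : catalog.listTables(ns)) {
                    System.out.println(id + " -> " + catalog.loadTable(id).location());
                }
            }
        }
    }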
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java
new file mode 100644
index 00000000000..03be6ca1b5e
--- /dev/null
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java
@@ -0,0 +1,49 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.iceberg.TestCatalog;
+
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+
+public class IcebergTestFileIO extends InMemoryFileIO {
+    private final Set<String> inputFiles;
+    private final String matchPrefix;
+    private final String replacePrefix;
+
+    public IcebergTestFileIO(final String matchPrefix, final String replacePrefix) {
+        this.matchPrefix = matchPrefix;
+        this.replacePrefix = replacePrefix;
+        inputFiles = new HashSet<>();
+    }
+
+    @Override
+    public InputFile newInputFile(String s) {
+        if (!inputFiles.contains(s)) {
+            try {
+                // Remap the Iceberg path to a local file and cache its bytes in memory.
+                final String replaced = s.replace(matchPrefix, replacePrefix);
+                final byte[] data = Files.readAllBytes(Path.of(replaced));
+                addFile(s, data);
+                inputFiles.add(s);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return super.newInputFile(s);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String s) {
+        return null;
+    }
+
+    @Override
+    public void deleteFile(String s) {}
+}
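A short sketch of the prefix-remapping behavior above, with hypothetical prefixes: a metadata reference to an S3 URI is lazily loaded from the matching local test-resource file on first access and then served from memory via InMemoryFileIO.

    import io.deephaven.iceberg.TestCatalog.IcebergTestFileIO;
    import org.apache.iceberg.io.InputFile;

    public class TestFileIOExample {
        public static long sizeOf(final String icebergPath) {
            final IcebergTestFileIO io = new IcebergTestFileIO("s3://warehouse", "/local/test-resources/warehouse");
            // e.g. "s3://warehouse/db/t/metadata/v1.metadata.json" is read from
            // "/local/test-resources/warehouse/db/t/metadata/v1.metadata.json".
            final InputFile file = io.newInputFile(icebergPath);
            return file.getLength();
        }
    }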
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java
new file mode 100644
index 00000000000..d1cf5c2ee0e
--- /dev/null
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java
@@ -0,0 +1,239 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.iceberg.TestCatalog;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.shaded.org.apache.commons.lang3.NotImplementedException;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergTestTable implements Table {
+    private final TableMetadata metadata;
+    private final FileIO fileIO;
+
+    private IcebergTestTable(@NotNull final String path, @NotNull final FileIO fileIO) {
+        final File metadataRoot = new File(path, "metadata");
+        this.fileIO = fileIO;
+
+        final List<String> metadataFiles = new ArrayList<>();
+
+        // Get a list of the JSON files.
+        for (final File file : metadataRoot.listFiles()) {
+            if (!file.isDirectory() && file.getName().endsWith(".json")) {
+                metadataFiles.add(file.getAbsolutePath());
+            }
+        }
+
+        // The last entry after sorting will be the newest / current.
+        metadataFiles.sort(String::compareTo);
+        final Path tablePath = Path.of(metadataFiles.get(metadataFiles.size() - 1));
+        try {
+            final String tableJson = new String(java.nio.file.Files.readAllBytes(tablePath));
+            metadata = TableMetadataParser.fromJson(tableJson);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to read table file: " + tablePath, e);
+        }
+    }
+
+    public static IcebergTestTable loadFromMetadata(@NotNull final String path, @NotNull final FileIO fileIO) {
+        return new IcebergTestTable(path, fileIO);
+    }
+
+    @Override
+    public void refresh() {}
+
+    @Override
+    public TableScan newScan() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Schema schema() {
+        return metadata.schema();
+    }
+
+    @Override
+    public Map<Integer, Schema> schemas() {
+        final Map<Integer, Schema> schemaMap = new java.util.HashMap<>();
+        final List<Schema> schemas = metadata.schemas();
+        for (int i = 0; i < schemas.size(); i++) {
+            schemaMap.put(i, schemas.get(i));
+        }
+        return schemaMap;
+    }
+
+    @Override
+    public PartitionSpec spec() {
+        return metadata.spec();
+    }
+
+    @Override
+    public Map<Integer, PartitionSpec> specs() {
+        final List<PartitionSpec> partitionSpecs = metadata.specs();
+        final Map<Integer, PartitionSpec> specMap = new java.util.HashMap<>();
+        for (int i = 0; i < partitionSpecs.size(); i++) {
+            specMap.put(i, partitionSpecs.get(i));
+        }
+        return specMap;
+    }
+
+    @Override
+    public SortOrder sortOrder() {
+        return metadata.sortOrder();
+    }
+
+    @Override
+    public Map<Integer, SortOrder> sortOrders() {
+        final List<SortOrder> sortOrders = metadata.sortOrders();
+        final Map<Integer, SortOrder> sortOrderMap = new java.util.HashMap<>();
+        for (int i = 0; i < sortOrders.size(); i++) {
+            sortOrderMap.put(i, sortOrders.get(i));
+        }
+        return sortOrderMap;
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        return metadata.properties();
+    }
+
+    @Override
+    public String location() {
+        return metadata.location();
+    }
+
+    @Override
+    public Snapshot currentSnapshot() {
+        return metadata.currentSnapshot();
+    }
+
+    @Override
+    public Snapshot snapshot(long l) {
+        final List<Snapshot> snapshots = metadata.snapshots();
+        for (final Snapshot snapshot : snapshots) {
+            if (snapshot.snapshotId() == l) {
+                return snapshot;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Iterable<Snapshot> snapshots() {
+        return metadata.snapshots();
+    }
+
+    @Override
+    public List<HistoryEntry> history() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public UpdateSchema updateSchema() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public UpdatePartitionSpec updateSpec() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public UpdateProperties updateProperties() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public ReplaceSortOrder replaceSortOrder() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public UpdateLocation updateLocation() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public AppendFiles newAppend() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public RewriteFiles newRewrite() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public RewriteManifests rewriteManifests() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public OverwriteFiles newOverwrite() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public RowDelta newRowDelta() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public ReplacePartitions newReplacePartitions() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public DeleteFiles newDelete() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public ExpireSnapshots expireSnapshots() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public ManageSnapshots manageSnapshots() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public Transaction newTransaction() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public FileIO io() {
+        return fileIO;
+    }
+
+    @Override
+    public EncryptionManager encryption() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public LocationProvider locationProvider() {
+        throw new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public List<StatisticsFile> statisticsFiles() {
+        return metadata.statisticsFiles();
+    }
+
+    @Override
+    public Map<String, SnapshotRef> refs() {
+        throw new NotImplementedException("Not implemented");
+    }
+}
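A hedged usage sketch for the test table; the directory is hypothetical. Note the constructor's assumption that the lexicographically last metadata *.json is the newest, which holds only when metadata file names sort naturally (for example, zero-padded version prefixes).

    import io.deephaven.iceberg.TestCatalog.IcebergTestTable;
    import org.apache.iceberg.io.FileIO;

    public class TestTableExample {
        public static void describe(final FileIO fileIO) {
            final IcebergTestTable table = IcebergTestTable.loadFromMetadata("/path/to/catalog-root/db/orders", fileIO);
            // currentSnapshot() may be null for a table with no snapshots.
            System.out.println(table.location() + " @ " + table.currentSnapshot());
        }
    }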
diff --git a/extensions/parquet/table/build.gradle b/extensions/parquet/table/build.gradle
index 6d594e29a41..1c80142c1d8 100644
--- a/extensions/parquet/table/build.gradle
+++ b/extensions/parquet/table/build.gradle
@@ -48,9 +48,7 @@ dependencies {
     testImplementation TestTools.projectDependency(project, 'extensions-s3')
 
     Classpaths.inheritJUnitClassic(project, 'testImplementation')
 
-    testImplementation "org.testcontainers:testcontainers:1.19.4"
-    testImplementation "org.testcontainers:localstack:1.19.4"
-    testImplementation "org.testcontainers:minio:1.19.4"
+    Classpaths.inheritTestContainers(project)
 
     testRuntimeOnly project(':log-to-slf4j'),
             project(path: ':configs'),
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
index 3b28644c691..069ddd4dac6 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java
@@ -87,7 +87,7 @@ public static void setDefaultMaximumDictionarySize(final int maximumDictionarySi
     /**
      * @return The default for {@link #getMaximumDictionarySize()}
      */
-    public static int getDefaltMaximumDictionarySize() {
+    public static int getDefaultMaximumDictionarySize() {
         return defaultMaximumDictionarySize;
     }
 
@@ -147,7 +147,7 @@ public enum ParquetFileLayout {
         *
         * <li>A single parquet {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file</li>
         * </ul>
         */
-        METADATA_PARTITIONED;
+        METADATA_PARTITIONED
     }
 
     private static final boolean DEFAULT_GENERATE_METADATA_FILES = false;
@@ -257,10 +257,10 @@ public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions
             if (i2 == EMPTY) {
                 return true;
             }
-            return ((ReadOnly) i2).columnNameToInstructions.size() == 0;
+            return ((ReadOnly) i2).columnNameToInstructions.isEmpty();
         }
         if (i2 == EMPTY) {
-            return ((ReadOnly) i1).columnNameToInstructions.size() == 0;
+            return ((ReadOnly) i1).columnNameToInstructions.isEmpty();
         }
         return ReadOnly.sameCodecMappings((ReadOnly) i1, (ReadOnly) i2);
     }
@@ -723,7 +723,7 @@ public Builder(final ParquetInstructions parquetInstructions) {
         }
 
         private void newColumnNameToInstructionsMap() {
-            columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<String, ColumnInstructions>() {
+            columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
                 @Override
                 public String getKey(@NotNull final ColumnInstructions value) {
                     return value.getColumnName();
@@ -733,7 +733,7 @@ public String getKey(@NotNull final ColumnInstructions value) {
         private void newParquetColumnNameToInstructionsMap() {
             parquetColumnNameToInstructions =
-                    new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<String, ColumnInstructions>() {
+                    new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
                         @Override
                         public String getKey(@NotNull final ColumnInstructions value) {
                             return value.getParquetColumnName();
diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle
index 64954314387..7d1343c052b 100644
--- a/extensions/s3/build.gradle
+++ b/extensions/s3/build.gradle
@@ -13,9 +13,7 @@ dependencies {
     implementation project(':Configuration')
     implementation project(':log-factory')
 
-    implementation platform('software.amazon.awssdk:bom:2.23.19')
-    implementation 'software.amazon.awssdk:s3'
-    implementation 'software.amazon.awssdk:aws-crt-client'
+    Classpaths.inheritAWSSDK(project)
 
     compileOnly depAnnotations
 
@@ -32,10 +30,7 @@ dependencies {
     testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
     testImplementation 'software.amazon.awssdk:s3-transfer-manager'
 
-    testImplementation "org.testcontainers:testcontainers:1.19.4"
-    testImplementation "org.testcontainers:junit-jupiter:1.19.4"
-    testImplementation "org.testcontainers:localstack:1.19.4"
-    testImplementation "org.testcontainers:minio:1.19.4"
+    Classpaths.inheritTestContainers(project)
 
     testRuntimeOnly project(':test-configs')
     testRuntimeOnly project(':log-to-slf4j')
diff --git a/py/embedded-server/java-runtime/build.gradle b/py/embedded-server/java-runtime/build.gradle
index 56111b847c5..b64ddaf2958 100644
--- a/py/embedded-server/java-runtime/build.gradle
+++ b/py/embedded-server/java-runtime/build.gradle
@@ -32,6 +32,7 @@ dependencies {
     if (!hasProperty('excludeS3')) {
         runtimeOnly project(':extensions-s3')
+        runtimeOnly project(':extensions-iceberg:s3')
     }
 }
diff --git a/server/build.gradle b/server/build.gradle
index 27f6f18aa0a..d3ccceabb13 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -9,6 +9,7 @@ dependencies {
     implementation project(':engine-table')
     implementation project(':extensions-csv')
     implementation project(':extensions-arrow')
+    implementation project(':extensions-iceberg')
     implementation project(':extensions-parquet-table')
     implementation project(':extensions-performance')
     implementation project(':extensions-jdbc')
diff --git a/server/jetty-app-custom/build.gradle b/server/jetty-app-custom/build.gradle
index ed64f1cdab7..11917d41326 100644
--- a/server/jetty-app-custom/build.gradle
+++ b/server/jetty-app-custom/build.gradle
@@ -55,6 +55,7 @@ if (!hasProperty('excludeSql')) {
 if (!hasProperty('excludeS3')) {
     dependencies {
         runtimeOnly project(':extensions-s3')
+        runtimeOnly project(':extensions-iceberg:s3')
     }
 }
diff --git a/server/jetty-app/build.gradle b/server/jetty-app/build.gradle
index 407235af4ae..39bb137af16 100644
--- a/server/jetty-app/build.gradle
+++ b/server/jetty-app/build.gradle
@@ -54,6 +54,7 @@ if (!hasProperty('excludeSql')) {
 if (!hasProperty('excludeS3')) {
     dependencies {
         runtimeOnly project(':extensions-s3')
+        runtimeOnly project(':extensions-iceberg:s3')
     }
 }
diff --git a/server/netty-app/build.gradle b/server/netty-app/build.gradle
index 15a180d998b..cd5b23ea1bb 100644
--- a/server/netty-app/build.gradle
+++ b/server/netty-app/build.gradle
@@ -54,6 +54,7 @@ if (!hasProperty('excludeSql')) {
 if (!hasProperty('excludeS3')) {
     dependencies {
         runtimeOnly project(':extensions-s3')
+        runtimeOnly project(':extensions-iceberg:s3')
    }
 }
diff --git a/settings.gradle b/settings.gradle
index 852e08360bd..9a240a942db 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -267,6 +267,12 @@ project(':extensions-trackedfile').projectDir = file('extensions/trackedfile')
 include(':extensions-s3')
 project(':extensions-s3').projectDir = file('extensions/s3')
 
+include(':extensions-iceberg')
+project(':extensions-iceberg').projectDir = file('extensions/iceberg')
+
+include(':extensions-iceberg:s3')
+project(':extensions-iceberg:s3').projectDir = file('extensions/iceberg/s3')
+
 include(':plugin')
 include(':plugin-dagger')