diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy
index 89642b93605..429b59f525d 100644
--- a/buildSrc/src/main/groovy/Classpaths.groovy
+++ b/buildSrc/src/main/groovy/Classpaths.groovy
@@ -126,10 +126,10 @@ class Classpaths {
static final String HADOOP_VERSION = '3.4.0'
static final String ICEBERG_GROUP = 'org.apache.iceberg'
- static final String ICEBERG_VERSION = '1.5.0'
+ static final String ICEBERG_VERSION = '1.5.2'
static final String AWSSDK_GROUP = 'software.amazon.awssdk'
- static final String AWSSDK_VERSION = '2.23.19'
+ static final String AWSSDK_VERSION = '2.24.5'
static final String TESTCONTAINER_GROUP = 'org.testcontainers'
static final String TESTCONTAINER_VERSION = '1.19.4'
diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle
index 359651ec7e6..be495b1373d 100644
--- a/extensions/iceberg/s3/build.gradle
+++ b/extensions/iceberg/s3/build.gradle
@@ -16,8 +16,10 @@ dependencies {
implementation project(':extensions-s3')
implementation "org.apache.iceberg:iceberg-aws"
- runtimeOnly "org.apache.iceberg:iceberg-aws-bundle"
+
Classpaths.inheritAWSSDK(project)
+ runtimeOnly "software.amazon.awssdk:sts"
+ runtimeOnly "software.amazon.awssdk:glue"
Classpaths.inheritTestContainers(project)
diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
index 6f7845c43eb..166b47e5d28 100644
--- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
+++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
@@ -7,6 +7,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
@@ -23,6 +24,20 @@
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
+ /**
+ * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
+ * value, the system defaults will be used.
+ *
+ * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+ * @param catalogURI the URI of the Iceberg REST catalog
+ * @param warehouseLocation the location of the S3 datafiles backing the catalog
+ * @param region the AWS region; if omitted, system defaults will be used
+ * @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
+ * @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
+ * @param endpointOverride the S3 endpoint override; this is useful for testing with a S3-compatible local service
+ * such as MinIO or LocalStack
+ * @return the Iceberg catalog adapter
+ */
public static IcebergCatalogAdapter createS3Rest(
@Nullable final String name,
@NotNull final String catalogURI,
@@ -53,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}
- // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);
final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
@@ -62,4 +76,36 @@ public static IcebergCatalogAdapter createS3Rest(
return new IcebergCatalogAdapter(catalog, fileIO);
}
+ /**
+ * Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
+ * and credentials. These can be configured by following
+ * AWS Authentication and
+ * access credentials guide.
+ *
+ * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+ * @param catalogURI the URI of the AWS Glue catalog
+ * @param warehouseLocation the location of the S3 datafiles backing the catalog
+ * @return the Iceberg catalog adapter
+ */
+ public static IcebergCatalogAdapter createGlue(
+ @Nullable final String name,
+ @NotNull final String catalogURI,
+ @NotNull final String warehouseLocation) {
+
+ // Set up the properties map for the Iceberg catalog
+ final Map properties = new HashMap<>();
+
+ final GlueCatalog catalog = new GlueCatalog();
+
+ properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
+ properties.put(CatalogProperties.URI, catalogURI);
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+ final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);
+
+ final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
+ catalog.initialize(catalogName, properties);
+
+ return new IcebergCatalogAdapter(catalog, fileIO);
+ }
}
diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
index 0fd3b3fcf7e..7544976f27b 100644
--- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
+++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
@@ -129,19 +129,23 @@ public void testListTables() {
final Namespace ns = Namespace.of("sales");
- final Collection tables = adapter.listTables(ns);
+ Collection tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");
- final Table table = adapter.listTablesAsTable(ns);
+ Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
"table_identifier_object column type");
+
+ // Test the string versions of the methods
+ table = adapter.listTablesAsTable("sales");
+ Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
}
@Test
@@ -160,7 +164,7 @@ public void testListSnapshots() {
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");
- final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
+ Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
@@ -168,6 +172,10 @@ public void testListSnapshots() {
Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
"snapshot_object column type");
+
+ // Test the string versions of the methods
+ table = adapter.listSnapshotsAsTable("sales.sales_multi");
+ Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
}
@Test
@@ -180,7 +188,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti
final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
- final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+ io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+
+ // Verify we retrieved all the rows.
+ Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+ // Test the string versions of the methods
+ table = adapter.readTable("sales.sales_partitioned", instructions);
// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -196,9 +210,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti
final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
- final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+ io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+ // Test the string versions of the methods
+ table = adapter.readTable("sales.sales_multi", instructions);
+
+ // Verify we retrieved all the rows.
+ Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}
@Test
@@ -211,7 +231,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti
final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
- final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+ io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+
+ // Verify we retrieved all the rows.
+ Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+ // Test the string versions of the methods
+ table = adapter.readTable("sales.sales_single", instructions);
// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -563,16 +589,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
final List snapshots = adapter.listSnapshots(tableId);
// Verify we retrieved all the rows.
- final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
+ io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
+ Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");
+
+ io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
+ Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");
+
+ io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
+ Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");
+
+ io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
+ Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
+
+ // Test the string versions of the methods
+
+ // Verify we retrieved all the rows.
+ table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");
- final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
+ table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");
- final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
+ table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");
- final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
+ table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
}
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
index c379c715c6d..c54535599c3 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
@@ -55,7 +55,7 @@ public class IcebergCatalogAdapter {
/**
* Create a single {@link TableDefinition} from a given Schema, PartitionSpec, and TableDefinition. Takes into
- * account {@link Map column rename instructions}
+ * account {@link Map<> column rename instructions}
*
* @param schema The schema of the table.
* @param partitionSpec The partition specification of the table.
@@ -197,11 +197,11 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
// Create the column source(s)
final String[] namespaceArr = new String[(int) size];
- columnSourceMap.put("namespace",
+ columnSourceMap.put("Namespace",
InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null));
final Namespace[] namespaceObjectArr = new Namespace[(int) size];
- columnSourceMap.put("namespace_object",
+ columnSourceMap.put("NamespaceObject",
InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceObjectArr, Namespace.class, null));
// Populate the column source arrays
@@ -215,6 +215,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}
+ /**
+ * List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. The resulting
+ * table will be static and contain the same information as {@link #listNamespaces(Namespace)}.
+ *
+ * @return A {@link Table table} of all namespaces.
+ */
+ public Table listNamespacesAsTable(@NotNull final String... namespace) {
+ return listNamespacesAsTable(Namespace.of(namespace));
+ }
+
/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace.
*
@@ -241,15 +251,15 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
// Create the column source(s)
final String[] namespaceArr = new String[(int) size];
- columnSourceMap.put("namespace",
+ columnSourceMap.put("Namespace",
InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null));
final String[] tableNameArr = new String[(int) size];
- columnSourceMap.put("table_name",
+ columnSourceMap.put("TableName",
InMemoryColumnSource.getImmutableMemoryColumnSource(tableNameArr, String.class, null));
final TableIdentifier[] tableIdentifierArr = new TableIdentifier[(int) size];
- columnSourceMap.put("table_identifier_object",
+ columnSourceMap.put("TableIdentifierObject",
InMemoryColumnSource.getImmutableMemoryColumnSource(tableIdentifierArr, TableIdentifier.class, null));
// Populate the column source arrays
@@ -264,6 +274,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}
+ public Table listTablesAsTable(@NotNull final String... namespace) {
+ return listTablesAsTable(Namespace.of(namespace));
+ }
+
/**
* List all {@link Snapshot snapshots} of a given Iceberg table.
*
@@ -292,22 +306,22 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
// Create the column source(s)
final long[] idArr = new long[(int) size];
- columnSourceMap.put("id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null));
+ columnSourceMap.put("Id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null));
final long[] timestampArr = new long[(int) size];
- columnSourceMap.put("timestamp_ms",
+ columnSourceMap.put("TimestampMs",
InMemoryColumnSource.getImmutableMemoryColumnSource(timestampArr, long.class, null));
final String[] operatorArr = new String[(int) size];
- columnSourceMap.put("operation",
+ columnSourceMap.put("Operation",
InMemoryColumnSource.getImmutableMemoryColumnSource(operatorArr, String.class, null));
final Map[] summaryArr = new Map[(int) size];
- columnSourceMap.put("summary",
+ columnSourceMap.put("Summary",
InMemoryColumnSource.getImmutableMemoryColumnSource(summaryArr, Map.class, null));
final Snapshot[] snapshotArr = new Snapshot[(int) size];
- columnSourceMap.put("snapshot_object",
+ columnSourceMap.put("SnapshotObject",
InMemoryColumnSource.getImmutableMemoryColumnSource(snapshotArr, Snapshot.class, null));
// Populate the column source(s)
@@ -324,6 +338,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}
+ /**
+ * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
+ * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
+ *
+ * @param tableIdentifier The identifier of the table from which to gather snapshots.
+ * @return A list of all tables in the given namespace.
+ */
+ public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
+ return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier));
+ }
+
/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -334,10 +359,24 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
@SuppressWarnings("unused")
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
- @NotNull final IcebergInstructions instructions) {
+ @Nullable final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, null, instructions);
}
+ /**
+ * Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
+ *
+ * @param tableIdentifier The table identifier to load
+ * @param instructions The instructions for customizations while reading
+ * @return The loaded table
+ */
+ @SuppressWarnings("unused")
+ public Table readTable(
+ @NotNull final String tableIdentifier,
+ @Nullable final IcebergInstructions instructions) {
+ return readTable(TableIdentifier.parse(tableIdentifier), instructions);
+ }
+
/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -350,7 +389,7 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
final long tableSnapshotId,
- @NotNull final IcebergInstructions instructions) {
+ @Nullable final IcebergInstructions instructions) {
// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream()
@@ -361,6 +400,22 @@ public Table readTable(
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}
+ /**
+ * Read a static snapshot of an Iceberg table from the Iceberg catalog.
+ *
+ * @param tableIdentifier The table identifier to load
+ * @param tableSnapshotId The snapshot id to load
+ * @param instructions The instructions for customizations while reading
+ * @return The loaded table
+ */
+ @SuppressWarnings("unused")
+ public Table readTable(
+ @NotNull final String tableIdentifier,
+ final long tableSnapshotId,
+ @Nullable final IcebergInstructions instructions) {
+ return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions);
+ }
+
/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -373,32 +428,35 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final Snapshot tableSnapshot,
- @NotNull final IcebergInstructions instructions) {
+ @Nullable final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}
private Table readTableInternal(
@NotNull final TableIdentifier tableIdentifier,
@Nullable final Snapshot tableSnapshot,
- @NotNull final IcebergInstructions instructions) {
+ @Nullable final IcebergInstructions instructions) {
- // Load the table from the catalog
+ // Load the table from the catalog.
final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier);
// Do we want the latest or a specific snapshot?
final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot();
final Schema schema = table.schemas().get(snapshot.schemaId());
- // Load the partitioning schema
+ // Load the partitioning schema.
final org.apache.iceberg.PartitionSpec partitionSpec = table.spec();
+ // Get default instructions if none are provided
+ final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions;
+
// Get the user supplied table definition.
- final TableDefinition userTableDef = instructions.tableDefinition().orElse(null);
+ final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null);
// Get the table definition from the schema (potentially limited by the user supplied table definition and
// applying column renames).
final TableDefinition icebergTableDef =
- fromSchema(schema, partitionSpec, userTableDef, instructions.columnRenames());
+ fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames());
// If the user supplied a table definition, make sure it's fully compatible.
final TableDefinition tableDef;
@@ -433,11 +491,11 @@ private Table readTableInternal(
if (partitionSpec.isUnpartitioned()) {
// Create the flat layout location key finder
- keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, instructions);
+ keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions);
} else {
// Create the partitioning column location key finder
keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec,
- instructions);
+ userInstructions);
}
refreshService = null;
@@ -459,4 +517,12 @@ private Table readTableInternal(
return result;
}
+
+ /**
+ * Returns the underlying Iceberg {@link Catalog catalog} used by this adapter.
+ */
+ @SuppressWarnings("unused")
+ public Catalog catalog() {
+ return catalog;
+ }
}
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
index 4788e0e8714..b595b4cfd14 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
@@ -17,6 +17,13 @@
@Immutable
@BuildableStyle
public abstract class IcebergInstructions {
+ /**
+ * The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system
+ * defaults for cloud provider-specific parameters
+ */
+ @SuppressWarnings("unused")
+ public static final IcebergInstructions DEFAULT = builder().build();
+
public static Builder builder() {
return ImmutableIcebergInstructions.builder();
}
diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py
new file mode 100644
index 00000000000..7506bc95a25
--- /dev/null
+++ b/py/server/deephaven/experimental/iceberg.py
@@ -0,0 +1,252 @@
+#
+# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+#
+""" This module adds Iceberg table support into Deephaven. """
+from typing import List, Optional, Union, Dict, Sequence
+
+import jpy
+
+from deephaven import DHError
+from deephaven._wrapper import JObjectWrapper
+from deephaven.column import Column
+from deephaven.dtypes import DType
+from deephaven.experimental import s3
+
+from deephaven.jcompat import j_table_definition
+
+from deephaven.table import Table
+
+_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions")
+_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter")
+
+# IcebergToolsS3 is an optional library
+try:
+ _JIcebergToolsS3 = jpy.get_type("io.deephaven.iceberg.util.IcebergToolsS3")
+except Exception:
+ _JIcebergToolsS3 = None
+
+_JNamespace = jpy.get_type("org.apache.iceberg.catalog.Namespace")
+_JTableIdentifier = jpy.get_type("org.apache.iceberg.catalog.TableIdentifier")
+_JSnapshot = jpy.get_type("org.apache.iceberg.Snapshot")
+
+
+class IcebergInstructions(JObjectWrapper):
+ """
+ This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename
+ instructions and table definitions, as well as special data instructions for loading data files from the cloud.
+ """
+
+ j_object_type = _JIcebergInstructions
+
+ def __init__(self,
+ table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None,
+ data_instructions: Optional[s3.S3Instructions] = None,
+ column_renames: Optional[Dict[str, str]] = None):
+ """
+ Initializes the instructions using the provided parameters.
+
+ Args:
+ table_definition (Optional[Union[Dict[str, DType], List[Column], None]]): the table definition; if omitted,
+ the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table
+ will have that definition. This is useful for specifying a subset of the Iceberg schema columns.
+ data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when
+ reading files from a non-local file system, like S3.
+ column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in
+ the output table.
+
+ Raises:
+ DHError: If unable to build the instructions object.
+ """
+
+ try:
+ builder = self.j_object_type.builder()
+
+ if table_definition is not None:
+ builder.tableDefinition(j_table_definition(table_definition))
+
+ if data_instructions is not None:
+ builder.dataInstructions(data_instructions.j_object)
+
+ if column_renames is not None:
+ for old_name, new_name in column_renames.items():
+ builder.putColumnRenames(old_name, new_name)
+
+ self._j_object = builder.build()
+ except Exception as e:
+ raise DHError(e, "Failed to build Iceberg instructions") from e
+
+ @property
+ def j_object(self) -> jpy.JType:
+ return self._j_object
+
+
+class IcebergCatalogAdapter(JObjectWrapper):
+ """
+ This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and
+ snapshots, as well as reading Iceberg tables into Deephaven tables.
+ """
+ j_object_type = _JIcebergCatalogAdapter or type(None)
+
+ def __init__(self, j_object: _JIcebergCatalogAdapter):
+ self.j_catalog_adapter = j_object
+
+ def namespaces(self, namespace: Optional[str] = None) -> Table:
+ """
+ Returns information on the namespaces in the catalog as a Deephaven table. If a namespace is specified, the
+ tables in that namespace are listed; otherwise the top-level namespaces are listed.
+
+ Args:
+ namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the
+ top-level namespaces are listed.
+
+ Returns:
+ a table containing the namespaces.
+ """
+
+ if namespace is not None:
+ return Table(self.j_object.listNamespaces(namespace))
+ return Table(self.j_object.listNamespacesAsTable())
+
+ def tables(self, namespace: str) -> Table:
+ """
+ Returns information on the tables in the specified namespace as a Deephaven table.
+
+ Args:
+ namespace (str): the namespace from which to list tables.
+
+ Returns:
+ a table containing the tables in the provided namespace.
+ """
+
+ if namespace is not None:
+ return Table(self.j_object.listTablesAsTable(namespace))
+ return Table(self.j_object.listTablesAsTable())
+
+ def snapshots(self, table_identifier: str) -> Table:
+ """
+ Returns information on the snapshots of the specified table as a Deephaven table.
+
+ Args:
+ table_identifier (str): the table from which to list snapshots.
+
+ Returns:
+ a table containing the snapshot information.
+ """
+
+ return self.j_object.listSnapshotsAsTable(table_identifier)
+
+ def read_table(self, table_identifier: str, instructions: Optional[IcebergInstructions] = None, snapshot_id: Optional[int] = None) -> Table:
+ """
+ Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to
+ read a specific snapshot of the table.
+
+ Args:
+ table_identifier (str): the table to read.
+ instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions
+ can include column renames, table definition, and specific data instructions for reading the data files
+ from the provider. If omitted, the table will be read with default instructions.
+ snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
+
+ Returns:
+ Table: the table read from the catalog.
+ """
+
+ if instructions is not None:
+ instructions_object = instructions.j_object
+ else:
+ instructions_object = _JIcebergInstructions.DEFAULT
+
+ if snapshot_id is not None:
+ return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions_object))
+ return Table(self.j_object.readTable(table_identifier, instructions_object))
+
+ @property
+ def j_object(self) -> jpy.JType:
+ return self.j_catalog_adapter
+
+
+def adapter_s3_rest(
+ catalog_uri: str,
+ warehouse_location: str,
+ name: Optional[str] = None,
+ region_name: Optional[str] = None,
+ access_key_id: Optional[str] = None,
+ secret_access_key: Optional[str] = None,
+ end_point_override: Optional[str] = None
+) -> IcebergCatalogAdapter:
+ """
+ Create a catalog adapter using an S3-compatible provider and a REST catalog.
+
+ Args:
+ catalog_uri (str): the URI of the REST catalog.
+ warehouse_location (str): the location of the warehouse.
+ name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
+ catalog URI.
+ region_name (Optional[str]): the S3 region name to use; If not provided, the default region will be
+ picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the
+ {user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running in
+ EC2.
+ access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be
+ provided to use static credentials, else default credentials will be used.
+ secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key
+ must be provided to use static credentials, else default credentials will be used.
+ end_point_override (Optional[str]): the S3 endpoint to connect to. Callers connecting to AWS do not typically
+ need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs.
+
+ Returns:
+ IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog.
+
+ Raises:
+ DHError: If unable to build the catalog adapter.
+ """
+ if not _JIcebergToolsS3:
+ raise DHError(message="`adapter_s3_rest` requires the Iceberg specific deephaven S3 extensions to be "
+ "included in the package")
+
+ try:
+ return IcebergCatalogAdapter(
+ _JIcebergToolsS3.createS3Rest(
+ name,
+ catalog_uri,
+ warehouse_location,
+ region_name,
+ access_key_id,
+ secret_access_key,
+ end_point_override))
+ except Exception as e:
+ raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
+
+
+def adapter_aws_glue(
+ catalog_uri: str,
+ warehouse_location: str,
+ name: Optional[str] = None
+) -> IcebergCatalogAdapter:
+ """
+ Create a catalog adapter using an AWS Glue catalog.
+
+ Args:
+ catalog_uri (str): the URI of the REST catalog.
+ warehouse_location (str): the location of the warehouse.
+ name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
+ catalog URI.
+
+ Returns:
+ IcebergCatalogAdapter: the catalog adapter for the provided AWS Glue catalog.
+
+ Raises:
+ DHError: If unable to build the catalog adapter.
+ """
+ if not _JIcebergToolsS3:
+ raise DHError(message="`adapter_aws_glue` requires the Iceberg specific deephaven S3 extensions to "
+ "be included in the package")
+
+ try:
+ return IcebergCatalogAdapter(
+ _JIcebergToolsS3.createGlue(
+ name,
+ catalog_uri,
+ warehouse_location))
+ except Exception as e:
+ raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
+
diff --git a/py/server/deephaven/jcompat.py b/py/server/deephaven/jcompat.py
index e6e921fd8f1..d807cb472f3 100644
--- a/py/server/deephaven/jcompat.py
+++ b/py/server/deephaven/jcompat.py
@@ -14,9 +14,11 @@
from deephaven import dtypes, DHError
from deephaven._wrapper import unwrap, wrap_j_object, JObjectWrapper
from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP
+from deephaven.column import Column
_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
+_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
_DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = {
dtypes.bool_: pd.BooleanDtype,
@@ -325,6 +327,35 @@ def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd.
return s
+def j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]:
+ """Produce a Deephaven TableDefinition from user input.
+
+ Args:
+ table_definition (Union[Dict[str, DType], List[Column], None]): the table definition as a dictionary of column
+ names and their corresponding data types or a list of Column objects
+
+ Returns:
+ a Deephaven TableDefinition object or None if the input is None
+
+ Raises:
+ DHError
+ """
+ if table_definition is None:
+ return None
+ elif isinstance(table_definition, Dict):
+ return _JTableDefinition.of(
+ [
+ Column(name=name, data_type=dtype).j_column_definition
+ for name, dtype in table_definition.items()
+ ]
+ )
+ elif isinstance(table_definition, List):
+ return _JTableDefinition.of(
+ [col.j_column_definition for col in table_definition]
+ )
+ else:
+ raise DHError(f"Unexpected table_definition type: {type(table_definition)}")
+
class AutoCloseable(JObjectWrapper):
"""A context manager wrapper to allow Java AutoCloseable to be used in with statements.
diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py
index 76c9dd86735..dc877660671 100644
--- a/py/server/deephaven/parquet.py
+++ b/py/server/deephaven/parquet.py
@@ -13,7 +13,7 @@
from deephaven import DHError
from deephaven.column import Column
from deephaven.dtypes import DType
-from deephaven.jcompat import j_array_list
+from deephaven.jcompat import j_array_list, j_table_definition
from deephaven.table import Table, PartitionedTable
from deephaven.experimental import s3
@@ -135,7 +135,7 @@ def _build_parquet_instructions(
builder.setFileLayout(_j_file_layout(file_layout))
if table_definition is not None:
- builder.setTableDefinition(_j_table_definition(table_definition))
+ builder.setTableDefinition(j_table_definition(table_definition))
if index_columns:
builder.addAllIndexColumns(_j_list_of_list_of_string(index_columns))
@@ -146,24 +146,6 @@ def _build_parquet_instructions(
return builder.build()
-def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]:
- if table_definition is None:
- return None
- elif isinstance(table_definition, Dict):
- return _JTableDefinition.of(
- [
- Column(name=name, data_type=dtype).j_column_definition
- for name, dtype in table_definition.items()
- ]
- )
- elif isinstance(table_definition, List):
- return _JTableDefinition.of(
- [col.j_column_definition for col in table_definition]
- )
- else:
- raise DHError(f"Unexpected table_definition type: {type(table_definition)}")
-
-
def _j_file_layout(file_layout: Optional[ParquetFileLayout]) -> Optional[jpy.JType]:
if file_layout is None:
return None
diff --git a/py/server/deephaven/stream/table_publisher.py b/py/server/deephaven/stream/table_publisher.py
index cb0d1073de8..a6c65f47885 100644
--- a/py/server/deephaven/stream/table_publisher.py
+++ b/py/server/deephaven/stream/table_publisher.py
@@ -5,13 +5,13 @@
import jpy
-from typing import Callable, Dict, Optional, Tuple
+from typing import Callable, Dict, Optional, Tuple, Union, List
from deephaven._wrapper import JObjectWrapper
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.execution_context import get_exec_ctx
-from deephaven.jcompat import j_lambda, j_runnable
+from deephaven.jcompat import j_lambda, j_runnable, j_table_definition
from deephaven.table import Table
from deephaven.update_graph import UpdateGraph
@@ -75,7 +75,7 @@ def publish_failure(self, failure: Exception) -> None:
def table_publisher(
name: str,
- col_defs: Dict[str, DType],
+ col_defs: Union[Dict[str, DType], List[Column]],
on_flush_callback: Optional[Callable[[TablePublisher], None]] = None,
on_shutdown_callback: Optional[Callable[[], None]] = None,
update_graph: Optional[UpdateGraph] = None,
@@ -107,12 +107,7 @@ def adapt_callback(_table_publisher: jpy.JType):
j_table_publisher = _JTablePublisher.of(
name,
- _JTableDefinition.of(
- [
- Column(name=name, data_type=dtype).j_column_definition
- for name, dtype in col_defs.items()
- ]
- ),
+ j_table_definition(col_defs),
j_lambda(adapt_callback, _JConsumer, None) if on_flush_callback else None,
j_runnable(on_shutdown_callback) if on_shutdown_callback else None,
(update_graph or get_exec_ctx().update_graph).j_update_graph,
diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py
new file mode 100644
index 00000000000..62ba31e6636
--- /dev/null
+++ b/py/server/tests/test_iceberg.py
@@ -0,0 +1,76 @@
+#
+# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+#
+import jpy
+
+from deephaven import dtypes
+from deephaven.column import Column, ColumnType
+
+from tests.testbase import BaseTestCase
+from deephaven.experimental import s3, iceberg
+
+from deephaven.jcompat import j_map_to_dict, j_list_to_list
+
+_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
+
+class IcebergTestCase(BaseTestCase):
+ """ Test cases for the deephaven.iceberg module (performed locally) """
+
+ def setUp(self):
+ super().setUp()
+
+ def tearDown(self):
+ super().tearDown()
+
+ def test_instruction_create_empty(self):
+ iceberg_instructions = iceberg.IcebergInstructions()
+
+ def test_instruction_create_with_s3_instructions(self):
+ s3_instructions = s3.S3Instructions(region_name="us-east-1",
+ access_key_id="some_access_key_id",
+ secret_access_key="som_secret_access_key"
+ )
+ iceberg_instructions = iceberg.IcebergInstructions(data_instructions=s3_instructions)
+
+ def test_instruction_create_with_col_renames(self):
+ renames = {
+ "old_name_a": "new_name_a",
+ "old_name_b": "new_name_b",
+ "old_name_c": "new_name_c"
+ }
+ iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames)
+
+ col_rename_dict = j_map_to_dict(iceberg_instructions.j_object.columnRenames())
+ self.assertTrue(col_rename_dict["old_name_a"] == "new_name_a")
+ self.assertTrue(col_rename_dict["old_name_b"] == "new_name_b")
+ self.assertTrue(col_rename_dict["old_name_c"] == "new_name_c")
+
+ def test_instruction_create_with_table_definition_dict(self):
+ table_def={
+ "x": dtypes.int32,
+ "y": dtypes.double,
+ "z": dtypes.double,
+ }
+
+ iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def)
+ col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames())
+ self.assertTrue(col_names[0] == "x")
+ self.assertTrue(col_names[1] == "y")
+ self.assertTrue(col_names[2] == "z")
+
+ def test_instruction_create_with_table_definition_list(self):
+ table_def=[
+ Column(
+ "Partition", dtypes.int32, column_type=ColumnType.PARTITIONING
+ ),
+ Column("x", dtypes.int32),
+ Column("y", dtypes.double),
+ Column("z", dtypes.double),
+ ]
+
+ iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def)
+ col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames())
+ self.assertTrue(col_names[0] == "Partition")
+ self.assertTrue(col_names[1] == "x")
+ self.assertTrue(col_names[2] == "y")
+ self.assertTrue(col_names[3] == "z")