From 1b39b99c72aba204eb7b0c86e5013cc47f10d2b0 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Fri, 7 Jun 2024 15:13:10 -0700 Subject: [PATCH 01/11] WIP commit, functional but needs docs. --- .../iceberg/util/IcebergToolsTest.java | 79 ++++++- .../iceberg/util/IcebergCatalogAdapter.java | 87 +++++++ py/server/deephaven/experimental/iceberg.py | 219 ++++++++++++++++++ py/server/tests/test_iceberg.py | 58 +++++ 4 files changed, 433 insertions(+), 10 deletions(-) create mode 100644 py/server/deephaven/experimental/iceberg.py create mode 100644 py/server/tests/test_iceberg.py diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index 0fd3b3fcf7e..b2fb624cf3e 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -129,19 +129,30 @@ public void testListTables() { final Namespace ns = Namespace.of("sales"); - final Collection tables = adapter.listTables(ns); + Collection tables = adapter.listTables(ns); Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace"); Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)"); Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")), "tables.contains(sales_partitioned)"); Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)"); - final Table table = adapter.listTablesAsTable(ns); + Table table = adapter.listTablesAsTable(ns); Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace"); Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type"); Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type"); Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class), "table_identifier_object column type"); + + // Test the string versions of the methods + tables = adapter.listTables("sales"); + Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")), + "tables.contains(sales_partitioned)"); + Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)"); + + table = adapter.listTablesAsTable("sales"); + Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace"); } @Test @@ -160,7 +171,7 @@ public void testListSnapshots() { Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)"); Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)"); - final Table table = adapter.listSnapshotsAsTable(tableIdentifier); + Table table = adapter.listSnapshotsAsTable(tableIdentifier); Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type"); Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type"); @@ -168,6 +179,21 @@ public void testListSnapshots() { Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column 
type"); Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class), "snapshot_object column type"); + + // Test the string versions of the methods + snapshotIds.clear(); + adapter.listSnapshots("sales.sales_multi") + .forEach(snapshot -> snapshotIds.add(snapshot.snapshotId())); + + Assert.eq(snapshotIds.size(), "snapshots.size()", 4, "4 snapshots for sales/sales_multi"); + + Assert.eqTrue(snapshotIds.contains(2001582482032951248L), "snapshots.contains(2001582482032951248)"); + Assert.eqTrue(snapshotIds.contains(8325605756612719366L), "snapshots.contains(8325605756612719366L)"); + Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)"); + Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)"); + + table = adapter.listSnapshotsAsTable("sales.sales_multi"); + Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); } @Test @@ -180,7 +206,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned"); - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + + // Test the string versions of the methods + table = adapter.readTable("sales.sales_partitioned", instructions); // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); @@ -196,9 +228,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi"); - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + + // Test the string versions of the methods + table = adapter.readTable("sales.sales_multi", instructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); } @Test @@ -211,7 +249,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti final Namespace ns = Namespace.of("sales"); final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single"); - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + + // Verify we retrieved all the rows. + Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); + + // Test the string versions of the methods + table = adapter.readTable("sales.sales_single", instructions); // Verify we retrieved all the rows. Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table"); @@ -563,16 +607,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx final List snapshots = adapter.listSnapshots(tableId); // Verify we retrieved all the rows. 
- final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions); + io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions); + Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); + + io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions); + Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); + + io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions); + Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); + + io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions); + Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); + + // Test the string versions of the methods + + // Verify we retrieved all the rows. + table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions); Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table"); - final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions); + table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions); Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table"); - final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions); + table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions); Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table"); - final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions); + table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions); Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table"); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index c379c715c6d..f38aac36c8c 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -172,6 +172,18 @@ public List listNamespaces(@NotNull final Namespace namespace) { "%s does not implement org.apache.iceberg.catalog.SupportsNamespaces", catalog.getClass().getName())); } + /** + * List all {@link Namespace namespaces} in a given namespace. This method is only supported if the catalog + * implements {@link SupportsNamespaces} for namespace discovery. See + * {@link SupportsNamespaces#listNamespaces(Namespace)}. + * + * @param namespace The namespace(s) to list namespaces in. + * @return A list of all namespaces in the given namespace. + */ + public List listNamespaces(@NotNull final String... namespace) { + return listNamespaces(Namespace.of(namespace)); + } + /** * List all {@link Namespace namespaces} in the catalog as a Deephaven {@link Table table}. The resulting table will * be static and contain the same information as {@link #listNamespaces()}. @@ -215,6 +227,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) { return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); } + /** + * List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. 
The resulting + * table will be static and contain the same information as {@link #listNamespaces(Namespace)}. + * + * @return A {@link Table table} of all namespaces. + */ + public Table listNamespacesAsTable(@NotNull final String... namespace) { + return listNamespacesAsTable(Namespace.of(namespace)); + } + /** * List all Iceberg {@link TableIdentifier tables} in a given namespace. * @@ -225,6 +247,16 @@ public List listTables(@NotNull final Namespace namespace) { return catalog.listTables(namespace); } + /** + * List all Iceberg {@link TableIdentifier tables} in a given namespace. + * + * @param namespace The namespace to list tables in. + * @return A list of all tables in the given namespace. + */ + public List listTables(@NotNull final String... namespace) { + return listTables(Namespace.of(namespace)); + } + /** * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The * resulting table will be static and contain the same information as {@link #listTables(Namespace)}. @@ -264,6 +296,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) { return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); } + public Table listTablesAsTable(@NotNull final String... namespace) { + return listTablesAsTable(Namespace.of(namespace)); + } + /** * List all {@link Snapshot snapshots} of a given Iceberg table. * @@ -276,6 +312,16 @@ public List listSnapshots(@NotNull final TableIdentifier tableIdentifi return snapshots; } + /** + * List all {@link Snapshot snapshots} of a given Iceberg table. + * + * @param tableIdentifier The identifier of the table from which to gather snapshots. + * @return A list of all snapshots of the given table. + */ + public List listSnapshots(@NotNull final String tableIdentifier) { + return listSnapshots(TableIdentifier.parse(tableIdentifier)); + } + /** * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}. @@ -324,6 +370,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap); } + /** + * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting + * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}. + * + * @param tableIdentifier The identifier of the table from which to gather snapshots. + * @return A list of all tables in the given namespace. + */ + public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) { + return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier)); + } + /** * Read the latest static snapshot of an Iceberg table from the Iceberg catalog. * @@ -338,6 +395,20 @@ public Table readTable( return readTableInternal(tableIdentifier, null, instructions); } + /** + * Read the latest static snapshot of an Iceberg table from the Iceberg catalog. 
+ * + * @param tableIdentifier The table identifier to load + * @param instructions The instructions for customizations while reading + * @return The loaded table + */ + @SuppressWarnings("unused") + public Table readTable( + @NotNull final String tableIdentifier, + @NotNull final IcebergInstructions instructions) { + return readTable(TableIdentifier.parse(tableIdentifier), instructions); + } + /** * Read a static snapshot of an Iceberg table from the Iceberg catalog. * @@ -361,6 +432,22 @@ public Table readTable( return readTableInternal(tableIdentifier, tableSnapshot, instructions); } + /** + * Read a static snapshot of an Iceberg table from the Iceberg catalog. + * + * @param tableIdentifier The table identifier to load + * @param tableSnapshotId The snapshot id to load + * @param instructions The instructions for customizations while reading + * @return The loaded table + */ + @SuppressWarnings("unused") + public Table readTable( + @NotNull final String tableIdentifier, + final long tableSnapshotId, + @NotNull final IcebergInstructions instructions) { + return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions); + } + /** * Read a static snapshot of an Iceberg table from the Iceberg catalog. * diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py new file mode 100644 index 00000000000..29f104b3392 --- /dev/null +++ b/py/server/deephaven/experimental/iceberg.py @@ -0,0 +1,219 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +from typing import List, Optional, Union, Dict, Sequence + +import jpy + +from deephaven import DHError +from deephaven._wrapper import JObjectWrapper +from deephaven.column import Column +from deephaven.dtypes import DType +from deephaven.experimental import s3 + +from deephaven.jcompat import j_list_to_list + +from deephaven.table import Table + +# If we move Iceberg to a permanent module, we should remove this try/except block and just import the types directly. +try: + _JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") +except Exception: + _JIcebergInstructions = None +try: + _JIcebergCatalog = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalog") +except Exception: + _JIcebergCatalog = None +try: + _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") +except Exception: + _JIcebergCatalogAdapter = None +try: + _JIcebergToolsS3 = jpy.get_type("io.deephaven.iceberg.util.IcebergToolsS3") +except Exception: + _JIcebergToolsS3 = None + +_JNamespace = jpy.get_type("org.apache.iceberg.catalog.Namespace") +_JTableIdentifier = jpy.get_type("org.apache.iceberg.catalog.TableIdentifier") +_JSnapshot = jpy.get_type("org.apache.iceberg.Snapshot") + +_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") + + +""" + XXXXXXXX +""" +class IcebergInstructions(JObjectWrapper): + """ + XXXXXXXXXX provides specialized instructions for reading from S3-compatible APIs. + """ + + j_object_type = _JIcebergInstructions or type(None) + + def __init__(self, + table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None, + data_instructions: Optional[s3.S3Instructions] = None, + column_renames: Optional[Dict[str, str]] = None): + + """ + Initializes the instructions. + + Args: + table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None, + the definition is inferred from the parquet file(s). 
Setting a definition guarantees the returned table will + have that definition. This is useful for bootstrapping purposes when the initially partitioned directory is + empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. + data_instructions (Optional[s3.S3Instructions]): Special instructions for reading parquet files, useful when + reading files from a non-local file system, like S3. By default, None. + column_renames (Optional[Dict[str, str]]): A dictionary of column renames, by default None. When None, no columns will be renamed. + + Raises: + DHError: If unable to build the instructions object. + """ + + if not _JIcebergInstructions: + raise DHError(message="IcebergInstructions requires the Iceberg specific deephaven extensions to be " + "included in the package") + + try: + builder = self.j_object_type.builder() + + if table_definition is not None: + builder.tableDefinition(_j_table_definition(table_definition)) + + if data_instructions is not None: + builder.dataInstructions(data_instructions.j_object) + + if column_renames is not None: + for old_name, new_name in column_renames.items(): + builder.putColumnRenames(old_name, new_name) + + self._j_object = builder.build() + except Exception as e: + raise DHError(e, "Failed to build Iceberg instructions") from e + + @property + def j_object(self) -> jpy.JType: + return self._j_object + +def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]: + if table_definition is None: + return None + elif isinstance(table_definition, Dict): + return _JTableDefinition.of( + [ + Column(name=name, data_type=dtype).j_column_definition + for name, dtype in table_definition.items() + ] + ) + elif isinstance(table_definition, List): + return _JTableDefinition.of( + [col.j_column_definition for col in table_definition] + ) + else: + raise DHError(f"Unexpected table_definition type: {type(table_definition)}") + +def _j_object_list_to_str_list(j_object_list: jpy.JType) -> List[str]: + return [x.toString() for x in j_list_to_list(j_object_list)] + +class IcebergCatalogAdapter(JObjectWrapper): + """ + """ + j_object_type = _JIcebergCatalogAdapter or type(None) + + def __init__(self, j_object: _JIcebergCatalogAdapter): + self.j_catalog_adapter = j_object + + def namespaces(self, namespace: Optional[str] = None) -> Sequence[str]: + """ + Returns the list of namespaces in the catalog as strings. + + :param namespace: + :return: + """ + if namespace is not None: + return _j_object_list_to_str_list(self.j_object.listNamespaces(namespace)) + return _j_object_list_to_str_list(self.j_object.listNamespaces()) + + def namespaces_as_table(self, namespace: Optional[str] = None) -> Table: + if namespace is not None: + return Table(self.j_object.listNamespaces(namespace)) + return Table(self.j_object.listNamespacesAsTable()) + + def tables(self, namespace: Optional[str] = None) -> Sequence[str]: + if namespace is not None: + return _j_object_list_to_str_list(self.j_object.listTables(namespace)) + return _j_object_list_to_str_list(self.j_object.listTables()) + + def tables_as_table(self, namespace: Optional[str] = None) -> Table: + if namespace is not None: + return Table(self.j_object.listTablesAsTable(namespace)) + return Table(self.j_object.listTablesAsTable()) + + def snapshots(self, table_identifier: str) -> Sequence[str]: + """ + Returns a list of snapshots for the specified table. 
+ + :param table_identifier: + :return: + """ + + snaphot_list = [] + for snapshot in j_list_to_list(self.j_object.listSnapshots(table_identifier)): + snaphot_list.append({ + "id": snapshot.snapshotId(), + "timestamp_ms": snapshot.timestampMillis(), + "operation": snapshot.operation(), + "summary": snapshot.summary().toString() + + }) + return snaphot_list + + def snapshots_as_table(self, table_identifier: str) -> Table: + """ + Returns a list of snapshots for the specified table. + + :param table_identifier: + :return: + """ + + return self.j_object.listSnapshotsAsTable(table_identifier) + + def read_table(self, table_identifier: str, instructions: IcebergInstructions, snapshot_id: Optional[int] = None) -> Table: + """ + Reads the specified table. + + :param table_identifier: + :param snapshot_id: + :return: + """ + + if snapshot_id is not None: + return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions.j_object)) + return Table(self.j_object.readTable(table_identifier, instructions.j_object)) + + @property + def j_object(self) -> jpy.JType: + return self.j_catalog_adapter + +def create_s3_rest_adapter( + name: Optional[str] = None, + catalog_uri: Optional[str] = None, + warehouse_location: Optional[str] = None, + region_name: Optional[str] = None, + access_key_id: Optional[str] = None, + secret_access_key: Optional[str] = None, + end_point_override: Optional[str] = None +) -> IcebergCatalogAdapter: + return IcebergCatalogAdapter( + _JIcebergToolsS3.createS3Rest( + name, + catalog_uri, + warehouse_location, + region_name, + access_key_id, + secret_access_key, + end_point_override)) + +def create_s3_aws_glue_adapter() -> IcebergCatalogAdapter: + return IcebergCatalogAdapter(_JIcebergCatalogAdapter.builder().build()) \ No newline at end of file diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py new file mode 100644 index 00000000000..262b1b2ed7d --- /dev/null +++ b/py/server/tests/test_iceberg.py @@ -0,0 +1,58 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# + +from deephaven import dtypes +from deephaven.column import Column, ColumnType + +from tests.testbase import BaseTestCase +from deephaven.experimental import s3, iceberg + + +class IcebergTestCase(BaseTestCase): + """ Test cases for the deephaven.iceberg module (performed locally) """ + + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def test_instruction_create_empty(self): + iceberg_instructions = iceberg.IcebergInstructions() + + def test_instruction_create_with_s3_instructions(self): + s3_instructions = s3.S3Instructions(region_name="us-east-1", + access_key_id="some_access_key_id", + secret_access_key="som_secret_access_key" + ) + iceberg_instructions = iceberg.IcebergInstructions(data_instructions=s3_instructions) + + def test_instruction_create_with_col_renames(self): + renames = { + "old_name_a": "new_name_a", + "old_name_b": "new_name_b", + "old_name_c": "new_name_c" + } + iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames) + + def test_instruction_create_with_table_definitition_dict(self): + table_def={ + "x": dtypes.+ .int32, + "y": dtypes.double, + "z": dtypes.double, + } + + iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) + + def test_instruction_create_with_table_definitition_list(self): + table_def=[ + Column( + "Partition", dtypes.int32, column_type=ColumnType.PARTITIONING + ), + Column("x", dtypes.int32), + Column("y", dtypes.double), + Column("z", 
dtypes.double), + ] + + iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) From c93d05257fbadc847276485bd653c9f0258a0759 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Fri, 7 Jun 2024 16:28:31 -0700 Subject: [PATCH 02/11] Much better docs, need to figure out AWS Glue ASAP. --- py/server/deephaven/experimental/iceberg.py | 133 ++++++++++++++++---- 1 file changed, 107 insertions(+), 26 deletions(-) diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 29f104b3392..2e197e24b32 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -1,6 +1,7 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +""" This module supports reading external Iceberg tables into Deephaven. """ from typing import List, Optional, Union, Dict, Sequence import jpy @@ -40,12 +41,10 @@ _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") -""" - XXXXXXXX -""" class IcebergInstructions(JObjectWrapper): """ - XXXXXXXXXX provides specialized instructions for reading from S3-compatible APIs. + This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename + instructions and table definitions, as well as special data instructions for loading data files from the cloud. """ j_object_type = _JIcebergInstructions or type(None) @@ -56,16 +55,16 @@ def __init__(self, column_renames: Optional[Dict[str, str]] = None): """ - Initializes the instructions. + Initializes the instructions using the provided parameters. Args: - table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None, - the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will - have that definition. This is useful for bootstrapping purposes when the initially partitioned directory is - empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. - data_instructions (Optional[s3.S3Instructions]): Special instructions for reading parquet files, useful when - reading files from a non-local file system, like S3. By default, None. - column_renames (Optional[Dict[str, str]]): A dictionary of column renames, by default None. When None, no columns will be renamed. + table_definition (Union[Dict[str, DType], List[Column], None]): the table definition; if ommitted, the + definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table will + have that definition. This is useful for specifying a subset of the Iceberg schema columns. + data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when + reading files from a non-local file system, like S3. + column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in + the output table. Raises: DHError: If unable to build the instructions object. @@ -118,6 +117,8 @@ def _j_object_list_to_str_list(j_object_list: jpy.JType) -> List[str]: class IcebergCatalogAdapter(JObjectWrapper): """ + This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and + snapshots, as well as reading Iceberg tables into Deephaven tables. 
""" j_object_type = _JIcebergCatalogAdapter or type(None) @@ -126,36 +127,73 @@ def __init__(self, j_object: _JIcebergCatalogAdapter): def namespaces(self, namespace: Optional[str] = None) -> Sequence[str]: """ - Returns the list of namespaces in the catalog as strings. + Returns the list of namespaces in the catalog. - :param namespace: - :return: + Args: + namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the + top-level namespaces are listed. + Returns: + Sequence[str]: the list of namespaces. """ if namespace is not None: return _j_object_list_to_str_list(self.j_object.listNamespaces(namespace)) return _j_object_list_to_str_list(self.j_object.listNamespaces()) def namespaces_as_table(self, namespace: Optional[str] = None) -> Table: + """ + Returns the namespaces in the catalog as a Deephaven table. + + Args: + namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the + top-level namespaces are listed. + + Returns: + a table containing the namespaces. + """ + if namespace is not None: return Table(self.j_object.listNamespaces(namespace)) return Table(self.j_object.listNamespacesAsTable()) - def tables(self, namespace: Optional[str] = None) -> Sequence[str]: + def tables(self, namespace: str) -> Sequence[str]: + """ + Returns the list of tables in the provided namespace. + + Args: + namespace (str): the namespace from which to list tables. + + Returns: + Sequence[str]: the list of table names. + """ + if namespace is not None: return _j_object_list_to_str_list(self.j_object.listTables(namespace)) return _j_object_list_to_str_list(self.j_object.listTables()) def tables_as_table(self, namespace: Optional[str] = None) -> Table: + """ + Returns the list of tables in the provided namespace as a Deephaven table. + + Args: + namespace (str): the namespace from which to list tables. + + Returns: + a table containing the tables in the provided namespace. + """ + if namespace is not None: return Table(self.j_object.listTablesAsTable(namespace)) return Table(self.j_object.listTablesAsTable()) def snapshots(self, table_identifier: str) -> Sequence[str]: """ - Returns a list of snapshots for the specified table. + Returns the list of snapshots for the provided table. + + Args: + namespace (str): the table from which to list snapshots. - :param table_identifier: - :return: + Returns: + the list of snapshots and additional information for each snapshot. """ snaphot_list = [] @@ -171,21 +209,31 @@ def snapshots(self, table_identifier: str) -> Sequence[str]: def snapshots_as_table(self, table_identifier: str) -> Table: """ - Returns a list of snapshots for the specified table. + Returns the list of snapshots of the provided table as a Deephaven table. + + Args: + table_identifier (str): the table from which to list snapshots. - :param table_identifier: - :return: + Returns: + a table containing the snapshot information. """ return self.j_object.listSnapshotsAsTable(table_identifier) def read_table(self, table_identifier: str, instructions: IcebergInstructions, snapshot_id: Optional[int] = None) -> Table: """ - Reads the specified table. + Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to + read a specific snapshot of the table. - :param table_identifier: - :param snapshot_id: - :return: + Args: + table_identifier (str): the table to read. + instructions (IcebergInstructions): the instructions for reading the table. 
These instructions can include + column renames, table definition, and specific data instructions for reading the data files from the + provider. + snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. + + Returns: + Table: the table read from the catalog. """ if snapshot_id is not None: @@ -205,6 +253,29 @@ def create_s3_rest_adapter( secret_access_key: Optional[str] = None, end_point_override: Optional[str] = None ) -> IcebergCatalogAdapter: + """ + Create a catalog adapter using an S3-compatible provider and a REST catalog. + + Args: + name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the + catalog URI. + catalog_uri (Optional[str]): the URI of the REST catalog. + warehouse_location (Optional[str]): the location of the warehouse. + region_name (Optional[str]): the S3 region name to use. + access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be + provided to use static credentials, else default credentials will be used. + secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key + must be provided to use static credentials, else default credentials will be used. + end_point_override (Optional[str]): the S3 endpoint to connect to. Callers connecting to AWS do not typically + need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. + + Returns: + IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. + """ + if not _JIcebergToolsS3: + raise DHError(message="`create_s3_rest_adapter` requires the Iceberg specific deephaven S3 extensions to be " + "included in the package") + return IcebergCatalogAdapter( _JIcebergToolsS3.createS3Rest( name, @@ -216,4 +287,14 @@ def create_s3_rest_adapter( end_point_override)) def create_s3_aws_glue_adapter() -> IcebergCatalogAdapter: + """ + Create a catalog adapter using the S3 provider and an AWS Glue catalog . + + Args: + TBD: + + Returns: + IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. + """ + # TODO: figure out the parameters to connect to an AWS-hosted Iceberg GLUE catalog return IcebergCatalogAdapter(_JIcebergCatalogAdapter.builder().build()) \ No newline at end of file From 9a30c3b9cb7e9ac6728ceba057fe8f6be6dc3053 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 10 Jun 2024 08:31:12 -0700 Subject: [PATCH 03/11] Fixed typo in iceberg test. --- py/server/tests/test_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py index 262b1b2ed7d..1cb9de8b959 100644 --- a/py/server/tests/test_iceberg.py +++ b/py/server/tests/test_iceberg.py @@ -38,7 +38,7 @@ def test_instruction_create_with_col_renames(self): def test_instruction_create_with_table_definitition_dict(self): table_def={ - "x": dtypes.+ .int32, + "x": dtypes.int32, "y": dtypes.double, "z": dtypes.double, } From f44b937b60ff311b993b90a5d9ee1a0c2a30f32a Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 11 Jun 2024 08:46:44 -0700 Subject: [PATCH 04/11] Added AWS Glue functionality. 
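For reviewers, a rough usage sketch of the new Glue entry point (not part of the patch; the catalog URI and warehouse location below are placeholders, and region/credentials fall back to the AWS SDK defaults when omitted):

    from deephaven.experimental import iceberg, s3

    # Placeholder Glue catalog URI and S3 warehouse location.
    adapter = iceberg.create_s3_aws_glue_adapter(
        catalog_uri="s3://my-bucket/catalog",
        warehouse_location="s3://my-bucket/warehouse",
    )

    # Optional S3 data instructions for reading the backing data files.
    instructions = iceberg.IcebergInstructions(
        data_instructions=s3.S3Instructions(region_name="us-east-1"))
    sales = adapter.read_table("sales.sales_multi", instructions)
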
--- .../iceberg/util/IcebergToolsS3.java | 48 ++++++++++ .../iceberg/util/IcebergInstructions.java | 7 ++ py/server/deephaven/experimental/iceberg.py | 91 ++++++------------- 3 files changed, 83 insertions(+), 63 deletions(-) diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java index 6f7845c43eb..21c18ec31eb 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java @@ -7,6 +7,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.rest.RESTCatalog; @@ -23,6 +24,20 @@ public class IcebergToolsS3 extends IcebergTools { private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO"; + /** + * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a + * value, the system defaults will be used. + * + * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name + * @param catalogURI the URI of the Iceberg REST catalog + * @param warehouseLocation the location of the S3 datafiles backing the catalog + * @param region the AWS region; if omitted, system defaults will be used + * @param accessKeyId the AWS access key ID; if omitted, system defaults will be used + * @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used + * @param endpointOverride the S3 endpoint override; this is useful for testing with a S3-compatible local service + * such as MinIO or LocalStack + * @return the Iceberg catalog adapter + */ public static IcebergCatalogAdapter createS3Rest( @Nullable final String name, @NotNull final String catalogURI, @@ -62,4 +77,37 @@ public static IcebergCatalogAdapter createS3Rest( return new IcebergCatalogAdapter(catalog, fileIO); } + /** + * Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region + * and credentials. These can be configured by following + * AWS Authentication and + * access credentials guide. + * + * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name + * @param catalogURI the URI of the AWS Glue catalog + * @param warehouseLocation the location of the S3 datafiles backing the catalog + * @return the Iceberg catalog adapter + */ + public static IcebergCatalogAdapter createS3Glue( + @Nullable final String name, + @NotNull final String catalogURI, + @NotNull final String warehouseLocation) { + + // Set up the properties map for the Iceberg catalog + final Map properties = new HashMap<>(); + + final GlueCatalog catalog = new GlueCatalog(); + + properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName()); + properties.put(CatalogProperties.URI, catalogURI); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); + + // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider + final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null); + + final String catalogName = name != null ? 
name : "IcebergCatalog-" + catalogURI; + catalog.initialize(catalogName, properties); + + return new IcebergCatalogAdapter(catalog, fileIO); + } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java index 4788e0e8714..f4845a456c5 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java @@ -17,6 +17,13 @@ @Immutable @BuildableStyle public abstract class IcebergInstructions { + /** + * The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system + * defaults for cloud provider-specific parameters + */ + @SuppressWarnings("unused") + public static IcebergInstructions DEFAULT = ImmutableIcebergInstructions.builder().build(); + public static Builder builder() { return ImmutableIcebergInstructions.builder(); } diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 2e197e24b32..4876e61aef8 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -125,21 +125,7 @@ class IcebergCatalogAdapter(JObjectWrapper): def __init__(self, j_object: _JIcebergCatalogAdapter): self.j_catalog_adapter = j_object - def namespaces(self, namespace: Optional[str] = None) -> Sequence[str]: - """ - Returns the list of namespaces in the catalog. - - Args: - namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the - top-level namespaces are listed. - Returns: - Sequence[str]: the list of namespaces. - """ - if namespace is not None: - return _j_object_list_to_str_list(self.j_object.listNamespaces(namespace)) - return _j_object_list_to_str_list(self.j_object.listNamespaces()) - - def namespaces_as_table(self, namespace: Optional[str] = None) -> Table: + def namespaces(self, namespace: Optional[str] = None) -> Table: """ Returns the namespaces in the catalog as a Deephaven table. @@ -155,22 +141,7 @@ def namespaces_as_table(self, namespace: Optional[str] = None) -> Table: return Table(self.j_object.listNamespaces(namespace)) return Table(self.j_object.listNamespacesAsTable()) - def tables(self, namespace: str) -> Sequence[str]: - """ - Returns the list of tables in the provided namespace. - - Args: - namespace (str): the namespace from which to list tables. - - Returns: - Sequence[str]: the list of table names. - """ - - if namespace is not None: - return _j_object_list_to_str_list(self.j_object.listTables(namespace)) - return _j_object_list_to_str_list(self.j_object.listTables()) - - def tables_as_table(self, namespace: Optional[str] = None) -> Table: + def tables(self, namespace: Optional[str] = None) -> Table: """ Returns the list of tables in the provided namespace as a Deephaven table. @@ -185,29 +156,7 @@ def tables_as_table(self, namespace: Optional[str] = None) -> Table: return Table(self.j_object.listTablesAsTable(namespace)) return Table(self.j_object.listTablesAsTable()) - def snapshots(self, table_identifier: str) -> Sequence[str]: - """ - Returns the list of snapshots for the provided table. - - Args: - namespace (str): the table from which to list snapshots. - - Returns: - the list of snapshots and additional information for each snapshot. 
- """ - - snaphot_list = [] - for snapshot in j_list_to_list(self.j_object.listSnapshots(table_identifier)): - snaphot_list.append({ - "id": snapshot.snapshotId(), - "timestamp_ms": snapshot.timestampMillis(), - "operation": snapshot.operation(), - "summary": snapshot.summary().toString() - - }) - return snaphot_list - - def snapshots_as_table(self, table_identifier: str) -> Table: + def snapshots(self, table_identifier: str) -> Table: """ Returns the list of snapshots of the provided table as a Deephaven table. @@ -244,10 +193,11 @@ def read_table(self, table_identifier: str, instructions: IcebergInstructions, s def j_object(self) -> jpy.JType: return self.j_catalog_adapter + def create_s3_rest_adapter( + catalog_uri: str, + warehouse_location: str, name: Optional[str] = None, - catalog_uri: Optional[str] = None, - warehouse_location: Optional[str] = None, region_name: Optional[str] = None, access_key_id: Optional[str] = None, secret_access_key: Optional[str] = None, @@ -257,10 +207,10 @@ def create_s3_rest_adapter( Create a catalog adapter using an S3-compatible provider and a REST catalog. Args: + catalog_uri (str): the URI of the REST catalog. + warehouse_location (str): the location of the warehouse. name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the catalog URI. - catalog_uri (Optional[str]): the URI of the REST catalog. - warehouse_location (Optional[str]): the location of the warehouse. region_name (Optional[str]): the S3 region name to use. access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be provided to use static credentials, else default credentials will be used. @@ -286,15 +236,30 @@ def create_s3_rest_adapter( secret_access_key, end_point_override)) -def create_s3_aws_glue_adapter() -> IcebergCatalogAdapter: + +def create_s3_aws_glue_adapter( + catalog_uri: str, + warehouse_location: str, + name: Optional[str] = None +) -> IcebergCatalogAdapter: """ - Create a catalog adapter using the S3 provider and an AWS Glue catalog . + Create a catalog adapter using an AWS Glue catalog . Args: - TBD: + catalog_uri (Optional[str]): the URI of the REST catalog. + warehouse_location (Optional[str]): the location of the warehouse. + name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the + catalog URI. Returns: IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. """ - # TODO: figure out the parameters to connect to an AWS-hosted Iceberg GLUE catalog - return IcebergCatalogAdapter(_JIcebergCatalogAdapter.builder().build()) \ No newline at end of file + if not _JIcebergToolsS3: + raise DHError(message="`create_s3_aws_glue_adapter` requires the Iceberg specific deephaven S3 extensions to " + "be included in the package") + + return IcebergCatalogAdapter( + _JIcebergToolsS3.createS3Glue( + name, + catalog_uri, + warehouse_location)) \ No newline at end of file From d56587dbe58846c018eaa30a7db9ceb92c56eddd Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 11 Jun 2024 09:30:04 -0700 Subject: [PATCH 05/11] Addressed first round of comments. 
--- .../iceberg/util/IcebergToolsS3.java | 2 +- py/server/deephaven/experimental/iceberg.py | 57 +++++-------------- py/server/deephaven/jcompat.py | 29 ++++++++++ py/server/deephaven/parquet.py | 21 +------ 4 files changed, 45 insertions(+), 64 deletions(-) diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java index 21c18ec31eb..4fb6cff011a 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java @@ -88,7 +88,7 @@ public static IcebergCatalogAdapter createS3Rest( * @param warehouseLocation the location of the S3 datafiles backing the catalog * @return the Iceberg catalog adapter */ - public static IcebergCatalogAdapter createS3Glue( + public static IcebergCatalogAdapter createGlue( @Nullable final String name, @NotNull final String catalogURI, @NotNull final String warehouseLocation) { diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 4876e61aef8..2be2e39899c 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -12,23 +12,15 @@ from deephaven.dtypes import DType from deephaven.experimental import s3 -from deephaven.jcompat import j_list_to_list +from deephaven.jcompat import j_list_to_list, j_table_definition from deephaven.table import Table -# If we move Iceberg to a permanent module, we should remove this try/except block and just import the types directly. -try: - _JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") -except Exception: - _JIcebergInstructions = None -try: - _JIcebergCatalog = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalog") -except Exception: - _JIcebergCatalog = None -try: - _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") -except Exception: - _JIcebergCatalogAdapter = None +_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") +_JIcebergCatalog = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalog") +_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") + +# IcebergToolsS3 is an optional library try: _JIcebergToolsS3 = jpy.get_type("io.deephaven.iceberg.util.IcebergToolsS3") except Exception: @@ -58,9 +50,9 @@ def __init__(self, Initializes the instructions using the provided parameters. Args: - table_definition (Union[Dict[str, DType], List[Column], None]): the table definition; if ommitted, the - definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table will - have that definition. This is useful for specifying a subset of the Iceberg schema columns. + table_definition (Optional[Union[Dict[str, DType], List[Column], None]]): the table definition; if omitted, + the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table + will have that definition. This is useful for specifying a subset of the Iceberg schema columns. data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when reading files from a non-local file system, like S3. column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in @@ -70,15 +62,11 @@ def __init__(self, DHError: If unable to build the instructions object. 
""" - if not _JIcebergInstructions: - raise DHError(message="IcebergInstructions requires the Iceberg specific deephaven extensions to be " - "included in the package") - try: builder = self.j_object_type.builder() if table_definition is not None: - builder.tableDefinition(_j_table_definition(table_definition)) + builder.tableDefinition(j_table_definition(table_definition)) if data_instructions is not None: builder.dataInstructions(data_instructions.j_object) @@ -95,25 +83,6 @@ def __init__(self, def j_object(self) -> jpy.JType: return self._j_object -def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]: - if table_definition is None: - return None - elif isinstance(table_definition, Dict): - return _JTableDefinition.of( - [ - Column(name=name, data_type=dtype).j_column_definition - for name, dtype in table_definition.items() - ] - ) - elif isinstance(table_definition, List): - return _JTableDefinition.of( - [col.j_column_definition for col in table_definition] - ) - else: - raise DHError(f"Unexpected table_definition type: {type(table_definition)}") - -def _j_object_list_to_str_list(j_object_list: jpy.JType) -> List[str]: - return [x.toString() for x in j_list_to_list(j_object_list)] class IcebergCatalogAdapter(JObjectWrapper): """ @@ -194,7 +163,7 @@ def j_object(self) -> jpy.JType: return self.j_catalog_adapter -def create_s3_rest_adapter( +def s3_rest_adapter( catalog_uri: str, warehouse_location: str, name: Optional[str] = None, @@ -237,7 +206,7 @@ def create_s3_rest_adapter( end_point_override)) -def create_s3_aws_glue_adapter( +def aws_glue_adapter( catalog_uri: str, warehouse_location: str, name: Optional[str] = None @@ -259,7 +228,7 @@ def create_s3_aws_glue_adapter( "be included in the package") return IcebergCatalogAdapter( - _JIcebergToolsS3.createS3Glue( + _JIcebergToolsS3.createGlue( name, catalog_uri, warehouse_location)) \ No newline at end of file diff --git a/py/server/deephaven/jcompat.py b/py/server/deephaven/jcompat.py index e6e921fd8f1..939d30d26f1 100644 --- a/py/server/deephaven/jcompat.py +++ b/py/server/deephaven/jcompat.py @@ -325,6 +325,35 @@ def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd. return s +def j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]: + """Produce a Deephaven TableDefinition from user input. + + Args: + table_definition (Union[Dict[str, DType], List[Column], None]): the table definition as a dictionary of column + names and their corresponding data types or a list of Column objects + + Returns: + a Deephaven TableDefinition object or None if the input is None + + Raises: + DHError + """ + if table_definition is None: + return None + elif isinstance(table_definition, Dict): + return _JTableDefinition.of( + [ + Column(name=name, data_type=dtype).j_column_definition + for name, dtype in table_definition.items() + ] + ) + elif isinstance(table_definition, List): + return _JTableDefinition.of( + [col.j_column_definition for col in table_definition] + ) + else: + raise DHError(f"Unexpected table_definition type: {type(table_definition)}") + class AutoCloseable(JObjectWrapper): """A context manager wrapper to allow Java AutoCloseable to be used in with statements. 
diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py index 75794a151c0..aaa9cd2d135 100644 --- a/py/server/deephaven/parquet.py +++ b/py/server/deephaven/parquet.py @@ -14,7 +14,7 @@ from deephaven import DHError from deephaven.column import Column from deephaven.dtypes import DType -from deephaven.jcompat import j_array_list +from deephaven.jcompat import j_array_list, j_table_definition from deephaven.table import Table, PartitionedTable from deephaven.experimental import s3 @@ -142,7 +142,7 @@ def _build_parquet_instructions( raise ValueError("table_definition and col_definitions cannot both be specified.") if table_definition is not None: - builder.setTableDefinition(_j_table_definition(table_definition)) + builder.setTableDefinition(j_table_definition(table_definition)) if col_definitions is not None: builder.setTableDefinition(_JTableDefinition.of([col.j_column_definition for col in col_definitions])) @@ -155,23 +155,6 @@ def _build_parquet_instructions( return builder.build() -def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]: - if table_definition is None: - return None - elif isinstance(table_definition, Dict): - return _JTableDefinition.of( - [ - Column(name=name, data_type=dtype).j_column_definition - for name, dtype in table_definition.items() - ] - ) - elif isinstance(table_definition, List): - return _JTableDefinition.of( - [col.j_column_definition for col in table_definition] - ) - else: - raise DHError(f"Unexpected table_definition type: {type(table_definition)}") - def _j_file_layout(file_layout: Optional[ParquetFileLayout]) -> Optional[jpy.JType]: if file_layout is None: From 8a549fdfacb13280a44ab1fdd3a92867469843d3 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 11 Jun 2024 10:53:26 -0700 Subject: [PATCH 06/11] Bug fixes, manual tests passing. --- py/server/deephaven/experimental/iceberg.py | 6 +----- py/server/deephaven/jcompat.py | 2 ++ 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 2be2e39899c..246fdc3047e 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -12,12 +12,11 @@ from deephaven.dtypes import DType from deephaven.experimental import s3 -from deephaven.jcompat import j_list_to_list, j_table_definition +from deephaven.jcompat import j_table_definition from deephaven.table import Table _JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") -_JIcebergCatalog = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalog") _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") # IcebergToolsS3 is an optional library @@ -30,8 +29,6 @@ _JTableIdentifier = jpy.get_type("org.apache.iceberg.catalog.TableIdentifier") _JSnapshot = jpy.get_type("org.apache.iceberg.Snapshot") -_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") - class IcebergInstructions(JObjectWrapper): """ @@ -45,7 +42,6 @@ def __init__(self, table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None, data_instructions: Optional[s3.S3Instructions] = None, column_renames: Optional[Dict[str, str]] = None): - """ Initializes the instructions using the provided parameters. 
diff --git a/py/server/deephaven/jcompat.py b/py/server/deephaven/jcompat.py index 939d30d26f1..d807cb472f3 100644 --- a/py/server/deephaven/jcompat.py +++ b/py/server/deephaven/jcompat.py @@ -14,9 +14,11 @@ from deephaven import dtypes, DHError from deephaven._wrapper import unwrap, wrap_j_object, JObjectWrapper from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP +from deephaven.column import Column _NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE _JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility") +_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") _DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = { dtypes.bool_: pd.BooleanDtype, From 5164fdd01caf61e3cfe2045e6ad2a4aba12b5ffc Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Wed, 12 Jun 2024 17:49:53 -0700 Subject: [PATCH 07/11] First round of comments addressed. --- .../iceberg/util/IcebergInstructions.java | 2 +- py/server/deephaven/experimental/iceberg.py | 89 ++++++++++++------- py/server/deephaven/stream/table_publisher.py | 13 +-- py/server/tests/test_iceberg.py | 22 ++++- 4 files changed, 80 insertions(+), 46 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java index f4845a456c5..b595b4cfd14 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java @@ -22,7 +22,7 @@ public abstract class IcebergInstructions { * defaults for cloud provider-specific parameters */ @SuppressWarnings("unused") - public static IcebergInstructions DEFAULT = ImmutableIcebergInstructions.builder().build(); + public static final IcebergInstructions DEFAULT = builder().build(); public static Builder builder() { return ImmutableIcebergInstructions.builder(); diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 246fdc3047e..a369318b141 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -1,7 +1,7 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # -""" This module supports reading external Iceberg tables into Deephaven. """ +""" This module adds Iceberg table support into Deephaven. """ from typing import List, Optional, Union, Dict, Sequence import jpy @@ -36,7 +36,7 @@ class IcebergInstructions(JObjectWrapper): instructions and table definitions, as well as special data instructions for loading data files from the cloud. """ - j_object_type = _JIcebergInstructions or type(None) + j_object_type = _JIcebergInstructions def __init__(self, table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None, @@ -106,7 +106,7 @@ def namespaces(self, namespace: Optional[str] = None) -> Table: return Table(self.j_object.listNamespaces(namespace)) return Table(self.j_object.listNamespacesAsTable()) - def tables(self, namespace: Optional[str] = None) -> Table: + def tables(self, namespace: str) -> Table: """ Returns the list of tables in the provided namespace as a Deephaven table. 
@@ -134,32 +134,37 @@ def snapshots(self, table_identifier: str) -> Table: return self.j_object.listSnapshotsAsTable(table_identifier) - def read_table(self, table_identifier: str, instructions: IcebergInstructions, snapshot_id: Optional[int] = None) -> Table: + def read_table(self, table_identifier: str, instructions: Optional[IcebergInstructions] = None, snapshot_id: Optional[int] = None) -> Table: """ Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to read a specific snapshot of the table. Args: table_identifier (str): the table to read. - instructions (IcebergInstructions): the instructions for reading the table. These instructions can include - column renames, table definition, and specific data instructions for reading the data files from the - provider. + instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions + can include column renames, table definition, and specific data instructions for reading the data files + from the provider. If omitted, the table will be read with default instructions. snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. Returns: Table: the table read from the catalog. """ + if instructions is not None: + instructions_object = instructions.j_object + else: + instructions_object = _JIcebergInstructions.DEFAULT + if snapshot_id is not None: - return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions.j_object)) - return Table(self.j_object.readTable(table_identifier, instructions.j_object)) + return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions_object)) + return Table(self.j_object.readTable(table_identifier, instructions_object)) @property def j_object(self) -> jpy.JType: return self.j_catalog_adapter -def s3_rest_adapter( +def adapter_s3_rest( catalog_uri: str, warehouse_location: str, name: Optional[str] = None, @@ -176,7 +181,10 @@ def s3_rest_adapter( warehouse_location (str): the location of the warehouse. name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the catalog URI. - region_name (Optional[str]): the S3 region name to use. + region_name (Optional[str]): the S3 region name to use; If not provided, the default region will be + picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the + {user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running in + EC2. access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be provided to use static credentials, else default credentials will be used. secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key @@ -186,45 +194,58 @@ def s3_rest_adapter( Returns: IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. + + Raises: + DHError: If unable to build the catalog adapter. 
""" if not _JIcebergToolsS3: - raise DHError(message="`create_s3_rest_adapter` requires the Iceberg specific deephaven S3 extensions to be " + raise DHError(message="`adapter_s3_rest` requires the Iceberg specific deephaven S3 extensions to be " "included in the package") - return IcebergCatalogAdapter( - _JIcebergToolsS3.createS3Rest( - name, - catalog_uri, - warehouse_location, - region_name, - access_key_id, - secret_access_key, - end_point_override)) - - -def aws_glue_adapter( + try: + return IcebergCatalogAdapter( + _JIcebergToolsS3.createS3Rest( + name, + catalog_uri, + warehouse_location, + region_name, + access_key_id, + secret_access_key, + end_point_override)) + except Exception as e: + raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e + + +def adapter_aws_glue( catalog_uri: str, warehouse_location: str, name: Optional[str] = None ) -> IcebergCatalogAdapter: """ - Create a catalog adapter using an AWS Glue catalog . + Create a catalog adapter using an AWS Glue catalog. Args: - catalog_uri (Optional[str]): the URI of the REST catalog. - warehouse_location (Optional[str]): the location of the warehouse. + catalog_uri (str): the URI of the REST catalog. + warehouse_location (str): the location of the warehouse. name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the catalog URI. Returns: - IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. + IcebergCatalogAdapter: the catalog adapter for the provided AWS Glue catalog. + + Raises: + DHError: If unable to build the catalog adapter. """ if not _JIcebergToolsS3: - raise DHError(message="`create_s3_aws_glue_adapter` requires the Iceberg specific deephaven S3 extensions to " + raise DHError(message="`adapter_aws_glue` requires the Iceberg specific deephaven S3 extensions to " "be included in the package") - return IcebergCatalogAdapter( - _JIcebergToolsS3.createGlue( - name, - catalog_uri, - warehouse_location)) \ No newline at end of file + try: + return IcebergCatalogAdapter( + _JIcebergToolsS3.createGlue( + name, + catalog_uri, + warehouse_location)) + except Exception as e: + raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e + diff --git a/py/server/deephaven/stream/table_publisher.py b/py/server/deephaven/stream/table_publisher.py index cb0d1073de8..a6c65f47885 100644 --- a/py/server/deephaven/stream/table_publisher.py +++ b/py/server/deephaven/stream/table_publisher.py @@ -5,13 +5,13 @@ import jpy -from typing import Callable, Dict, Optional, Tuple +from typing import Callable, Dict, Optional, Tuple, Union, List from deephaven._wrapper import JObjectWrapper from deephaven.column import Column from deephaven.dtypes import DType from deephaven.execution_context import get_exec_ctx -from deephaven.jcompat import j_lambda, j_runnable +from deephaven.jcompat import j_lambda, j_runnable, j_table_definition from deephaven.table import Table from deephaven.update_graph import UpdateGraph @@ -75,7 +75,7 @@ def publish_failure(self, failure: Exception) -> None: def table_publisher( name: str, - col_defs: Dict[str, DType], + col_defs: Union[Dict[str, DType], List[Column]], on_flush_callback: Optional[Callable[[TablePublisher], None]] = None, on_shutdown_callback: Optional[Callable[[], None]] = None, update_graph: Optional[UpdateGraph] = None, @@ -107,12 +107,7 @@ def adapt_callback(_table_publisher: jpy.JType): j_table_publisher = _JTablePublisher.of( name, - _JTableDefinition.of( - [ - Column(name=name, 
data_type=dtype).j_column_definition - for name, dtype in col_defs.items() - ] - ), + j_table_definition(col_defs), j_lambda(adapt_callback, _JConsumer, None) if on_flush_callback else None, j_runnable(on_shutdown_callback) if on_shutdown_callback else None, (update_graph or get_exec_ctx().update_graph).j_update_graph, diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py index 1cb9de8b959..62ba31e6636 100644 --- a/py/server/tests/test_iceberg.py +++ b/py/server/tests/test_iceberg.py @@ -1,6 +1,7 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +import jpy from deephaven import dtypes from deephaven.column import Column, ColumnType @@ -8,6 +9,9 @@ from tests.testbase import BaseTestCase from deephaven.experimental import s3, iceberg +from deephaven.jcompat import j_map_to_dict, j_list_to_list + +_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") class IcebergTestCase(BaseTestCase): """ Test cases for the deephaven.iceberg module (performed locally) """ @@ -36,7 +40,12 @@ def test_instruction_create_with_col_renames(self): } iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames) - def test_instruction_create_with_table_definitition_dict(self): + col_rename_dict = j_map_to_dict(iceberg_instructions.j_object.columnRenames()) + self.assertTrue(col_rename_dict["old_name_a"] == "new_name_a") + self.assertTrue(col_rename_dict["old_name_b"] == "new_name_b") + self.assertTrue(col_rename_dict["old_name_c"] == "new_name_c") + + def test_instruction_create_with_table_definition_dict(self): table_def={ "x": dtypes.int32, "y": dtypes.double, @@ -44,8 +53,12 @@ def test_instruction_create_with_table_definitition_dict(self): } iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) + col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) + self.assertTrue(col_names[0] == "x") + self.assertTrue(col_names[1] == "y") + self.assertTrue(col_names[2] == "z") - def test_instruction_create_with_table_definitition_list(self): + def test_instruction_create_with_table_definition_list(self): table_def=[ Column( "Partition", dtypes.int32, column_type=ColumnType.PARTITIONING @@ -56,3 +69,8 @@ def test_instruction_create_with_table_definitition_list(self): ] iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) + col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) + self.assertTrue(col_names[0] == "Partition") + self.assertTrue(col_names[1] == "x") + self.assertTrue(col_names[2] == "y") + self.assertTrue(col_names[3] == "z") From 58d2e19e169a77bda819efc87f401606d017d557 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 17 Jun 2024 14:01:44 -0700 Subject: [PATCH 08/11] Java comments addressed. 
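
For context, a minimal usage sketch of the Python API as it stands after the
preceding patches, assuming a reachable Iceberg REST catalog backed by
S3-compatible storage. The connection values, the "Region"/"RegionName"
rename, and the pinned snapshot id (borrowed from the test data) are
placeholder assumptions, not values introduced by this series.

    from deephaven.experimental import iceberg

    # Build a catalog adapter against a REST catalog (all connection values
    # here are assumed placeholders).
    adapter = iceberg.adapter_s3_rest(
        catalog_uri="http://rest:8181",
        warehouse_location="s3a://warehouse/wh",
        region_name="us-east-1",
        access_key_id="admin",
        secret_access_key="password",
        end_point_override="http://minio:9000",
    )

    namespaces = adapter.namespaces()                   # top-level namespaces
    tables = adapter.tables("sales")                    # tables in a namespace
    snapshots = adapter.snapshots("sales.sales_multi")  # snapshot metadata

    # Instructions are now optional; omitting them falls back to
    # IcebergInstructions.DEFAULT on the Java side.
    latest = adapter.read_table("sales.sales_multi")

    # Column renames and a specific snapshot id may be supplied explicitly;
    # "Region"/"RegionName" are assumed column names for illustration.
    pinned = adapter.read_table(
        "sales.sales_multi",
        instructions=iceberg.IcebergInstructions(column_renames={"Region": "RegionName"}),
        snapshot_id=2001582482032951248,
    )
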
--- .../iceberg/util/IcebergToolsS3.java | 2 - .../iceberg/util/IcebergToolsTest.java | 18 ----- .../iceberg/util/IcebergCatalogAdapter.java | 69 +++++++------------ 3 files changed, 24 insertions(+), 65 deletions(-) diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java index 4fb6cff011a..166b47e5d28 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java @@ -68,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest( properties.put(S3FileIOProperties.ENDPOINT, endpointOverride); } - // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null); final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI; @@ -102,7 +101,6 @@ public static IcebergCatalogAdapter createGlue( properties.put(CatalogProperties.URI, catalogURI); properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); - // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null); final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI; diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index b2fb624cf3e..7544976f27b 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -144,13 +144,6 @@ public void testListTables() { "table_identifier_object column type"); // Test the string versions of the methods - tables = adapter.listTables("sales"); - Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace"); - Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)"); - Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")), - "tables.contains(sales_partitioned)"); - Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)"); - table = adapter.listTablesAsTable("sales"); Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace"); } @@ -181,17 +174,6 @@ public void testListSnapshots() { "snapshot_object column type"); // Test the string versions of the methods - snapshotIds.clear(); - adapter.listSnapshots("sales.sales_multi") - .forEach(snapshot -> snapshotIds.add(snapshot.snapshotId())); - - Assert.eq(snapshotIds.size(), "snapshots.size()", 4, "4 snapshots for sales/sales_multi"); - - Assert.eqTrue(snapshotIds.contains(2001582482032951248L), "snapshots.contains(2001582482032951248)"); - Assert.eqTrue(snapshotIds.contains(8325605756612719366L), "snapshots.contains(8325605756612719366L)"); - Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)"); - Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)"); - table = adapter.listSnapshotsAsTable("sales.sales_multi"); Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi"); } diff --git 
a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index f38aac36c8c..29fb1369312 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -55,7 +55,7 @@ public class IcebergCatalogAdapter { /** * Create a single {@link TableDefinition} from a given Schema, PartitionSpec, and TableDefinition. Takes into - * account {@link Map column rename instructions} + * account {@link Map<> column rename instructions} * * @param schema The schema of the table. * @param partitionSpec The partition specification of the table. @@ -172,18 +172,6 @@ public List listNamespaces(@NotNull final Namespace namespace) { "%s does not implement org.apache.iceberg.catalog.SupportsNamespaces", catalog.getClass().getName())); } - /** - * List all {@link Namespace namespaces} in a given namespace. This method is only supported if the catalog - * implements {@link SupportsNamespaces} for namespace discovery. See - * {@link SupportsNamespaces#listNamespaces(Namespace)}. - * - * @param namespace The namespace(s) to list namespaces in. - * @return A list of all namespaces in the given namespace. - */ - public List listNamespaces(@NotNull final String... namespace) { - return listNamespaces(Namespace.of(namespace)); - } - /** * List all {@link Namespace namespaces} in the catalog as a Deephaven {@link Table table}. The resulting table will * be static and contain the same information as {@link #listNamespaces()}. @@ -247,16 +235,6 @@ public List listTables(@NotNull final Namespace namespace) { return catalog.listTables(namespace); } - /** - * List all Iceberg {@link TableIdentifier tables} in a given namespace. - * - * @param namespace The namespace to list tables in. - * @return A list of all tables in the given namespace. - */ - public List listTables(@NotNull final String... namespace) { - return listTables(Namespace.of(namespace)); - } - /** * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The * resulting table will be static and contain the same information as {@link #listTables(Namespace)}. @@ -312,16 +290,6 @@ public List listSnapshots(@NotNull final TableIdentifier tableIdentifi return snapshots; } - /** - * List all {@link Snapshot snapshots} of a given Iceberg table. - * - * @param tableIdentifier The identifier of the table from which to gather snapshots. - * @return A list of all snapshots of the given table. - */ - public List listSnapshots(@NotNull final String tableIdentifier) { - return listSnapshots(TableIdentifier.parse(tableIdentifier)); - } - /** * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}. 
@@ -391,7 +359,7 @@ public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) { @SuppressWarnings("unused") public Table readTable( @NotNull final TableIdentifier tableIdentifier, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { return readTableInternal(tableIdentifier, null, instructions); } @@ -405,7 +373,7 @@ public Table readTable( @SuppressWarnings("unused") public Table readTable( @NotNull final String tableIdentifier, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { return readTable(TableIdentifier.parse(tableIdentifier), instructions); } @@ -421,7 +389,7 @@ public Table readTable( public Table readTable( @NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { // Find the snapshot with the given snapshot id final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream() @@ -444,7 +412,7 @@ public Table readTable( public Table readTable( @NotNull final String tableIdentifier, final long tableSnapshotId, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions); } @@ -460,32 +428,35 @@ public Table readTable( public Table readTable( @NotNull final TableIdentifier tableIdentifier, @NotNull final Snapshot tableSnapshot, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { return readTableInternal(tableIdentifier, tableSnapshot, instructions); } private Table readTableInternal( @NotNull final TableIdentifier tableIdentifier, @Nullable final Snapshot tableSnapshot, - @NotNull final IcebergInstructions instructions) { + @Nullable final IcebergInstructions instructions) { - // Load the table from the catalog + // Load the table from the catalog. final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier); // Do we want the latest or a specific snapshot? final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot(); final Schema schema = table.schemas().get(snapshot.schemaId()); - // Load the partitioning schema + // Load the partitioning schema. final org.apache.iceberg.PartitionSpec partitionSpec = table.spec(); + // Get default instructions if none are provided + final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions; + // Get the user supplied table definition. - final TableDefinition userTableDef = instructions.tableDefinition().orElse(null); + final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null); // Get the table definition from the schema (potentially limited by the user supplied table definition and // applying column renames). final TableDefinition icebergTableDef = - fromSchema(schema, partitionSpec, userTableDef, instructions.columnRenames()); + fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames()); // If the user supplied a table definition, make sure it's fully compatible. 
final TableDefinition tableDef; @@ -520,11 +491,11 @@ private Table readTableInternal( if (partitionSpec.isUnpartitioned()) { // Create the flat layout location key finder - keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, instructions); + keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions); } else { // Create the partitioning column location key finder keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec, - instructions); + userInstructions); } refreshService = null; @@ -546,4 +517,12 @@ private Table readTableInternal( return result; } + + /** + * Returns the underlying Iceberg {@link Catalog catalog} used by this adapter. + */ + @SuppressWarnings("unused") + public Catalog catalog() { + return catalog; + } } From a02920091a41d1a46693bdf84647cdb03c25a2b1 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 18 Jun 2024 14:39:58 -0700 Subject: [PATCH 09/11] Update Iceberg and AWS versions, itemize aws dependencies. --- buildSrc/src/main/groovy/Classpaths.groovy | 4 ++-- extensions/iceberg/s3/build.gradle | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy index 89642b93605..429b59f525d 100644 --- a/buildSrc/src/main/groovy/Classpaths.groovy +++ b/buildSrc/src/main/groovy/Classpaths.groovy @@ -126,10 +126,10 @@ class Classpaths { static final String HADOOP_VERSION = '3.4.0' static final String ICEBERG_GROUP = 'org.apache.iceberg' - static final String ICEBERG_VERSION = '1.5.0' + static final String ICEBERG_VERSION = '1.5.2' static final String AWSSDK_GROUP = 'software.amazon.awssdk' - static final String AWSSDK_VERSION = '2.23.19' + static final String AWSSDK_VERSION = '2.24.5' static final String TESTCONTAINER_GROUP = 'org.testcontainers' static final String TESTCONTAINER_VERSION = '1.19.4' diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle index 359651ec7e6..be495b1373d 100644 --- a/extensions/iceberg/s3/build.gradle +++ b/extensions/iceberg/s3/build.gradle @@ -16,8 +16,10 @@ dependencies { implementation project(':extensions-s3') implementation "org.apache.iceberg:iceberg-aws" - runtimeOnly "org.apache.iceberg:iceberg-aws-bundle" + Classpaths.inheritAWSSDK(project) + runtimeOnly "software.amazon.awssdk:sts" + runtimeOnly "software.amazon.awssdk:glue" Classpaths.inheritTestContainers(project) From 5fab2497390607f424d5fb12a1fc0a6630de56f5 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Fri, 21 Jun 2024 17:31:00 -0700 Subject: [PATCH 10/11] Documentation improvements. --- py/server/deephaven/experimental/iceberg.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index a369318b141..7506bc95a25 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -92,7 +92,8 @@ def __init__(self, j_object: _JIcebergCatalogAdapter): def namespaces(self, namespace: Optional[str] = None) -> Table: """ - Returns the namespaces in the catalog as a Deephaven table. + Returns information on the namespaces in the catalog as a Deephaven table. If a namespace is specified, the + tables in that namespace are listed; otherwise the top-level namespaces are listed. 
Args: namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the @@ -108,7 +109,7 @@ def namespaces(self, namespace: Optional[str] = None) -> Table: def tables(self, namespace: str) -> Table: """ - Returns the list of tables in the provided namespace as a Deephaven table. + Returns information on the tables in the specified namespace as a Deephaven table. Args: namespace (str): the namespace from which to list tables. @@ -123,7 +124,7 @@ def tables(self, namespace: str) -> Table: def snapshots(self, table_identifier: str) -> Table: """ - Returns the list of snapshots of the provided table as a Deephaven table. + Returns information on the snapshots of the specified table as a Deephaven table. Args: table_identifier (str): the table from which to list snapshots. From fe0ced60e49a7f65fae2abfb6be2ce7c74876de3 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Mon, 24 Jun 2024 08:40:57 -0700 Subject: [PATCH 11/11] Change table column names for Iceberg API --- .../iceberg/util/IcebergCatalogAdapter.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index 29fb1369312..c54535599c3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -197,11 +197,11 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) { // Create the column source(s) final String[] namespaceArr = new String[(int) size]; - columnSourceMap.put("namespace", + columnSourceMap.put("Namespace", InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null)); final Namespace[] namespaceObjectArr = new Namespace[(int) size]; - columnSourceMap.put("namespace_object", + columnSourceMap.put("NamespaceObject", InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceObjectArr, Namespace.class, null)); // Populate the column source arrays @@ -251,15 +251,15 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) { // Create the column source(s) final String[] namespaceArr = new String[(int) size]; - columnSourceMap.put("namespace", + columnSourceMap.put("Namespace", InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null)); final String[] tableNameArr = new String[(int) size]; - columnSourceMap.put("table_name", + columnSourceMap.put("TableName", InMemoryColumnSource.getImmutableMemoryColumnSource(tableNameArr, String.class, null)); final TableIdentifier[] tableIdentifierArr = new TableIdentifier[(int) size]; - columnSourceMap.put("table_identifier_object", + columnSourceMap.put("TableIdentifierObject", InMemoryColumnSource.getImmutableMemoryColumnSource(tableIdentifierArr, TableIdentifier.class, null)); // Populate the column source arrays @@ -306,22 +306,22 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier // Create the column source(s) final long[] idArr = new long[(int) size]; - columnSourceMap.put("id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null)); + columnSourceMap.put("Id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null)); final long[] timestampArr = new long[(int) size]; - columnSourceMap.put("timestamp_ms", + columnSourceMap.put("TimestampMs", 
InMemoryColumnSource.getImmutableMemoryColumnSource(timestampArr, long.class, null)); final String[] operatorArr = new String[(int) size]; - columnSourceMap.put("operation", + columnSourceMap.put("Operation", InMemoryColumnSource.getImmutableMemoryColumnSource(operatorArr, String.class, null)); final Map[] summaryArr = new Map[(int) size]; - columnSourceMap.put("summary", + columnSourceMap.put("Summary", InMemoryColumnSource.getImmutableMemoryColumnSource(summaryArr, Map.class, null)); final Snapshot[] snapshotArr = new Snapshot[(int) size]; - columnSourceMap.put("snapshot_object", + columnSourceMap.put("SnapshotObject", InMemoryColumnSource.getImmutableMemoryColumnSource(snapshotArr, Snapshot.class, null)); // Populate the column source(s)
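        // Note: with this change the metadata tables produced by
        // listNamespacesAsTable / listTablesAsTable / listSnapshotsAsTable use
        // PascalCase column names (Namespace, NamespaceObject, TableName,
        // TableIdentifierObject, Id, TimestampMs, Operation, Summary,
        // SnapshotObject) in place of the earlier snake_case names.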