From c93d05257fbadc847276485bd653c9f0258a0759 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Fri, 7 Jun 2024 16:28:31 -0700 Subject: [PATCH] Much better docs, need to figure out AWS Glue ASAP. --- py/server/deephaven/experimental/iceberg.py | 133 ++++++++++++++++---- 1 file changed, 107 insertions(+), 26 deletions(-) diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 29f104b3392..2e197e24b32 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -1,6 +1,7 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +""" This module supports reading external Iceberg tables into Deephaven. """ from typing import List, Optional, Union, Dict, Sequence import jpy @@ -40,12 +41,10 @@ _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") -""" - XXXXXXXX -""" class IcebergInstructions(JObjectWrapper): """ - XXXXXXXXXX provides specialized instructions for reading from S3-compatible APIs. + This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename + instructions and table definitions, as well as special data instructions for loading data files from the cloud. """ j_object_type = _JIcebergInstructions or type(None) @@ -56,16 +55,16 @@ def __init__(self, column_renames: Optional[Dict[str, str]] = None): """ - Initializes the instructions. + Initializes the instructions using the provided parameters. Args: - table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None, - the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will - have that definition. This is useful for bootstrapping purposes when the initially partitioned directory is - empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. 
- data_instructions (Optional[s3.S3Instructions]): Special instructions for reading parquet files, useful when - reading files from a non-local file system, like S3. By default, None. - column_renames (Optional[Dict[str, str]]): A dictionary of column renames, by default None. When None, no columns will be renamed. + table_definition (Union[Dict[str, DType], List[Column], None]): the table definition; if omitted, the + definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table will + have that definition. This is useful for specifying a subset of the Iceberg schema columns. + data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when + reading files from a non-local file system, like S3. + column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in + the output table. Raises: DHError: If unable to build the instructions object. @@ -118,6 +117,8 @@ def _j_object_list_to_str_list(j_object_list: jpy.JType) -> List[str]: class IcebergCatalogAdapter(JObjectWrapper): """ + This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and + snapshots, as well as reading Iceberg tables into Deephaven tables. """ j_object_type = _JIcebergCatalogAdapter or type(None) @@ -126,36 +127,73 @@ def __init__(self, j_object: _JIcebergCatalogAdapter): def namespaces(self, namespace: Optional[str] = None) -> Sequence[str]: """ - Returns the list of namespaces in the catalog as strings. + Returns the list of namespaces in the catalog. - :param namespace: - :return: + Args: + namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the + top-level namespaces are listed. + Returns: + Sequence[str]: the list of namespaces. 
""" if namespace is not None: return _j_object_list_to_str_list(self.j_object.listNamespaces(namespace)) return _j_object_list_to_str_list(self.j_object.listNamespaces()) def namespaces_as_table(self, namespace: Optional[str] = None) -> Table: + """ + Returns the namespaces in the catalog as a Deephaven table. + + Args: + namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the + top-level namespaces are listed. + + Returns: + a table containing the namespaces. + """ + if namespace is not None: return Table(self.j_object.listNamespaces(namespace)) return Table(self.j_object.listNamespacesAsTable()) - def tables(self, namespace: Optional[str] = None) -> Sequence[str]: + def tables(self, namespace: str) -> Sequence[str]: + """ + Returns the list of tables in the provided namespace. + + Args: + namespace (str): the namespace from which to list tables. + + Returns: + Sequence[str]: the list of table names. + """ + if namespace is not None: return _j_object_list_to_str_list(self.j_object.listTables(namespace)) return _j_object_list_to_str_list(self.j_object.listTables()) def tables_as_table(self, namespace: Optional[str] = None) -> Table: + """ + Returns the list of tables in the provided namespace as a Deephaven table. + + Args: + namespace (str): the namespace from which to list tables. + + Returns: + a table containing the tables in the provided namespace. + """ + if namespace is not None: return Table(self.j_object.listTablesAsTable(namespace)) return Table(self.j_object.listTablesAsTable()) def snapshots(self, table_identifier: str) -> Sequence[str]: """ - Returns a list of snapshots for the specified table. + Returns the list of snapshots for the provided table. + + Args: + table_identifier (str): the table from which to list snapshots. - :param table_identifier: - :return: + Returns: + the list of snapshots and additional information for each snapshot. 
""" snaphot_list = [] @@ -171,21 +209,31 @@ def snapshots(self, table_identifier: str) -> Sequence[str]: def snapshots_as_table(self, table_identifier: str) -> Table: """ - Returns a list of snapshots for the specified table. + Returns the list of snapshots of the provided table as a Deephaven table. + + Args: + table_identifier (str): the table from which to list snapshots. - :param table_identifier: - :return: + Returns: + a table containing the snapshot information. """ return self.j_object.listSnapshotsAsTable(table_identifier) def read_table(self, table_identifier: str, instructions: IcebergInstructions, snapshot_id: Optional[int] = None) -> Table: """ - Reads the specified table. + Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to + read a specific snapshot of the table. - :param table_identifier: - :param snapshot_id: - :return: + Args: + table_identifier (str): the table to read. + instructions (IcebergInstructions): the instructions for reading the table. These instructions can include + column renames, table definition, and specific data instructions for reading the data files from the + provider. + snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. + + Returns: + Table: the table read from the catalog. """ if snapshot_id is not None: @@ -205,6 +253,29 @@ def create_s3_rest_adapter( secret_access_key: Optional[str] = None, end_point_override: Optional[str] = None ) -> IcebergCatalogAdapter: + """ + Create a catalog adapter using an S3-compatible provider and a REST catalog. + + Args: + name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the + catalog URI. + catalog_uri (Optional[str]): the URI of the REST catalog. + warehouse_location (Optional[str]): the location of the warehouse. + region_name (Optional[str]): the S3 region name to use. 
+ access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be + provided to use static credentials, else default credentials will be used. + secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key + must be provided to use static credentials, else default credentials will be used. + end_point_override (Optional[str]): the S3 endpoint to connect to. Callers connecting to AWS do not typically + need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. + + Returns: + IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog. + """ + if not _JIcebergToolsS3: + raise DHError(message="`create_s3_rest_adapter` requires the Iceberg specific deephaven S3 extensions to be " + "included in the package") + return IcebergCatalogAdapter( _JIcebergToolsS3.createS3Rest( name, @@ -216,4 +287,14 @@ def create_s3_rest_adapter( end_point_override)) def create_s3_aws_glue_adapter() -> IcebergCatalogAdapter: + """ + Create a catalog adapter using the S3 provider and an AWS Glue catalog. + + Args: + TBD: + + Returns: + IcebergCatalogAdapter: the catalog adapter for the AWS Glue catalog. + """ + # TODO: figure out the parameters to connect to an AWS-hosted Iceberg GLUE catalog return IcebergCatalogAdapter(_JIcebergCatalogAdapter.builder().build()) \ No newline at end of file