Skip to content

Commit

Permalink
Much better docs, need to figure out AWS Glue ASAP.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jun 7, 2024
1 parent 1b39b99 commit c93d052
Showing 1 changed file with 107 additions and 26 deletions.
133 changes: 107 additions & 26 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
""" This module supports reading external Iceberg tables into Deephaven. """
from typing import List, Optional, Union, Dict, Sequence

import jpy
Expand Down Expand Up @@ -40,12 +41,10 @@
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")


"""
XXXXXXXX
"""
class IcebergInstructions(JObjectWrapper):
"""
XXXXXXXXXX provides specialized instructions for reading from S3-compatible APIs.
This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename
instructions and table definitions, as well as special data instructions for loading data files from the cloud.
"""

j_object_type = _JIcebergInstructions or type(None)
Expand All @@ -56,16 +55,16 @@ def __init__(self,
column_renames: Optional[Dict[str, str]] = None):

"""
Initializes the instructions.
Initializes the instructions using the provided parameters.
Args:
table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None,
the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will
have that definition. This is useful for bootstrapping purposes when the initially partitioned directory is
empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition.
data_instructions (Optional[s3.S3Instructions]): Special instructions for reading parquet files, useful when
reading files from a non-local file system, like S3. By default, None.
column_renames (Optional[Dict[str, str]]): A dictionary of column renames, by default None. When None, no columns will be renamed.
table_definition (Union[Dict[str, DType], List[Column], None]): the table definition; if ommitted, the
definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table will
have that definition. This is useful for specifying a subset of the Iceberg schema columns.
data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when
reading files from a non-local file system, like S3.
column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in
the output table.
Raises:
DHError: If unable to build the instructions object.
Expand Down Expand Up @@ -118,6 +117,8 @@ def _j_object_list_to_str_list(j_object_list: jpy.JType) -> List[str]:

class IcebergCatalogAdapter(JObjectWrapper):
"""
This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and
snapshots, as well as reading Iceberg tables into Deephaven tables.
"""
j_object_type = _JIcebergCatalogAdapter or type(None)

Expand All @@ -126,36 +127,73 @@ def __init__(self, j_object: _JIcebergCatalogAdapter):

def namespaces(self, namespace: Optional[str] = None) -> Sequence[str]:
"""
Returns the list of namespaces in the catalog as strings.
Returns the list of namespaces in the catalog.
:param namespace:
:return:
Args:
namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the
top-level namespaces are listed.
Returns:
Sequence[str]: the list of namespaces.
"""
if namespace is not None:
return _j_object_list_to_str_list(self.j_object.listNamespaces(namespace))
return _j_object_list_to_str_list(self.j_object.listNamespaces())

def namespaces_as_table(self, namespace: Optional[str] = None) -> Table:
"""
Returns the namespaces in the catalog as a Deephaven table.
Args:
namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the
top-level namespaces are listed.
Returns:
a table containing the namespaces.
"""

if namespace is not None:
return Table(self.j_object.listNamespaces(namespace))
return Table(self.j_object.listNamespacesAsTable())

def tables(self, namespace: Optional[str] = None) -> Sequence[str]:
def tables(self, namespace: str) -> Sequence[str]:
"""
Returns the list of tables in the provided namespace.
Args:
namespace (str): the namespace from which to list tables.
Returns:
Sequence[str]: the list of table names.
"""

if namespace is not None:
return _j_object_list_to_str_list(self.j_object.listTables(namespace))
return _j_object_list_to_str_list(self.j_object.listTables())

def tables_as_table(self, namespace: Optional[str] = None) -> Table:
"""
Returns the list of tables in the provided namespace as a Deephaven table.
Args:
namespace (str): the namespace from which to list tables.
Returns:
a table containing the tables in the provided namespace.
"""

if namespace is not None:
return Table(self.j_object.listTablesAsTable(namespace))
return Table(self.j_object.listTablesAsTable())

def snapshots(self, table_identifier: str) -> Sequence[str]:
"""
Returns a list of snapshots for the specified table.
Returns the list of snapshots for the provided table.
Args:
namespace (str): the table from which to list snapshots.
:param table_identifier:
:return:
Returns:
the list of snapshots and additional information for each snapshot.
"""

snaphot_list = []
Expand All @@ -171,21 +209,31 @@ def snapshots(self, table_identifier: str) -> Sequence[str]:

def snapshots_as_table(self, table_identifier: str) -> Table:
"""
Returns a list of snapshots for the specified table.
Returns the list of snapshots of the provided table as a Deephaven table.
Args:
table_identifier (str): the table from which to list snapshots.
:param table_identifier:
:return:
Returns:
a table containing the snapshot information.
"""

return self.j_object.listSnapshotsAsTable(table_identifier)

def read_table(self, table_identifier: str, instructions: IcebergInstructions, snapshot_id: Optional[int] = None) -> Table:
"""
Reads the specified table.
Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to
read a specific snapshot of the table.
:param table_identifier:
:param snapshot_id:
:return:
Args:
table_identifier (str): the table to read.
instructions (IcebergInstructions): the instructions for reading the table. These instructions can include
column renames, table definition, and specific data instructions for reading the data files from the
provider.
snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
Returns:
Table: the table read from the catalog.
"""

if snapshot_id is not None:
Expand All @@ -205,6 +253,29 @@ def create_s3_rest_adapter(
secret_access_key: Optional[str] = None,
end_point_override: Optional[str] = None
) -> IcebergCatalogAdapter:
"""
Create a catalog adapter using an S3-compatible provider and a REST catalog.
Args:
name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
catalog URI.
catalog_uri (Optional[str]): the URI of the REST catalog.
warehouse_location (Optional[str]): the location of the warehouse.
region_name (Optional[str]): the S3 region name to use.
access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be
provided to use static credentials, else default credentials will be used.
secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key
must be provided to use static credentials, else default credentials will be used.
end_point_override (Optional[str]): the S3 endpoint to connect to. Callers connecting to AWS do not typically
need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs.
Returns:
IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog.
"""
if not _JIcebergToolsS3:
raise DHError(message="`create_s3_rest_adapter` requires the Iceberg specific deephaven S3 extensions to be "
"included in the package")

return IcebergCatalogAdapter(
_JIcebergToolsS3.createS3Rest(
name,
Expand All @@ -216,4 +287,14 @@ def create_s3_rest_adapter(
end_point_override))

def create_s3_aws_glue_adapter() -> IcebergCatalogAdapter:
"""
Create a catalog adapter using the S3 provider and an AWS Glue catalog .
Args:
TBD:
Returns:
IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog.
"""
# TODO: figure out the parameters to connect to an AWS-hosted Iceberg GLUE catalog
return IcebergCatalogAdapter(_JIcebergCatalogAdapter.builder().build())

0 comments on commit c93d052

Please sign in to comment.