feat: expose Iceberg features to python users #5590

Merged 13 commits on Jun 24, 2024
@@ -7,6 +7,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
@@ -23,6 +24,20 @@
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
* value, the system defaults will be used.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the Iceberg REST catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @param region the AWS region; if omitted, system defaults will be used
* @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
* @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
* @param endpointOverride the S3 endpoint override; this is useful for testing with an S3-compatible local service
* such as MinIO or LocalStack
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createS3Rest(
@Nullable final String name,
@NotNull final String catalogURI,
@@ -53,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

// TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
@@ -62,4 +76,36 @@ public static IcebergCatalogAdapter createS3Rest(
return new IcebergCatalogAdapter(catalog, fileIO);
}
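
A minimal usage sketch of the new REST + S3 factory (all endpoint and credential values below are placeholders for a local MinIO setup, and the `io.deephaven.iceberg.util` package is assumed; none of this is part of the change itself):

```java
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergToolsS3;

// Placeholder endpoints and credentials for a local MinIO instance.
final IcebergCatalogAdapter adapter = IcebergToolsS3.createS3Rest(
        "minio-iceberg",           // name; null derives a name from the URI
        "http://localhost:8181",   // catalogURI of the Iceberg REST catalog
        "s3a://warehouse/wh",      // warehouseLocation
        "us-east-1",               // region
        "admin",                   // accessKeyId
        "password",                // secretAccessKey
        "http://localhost:9000");  // endpointOverride pointing at MinIO
```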

/**
* Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
* and credentials. These can be configured by following the
* <a href="https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html">AWS authentication and
* access credentials</a> guide.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the AWS Glue catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createGlue(
@Nullable final String name,
@NotNull final String catalogURI,
@NotNull final String warehouseLocation) {

// Set up the properties map for the Iceberg catalog
final Map<String, String> properties = new HashMap<>();

final GlueCatalog catalog = new GlueCatalog();

properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
}
}
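
A corresponding sketch for Glue; the bucket paths are illustrative only, and the region and credentials resolve through the standard AWS provider chain as the javadoc describes:

```java
// A null name would derive one from the catalog URI, as with createS3Rest.
final IcebergCatalogAdapter glueAdapter = IcebergToolsS3.createGlue(
        "glue-catalog",
        "s3://my-bucket/iceberg-catalog",     // catalogURI (illustrative)
        "s3://my-bucket/iceberg-warehouse");  // warehouseLocation (illustrative)
```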
@@ -129,19 +129,23 @@ public void testListTables() {

final Namespace ns = Namespace.of("sales");

final Collection<TableIdentifier> tables = adapter.listTables(ns);
Collection<TableIdentifier> tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

final Table table = adapter.listTablesAsTable(ns);
Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
"table_identifier_object column type");

// Test the string versions of the methods
table = adapter.listTablesAsTable("sales");
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
}

@Test
@@ -160,14 +164,18 @@ public void testListSnapshots() {
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
"snapshot_object column type");

// Test the string versions of the methods
table = adapter.listSnapshotsAsTable("sales.sales_multi");
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
}

@Test
@@ -180,7 +188,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_partitioned", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -196,9 +210,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_multi", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

@Test
Expand All @@ -211,7 +231,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_single", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -563,16 +589,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
final List<Snapshot> snapshots = adapter.listSnapshots(tableId);

// Verify we retrieved all the rows.
final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods

// Verify we retrieved all the rows.
table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
}

@@ -55,7 +55,7 @@ public class IcebergCatalogAdapter {

/**
* Create a single {@link TableDefinition} from a given Schema, PartitionSpec, and TableDefinition. Takes into
* account {@link Map<String,String> column rename instructions}
* account {@link Map<> column rename instructions}
*
* @param schema The schema of the table.
* @param partitionSpec The partition specification of the table.
@@ -215,6 +215,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
* List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. The resulting
* table will be static and contain the same information as {@link #listNamespaces(Namespace)}.
*
* @param namespace The namespace path elements
* @return A {@link Table table} of all namespaces.
*/
public Table listNamespacesAsTable(@NotNull final String... namespace) {
return listNamespacesAsTable(Namespace.of(namespace));
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace.
*
@@ -264,6 +274,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

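/**
 * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The
 * resulting table will be static and contain the same information as {@link #listTables(Namespace)}.
 *
 * @param namespace The namespace path elements
 * @return A {@link Table table} of all tables in the given namespace.
 */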
public Table listTablesAsTable(@NotNull final String... namespace) {
return listTablesAsTable(Namespace.of(namespace));
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table.
*
@@ -324,6 +338,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
* table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
*
* @param tableIdentifier The identifier of the table from which to gather snapshots.
* @return A {@link Table table} of all snapshots of the given table.
*/
public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier));
}
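
Taken together, the string overloads allow quick interactive exploration without constructing `Namespace` or `TableIdentifier` objects. A sketch, assuming an `IcebergCatalogAdapter` named `adapter` and the namespace and table names used in the tests above:

```java
final Table namespaces = adapter.listNamespacesAsTable("sales");
final Table tables = adapter.listTablesAsTable("sales");
final Table snapshots = adapter.listSnapshotsAsTable("sales.sales_multi");
```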

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -334,10 +359,24 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
@SuppressWarnings("unused")
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final IcebergInstructions instructions) {
@Nullable final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, null, instructions);
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load, as a dot-separated string (e.g. {@code sales.sales_multi})
* @param instructions The instructions for customizations while reading
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(
@NotNull final String tableIdentifier,
@Nullable final IcebergInstructions instructions) {
return readTable(TableIdentifier.parse(tableIdentifier), instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -350,7 +389,7 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
final long tableSnapshotId,
@NotNull final IcebergInstructions instructions) {
@Nullable final IcebergInstructions instructions) {

// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream()
@@ -361,6 +400,22 @@ public Table readTable(
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load, as a dot-separated string (e.g. {@code sales.sales_multi})
* @param tableSnapshotId The snapshot id to load
* @param instructions The instructions for customizations while reading
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(
@NotNull final String tableIdentifier,
final long tableSnapshotId,
@Nullable final IcebergInstructions instructions) {
return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions);
}
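
A sketch of the string-based read paths, reusing the table and snapshot id exercised in the tests above; `IcebergInstructions.DEFAULT` is the new constant introduced further down:

```java
// Latest snapshot of the table.
final Table latest = adapter.readTable("sales.sales_multi", IcebergInstructions.DEFAULT);

// Pinned to a specific snapshot id.
final Table pinned = adapter.readTable(
        "sales.sales_multi", 3247344357341484163L, IcebergInstructions.DEFAULT);
```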

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -373,32 +428,35 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions) {
@Nullable final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}

private Table readTableInternal(
@NotNull final TableIdentifier tableIdentifier,
@Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions) {
@Nullable final IcebergInstructions instructions) {

// Load the table from the catalog
// Load the table from the catalog.
final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier);

// Do we want the latest or a specific snapshot?
final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot();
final Schema schema = table.schemas().get(snapshot.schemaId());

// Load the partitioning schema
// Load the partitioning schema.
final org.apache.iceberg.PartitionSpec partitionSpec = table.spec();

// Get default instructions if none are provided
final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions;

// Get the user supplied table definition.
final TableDefinition userTableDef = instructions.tableDefinition().orElse(null);
final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null);

// Get the table definition from the schema (potentially limited by the user supplied table definition and
// applying column renames).
final TableDefinition icebergTableDef =
fromSchema(schema, partitionSpec, userTableDef, instructions.columnRenames());
fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames());

// If the user supplied a table definition, make sure it's fully compatible.
final TableDefinition tableDef;
@@ -433,11 +491,11 @@ private Table readTableInternal(

if (partitionSpec.isUnpartitioned()) {
// Create the flat layout location key finder
keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, instructions);
keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions);
} else {
// Create the partitioning column location key finder
keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec,
instructions);
userInstructions);
}

refreshService = null;
@@ -459,4 +517,12 @@ private Table readTableInternal(

return result;
}

/**
* Returns the underlying Iceberg {@link Catalog catalog} used by this adapter.
*/
@SuppressWarnings("unused")
public Catalog catalog() {
return catalog;
}
}
@@ -17,6 +17,13 @@
@Immutable
@BuildableStyle
public abstract class IcebergInstructions {
/**
* The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system
* defaults for cloud provider-specific parameters.
*/
@SuppressWarnings("unused")
public static final IcebergInstructions DEFAULT = builder().build();

public static Builder builder() {
return ImmutableIcebergInstructions.builder();
}
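
A sketch of the two ways to supply instructions after this change. Since the `readTable` parameters are now `@Nullable`, passing `null` falls back to `DEFAULT` inside `readTableInternal`; the builder option and column definitions below are assumptions for illustration, not taken from this diff:

```java
// System defaults for cloud provider-specific parameters.
final IcebergInstructions defaults = IcebergInstructions.DEFAULT;

// Customized instructions; tableDefinition mirrors the accessor used by
// IcebergCatalogAdapter, with illustrative columns.
final IcebergInstructions custom = IcebergInstructions.builder()
        .tableDefinition(TableDefinition.of(
                ColumnDefinition.ofString("Region"),
                ColumnDefinition.ofLong("Units_Sold")))
        .build();
```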