Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose Iceberg features to python users #5590

Merged
merged 13 commits into from
Jun 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
Expand All @@ -23,6 +24,20 @@
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
* value, the system defaults will be used.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the Iceberg REST catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @param region the AWS region; if omitted, system defaults will be used
* @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
* @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
* @param endpointOverride the S3 endpoint override; this is useful for testing with an S3-compatible local service
* such as MinIO or LocalStack
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createS3Rest(
@Nullable final String name,
@NotNull final String catalogURI,
Expand Down Expand Up @@ -62,4 +77,37 @@ public static IcebergCatalogAdapter createS3Rest(
return new IcebergCatalogAdapter(catalog, fileIO);
}

/**
* Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
* and credentials. These can be configured by following
* <a href="https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html">AWS Authentication and
* access credentials</a> guide.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the AWS Glue catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createS3Glue(
@Nullable final String name,
@NotNull final String catalogURI,
@NotNull final String warehouseLocation) {

// Set up the properties map for the Iceberg catalog
final Map<String, String> properties = new HashMap<>();

final GlueCatalog catalog = new GlueCatalog();

properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

// TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,30 @@ public void testListTables() {

final Namespace ns = Namespace.of("sales");

final Collection<TableIdentifier> tables = adapter.listTables(ns);
Collection<TableIdentifier> tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

final Table table = adapter.listTablesAsTable(ns);
Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
"table_identifier_object column type");

// Test the string versions of the methods
tables = adapter.listTables("sales");
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

table = adapter.listTablesAsTable("sales");
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
}

@Test
Expand All @@ -160,14 +171,29 @@ public void testListSnapshots() {
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
"snapshot_object column type");

// Test the string versions of the methods
snapshotIds.clear();
adapter.listSnapshots("sales.sales_multi")
.forEach(snapshot -> snapshotIds.add(snapshot.snapshotId()));

Assert.eq(snapshotIds.size(), "snapshots.size()", 4, "4 snapshots for sales/sales_multi");

Assert.eqTrue(snapshotIds.contains(2001582482032951248L), "snapshots.contains(2001582482032951248)");
Assert.eqTrue(snapshotIds.contains(8325605756612719366L), "snapshots.contains(8325605756612719366L)");
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

table = adapter.listSnapshotsAsTable("sales.sales_multi");
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
}

@Test
Expand All @@ -180,7 +206,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_partitioned", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand All @@ -196,9 +228,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_multi", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

@Test
Expand All @@ -211,7 +249,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_single", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand Down Expand Up @@ -563,16 +607,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
final List<Snapshot> snapshots = adapter.listSnapshots(tableId);

// Verify we retrieved all the rows.
final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods

// Verify we retrieved all the rows.
table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ public List<Namespace> listNamespaces(@NotNull final Namespace namespace) {
"%s does not implement org.apache.iceberg.catalog.SupportsNamespaces", catalog.getClass().getName()));
}

/**
* List all {@link Namespace namespaces} in a given namespace. This method is only supported if the catalog
* implements {@link SupportsNamespaces} for namespace discovery. See
* {@link SupportsNamespaces#listNamespaces(Namespace)}.
*
* @param namespace The namespace(s) to list namespaces in.
* @return A list of all namespaces in the given namespace.
*/
public List<Namespace> listNamespaces(@NotNull final String... namespace) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return listNamespaces(Namespace.of(namespace));
}

/**
* List all {@link Namespace namespaces} in the catalog as a Deephaven {@link Table table}. The resulting table will
* be static and contain the same information as {@link #listNamespaces()}.
Expand Down Expand Up @@ -215,6 +227,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
 * List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. The resulting
 * table will be static and contain the same information as {@link #listNamespaces(Namespace)}.
 *
 * @param namespace The namespace(s) to list namespaces in; each element is one level of the namespace path.
 * @return A {@link Table table} of all namespaces.
 */
public Table listNamespacesAsTable(@NotNull final String... namespace) {
return listNamespacesAsTable(Namespace.of(namespace));
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace.
*
Expand All @@ -225,6 +247,16 @@ public List<TableIdentifier> listTables(@NotNull final Namespace namespace) {
return catalog.listTables(namespace);
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace.
*
* @param namespace The namespace to list tables in.
* @return A list of all tables in the given namespace.
*/
public List<TableIdentifier> listTables(@NotNull final String... namespace) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return listTables(Namespace.of(namespace));
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The
* resulting table will be static and contain the same information as {@link #listTables(Namespace)}.
Expand Down Expand Up @@ -264,6 +296,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
 * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The
 * resulting table will be static and contain the same information as {@link #listTables(Namespace)}.
 *
 * @param namespace The namespace to list tables in; each element is one level of the namespace path.
 * @return A {@link Table table} of all tables in the given namespace.
 */
public Table listTablesAsTable(@NotNull final String... namespace) {
return listTablesAsTable(Namespace.of(namespace));
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table.
*
Expand All @@ -276,6 +312,16 @@ public List<Snapshot> listSnapshots(@NotNull final TableIdentifier tableIdentifi
return snapshots;
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table.
*
* @param tableIdentifier The identifier of the table from which to gather snapshots.
* @return A list of all snapshots of the given table.
*/
public List<Snapshot> listSnapshots(@NotNull final String tableIdentifier) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return listSnapshots(TableIdentifier.parse(tableIdentifier));
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
* table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
Expand Down Expand Up @@ -324,6 +370,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
* table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
*
* @param tableIdentifier The identifier of the table from which to gather snapshots.
* @return A list of all tables in the given namespace.
*/
public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier));
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -338,6 +395,20 @@ public Table readTable(
return readTableInternal(tableIdentifier, null, instructions);
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load
* @param instructions The instructions for customizations while reading
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(
@NotNull final String tableIdentifier,
@NotNull final IcebergInstructions instructions) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return readTable(TableIdentifier.parse(tableIdentifier), instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -361,6 +432,22 @@ public Table readTable(
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}

/**
 * Load a specific static snapshot of an Iceberg table from the Iceberg catalog, selected by snapshot id.
 *
 * @param tableIdentifier The table identifier to load, in dotted string form
 * @param tableSnapshotId The snapshot id to load
 * @param instructions The instructions for customizations while reading
 * @return The loaded table
 */
@SuppressWarnings("unused")
public Table readTable(
        @NotNull final String tableIdentifier,
        final long tableSnapshotId,
        @NotNull final IcebergInstructions instructions) {
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return readTable(parsedIdentifier, tableSnapshotId, instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
@Immutable
@BuildableStyle
public abstract class IcebergInstructions {
/**
 * The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system
 * defaults for cloud provider-specific parameters.
 */
@SuppressWarnings("unused")
// final: this is a shared constant and must not be reassignable by callers.
public static final IcebergInstructions DEFAULT = ImmutableIcebergInstructions.builder().build();

/**
 * Create a new builder for constructing an {@link IcebergInstructions} instance.
 *
 * @return a new {@link Builder}
 */
public static Builder builder() {
return ImmutableIcebergInstructions.builder();
}
Expand Down
Loading
Loading