Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose Iceberg features to python users #5590

Merged
merged 13 commits into from
Jun 24, 2024
4 changes: 2 additions & 2 deletions buildSrc/src/main/groovy/Classpaths.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ class Classpaths {
static final String HADOOP_VERSION = '3.4.0'

static final String ICEBERG_GROUP = 'org.apache.iceberg'
static final String ICEBERG_VERSION = '1.5.0'
static final String ICEBERG_VERSION = '1.5.2'

static final String AWSSDK_GROUP = 'software.amazon.awssdk'
static final String AWSSDK_VERSION = '2.23.19'
static final String AWSSDK_VERSION = '2.24.5'

static final String TESTCONTAINER_GROUP = 'org.testcontainers'
static final String TESTCONTAINER_VERSION = '1.19.4'
Expand Down
4 changes: 3 additions & 1 deletion extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ dependencies {

implementation project(':extensions-s3')
implementation "org.apache.iceberg:iceberg-aws"
runtimeOnly "org.apache.iceberg:iceberg-aws-bundle"

Classpaths.inheritAWSSDK(project)
runtimeOnly "software.amazon.awssdk:sts"
runtimeOnly "software.amazon.awssdk:glue"

Classpaths.inheritTestContainers(project)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
Expand All @@ -23,6 +24,20 @@
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
* value, the system defaults will be used.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the Iceberg REST catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @param region the AWS region; if omitted, system defaults will be used
* @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
* @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
* @param endpointOverride the S3 endpoint override; this is useful for testing with a S3-compatible local service
* such as MinIO or LocalStack
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createS3Rest(
@Nullable final String name,
@NotNull final String catalogURI,
Expand Down Expand Up @@ -53,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

// TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
Expand All @@ -62,4 +76,36 @@ public static IcebergCatalogAdapter createS3Rest(
return new IcebergCatalogAdapter(catalog, fileIO);
}

/**
* Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
* and credentials. These can be configured by following
* <a href="https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html">AWS Authentication and
* access credentials</a> guide.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param catalogURI the URI of the AWS Glue catalog
* @param warehouseLocation the location of the S3 datafiles backing the catalog
* @return the Iceberg catalog adapter
*/
public static IcebergCatalogAdapter createGlue(
@Nullable final String name,
@NotNull final String catalogURI,
@NotNull final String warehouseLocation) {

// Set up the properties map for the Iceberg catalog
final Map<String, String> properties = new HashMap<>();

final GlueCatalog catalog = new GlueCatalog();

properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,23 @@ public void testListTables() {

final Namespace ns = Namespace.of("sales");

final Collection<TableIdentifier> tables = adapter.listTables(ns);
Collection<TableIdentifier> tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

final Table table = adapter.listTablesAsTable(ns);
Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
"table_identifier_object column type");

// Test the string versions of the methods
table = adapter.listTablesAsTable("sales");
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
}

@Test
Expand All @@ -160,14 +164,18 @@ public void testListSnapshots() {
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
"snapshot_object column type");

// Test the string versions of the methods
table = adapter.listSnapshotsAsTable("sales.sales_multi");
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
}

@Test
Expand All @@ -180,7 +188,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_partitioned", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand All @@ -196,9 +210,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_multi", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

@Test
Expand All @@ -211,7 +231,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_single", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand Down Expand Up @@ -563,16 +589,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
final List<Snapshot> snapshots = adapter.listSnapshots(tableId);

// Verify we retrieved all the rows.
final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods

// Verify we retrieved all the rows.
table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
}

Expand Down
Loading
Loading