Skip to content

Commit

Permalink
WIP commit, functional but needs docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jun 7, 2024
1 parent 6881afb commit 1b39b99
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,30 @@ public void testListTables() {

final Namespace ns = Namespace.of("sales");

final Collection<TableIdentifier> tables = adapter.listTables(ns);
Collection<TableIdentifier> tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

final Table table = adapter.listTablesAsTable(ns);
Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
"table_identifier_object column type");

// Test the string versions of the methods
tables = adapter.listTables("sales");
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

table = adapter.listTablesAsTable("sales");
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
}

@Test
Expand All @@ -160,14 +171,29 @@ public void testListSnapshots() {
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Table table = adapter.listSnapshotsAsTable(tableIdentifier);
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
"snapshot_object column type");

// Test the string versions of the methods
snapshotIds.clear();
adapter.listSnapshots("sales.sales_multi")
.forEach(snapshot -> snapshotIds.add(snapshot.snapshotId()));

Assert.eq(snapshotIds.size(), "snapshots.size()", 4, "4 snapshots for sales/sales_multi");

Assert.eqTrue(snapshotIds.contains(2001582482032951248L), "snapshots.contains(2001582482032951248)");
Assert.eqTrue(snapshotIds.contains(8325605756612719366L), "snapshots.contains(8325605756612719366L)");
Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

table = adapter.listSnapshotsAsTable("sales.sales_multi");
Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
}

@Test
Expand All @@ -180,7 +206,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_partitioned", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand All @@ -196,9 +228,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_multi", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

@Test
Expand All @@ -211,7 +249,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_single", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand Down Expand Up @@ -563,16 +607,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
final List<Snapshot> snapshots = adapter.listSnapshots(tableId);

// Verify we retrieved all the rows.
final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods

// Verify we retrieved all the rows.
table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ public List<Namespace> listNamespaces(@NotNull final Namespace namespace) {
"%s does not implement org.apache.iceberg.catalog.SupportsNamespaces", catalog.getClass().getName()));
}

/**
 * List the {@link Namespace namespaces} found in the namespace identified by the supplied level names. This is a
 * convenience overload: the levels are combined with {@link Namespace#of(String...)} and the call is forwarded to
 * {@link #listNamespaces(Namespace)}. As with that method, it is only supported if the catalog implements
 * {@link SupportsNamespaces}; see {@link SupportsNamespaces#listNamespaces(Namespace)}.
 *
 * @param namespace The level name(s) of the namespace to list namespaces in.
 * @return A list of all namespaces in the given namespace.
 */
public List<Namespace> listNamespaces(@NotNull final String... namespace) {
    final Namespace parent = Namespace.of(namespace);
    return listNamespaces(parent);
}

/**
* List all {@link Namespace namespaces} in the catalog as a Deephaven {@link Table table}. The resulting table will
* be static and contain the same information as {@link #listNamespaces()}.
Expand Down Expand Up @@ -215,6 +227,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
 * List the {@link Namespace namespaces} found in a given namespace as a Deephaven {@link Table table}. This is a
 * convenience overload: the level names are combined with {@link Namespace#of(String...)} and the call is
 * forwarded to {@link #listNamespacesAsTable(Namespace)}.
 *
 * @param namespace The level name(s) of the namespace to list namespaces in.
 * @return A {@link Table table} of all namespaces in the given namespace.
 */
public Table listNamespacesAsTable(@NotNull final String... namespace) {
    final Namespace parent = Namespace.of(namespace);
    return listNamespacesAsTable(parent);
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace.
*
Expand All @@ -225,6 +247,16 @@ public List<TableIdentifier> listTables(@NotNull final Namespace namespace) {
return catalog.listTables(namespace);
}

/**
 * List all Iceberg {@link TableIdentifier tables} in the namespace identified by the supplied level names. This is
 * a convenience overload: the levels are combined with {@link Namespace#of(String...)} and the call is forwarded
 * to {@link #listTables(Namespace)}.
 *
 * @param namespace The level name(s) of the namespace to list tables in.
 * @return A list of all tables in the given namespace.
 */
public List<TableIdentifier> listTables(@NotNull final String... namespace) {
    final Namespace target = Namespace.of(namespace);
    return listTables(target);
}

/**
* List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. The
* resulting table will be static and contain the same information as {@link #listTables(Namespace)}.
Expand Down Expand Up @@ -264,6 +296,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
 * List all Iceberg {@link TableIdentifier tables} in a given namespace as a Deephaven {@link Table table}. This is
 * a convenience overload: the level names are combined with {@link Namespace#of(String...)} and the call is
 * forwarded to {@link #listTablesAsTable(Namespace)}.
 *
 * @param namespace The level name(s) of the namespace to list tables in.
 * @return A {@link Table table} of all tables in the given namespace.
 */
public Table listTablesAsTable(@NotNull final String... namespace) {
    final Namespace target = Namespace.of(namespace);
    return listTablesAsTable(target);
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table.
*
Expand All @@ -276,6 +312,16 @@ public List<Snapshot> listSnapshots(@NotNull final TableIdentifier tableIdentifi
return snapshots;
}

/**
 * List all {@link Snapshot snapshots} of a given Iceberg table. This is a convenience overload: the string is
 * converted with {@link TableIdentifier#parse(String)} (for example {@code "sales.sales_multi"}) and the call is
 * forwarded to {@link #listSnapshots(TableIdentifier)}.
 *
 * @param tableIdentifier The string form of the identifier of the table from which to gather snapshots.
 * @return A list of all snapshots of the given table.
 */
public List<Snapshot> listSnapshots(@NotNull final String tableIdentifier) {
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return listSnapshots(parsedIdentifier);
}

/**
* List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
* table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
Expand Down Expand Up @@ -324,6 +370,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
}

/**
 * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. This is a
 * convenience overload: the string is converted with {@link TableIdentifier#parse(String)} (for example
 * {@code "sales.sales_multi"}) and the call is forwarded to {@link #listSnapshotsAsTable(TableIdentifier)}.
 *
 * @param tableIdentifier The string form of the identifier of the table from which to gather snapshots.
 * @return A {@link Table table} of all snapshots of the given table.
 */
public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return listSnapshotsAsTable(parsedIdentifier);
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -338,6 +395,20 @@ public Table readTable(
return readTableInternal(tableIdentifier, null, instructions);
}

/**
 * Read the latest static snapshot of an Iceberg table from the Iceberg catalog. This is a convenience overload:
 * the string is converted with {@link TableIdentifier#parse(String)} (for example
 * {@code "sales.sales_partitioned"}) and the call is forwarded to the {@link TableIdentifier}-based
 * {@code readTable} overload.
 *
 * @param tableIdentifier The string form of the table identifier to load
 * @param instructions The instructions for customizations while reading
 * @return The loaded table
 */
@SuppressWarnings("unused")
public Table readTable(
        @NotNull final String tableIdentifier,
        @NotNull final IcebergInstructions instructions) {
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return readTable(parsedIdentifier, instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -361,6 +432,22 @@ public Table readTable(
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}

/**
 * Read a static snapshot of an Iceberg table from the Iceberg catalog, selecting the snapshot by its id. This is a
 * convenience overload: the string is converted with {@link TableIdentifier#parse(String)} (for example
 * {@code "sales.sales_multi"}) and the call is forwarded to the {@link TableIdentifier}-based {@code readTable}
 * overload that accepts a snapshot id.
 *
 * @param tableIdentifier The string form of the table identifier to load
 * @param tableSnapshotId The snapshot id to load
 * @param instructions The instructions for customizations while reading
 * @return The loaded table
 */
@SuppressWarnings("unused")
public Table readTable(
        @NotNull final String tableIdentifier,
        final long tableSnapshotId,
        @NotNull final IcebergInstructions instructions) {
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return readTable(parsedIdentifier, tableSnapshotId, instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand Down
Loading

0 comments on commit 1b39b99

Please sign in to comment.