From 1e4bb0fcc50136154e7e996634d2f97ffa9293b9 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Tue, 7 Jan 2025 20:02:52 -0500 Subject: [PATCH] feat: Add SourcePartitionedTable support to PythonTableDataService (#6527) --- .../barrage/util/PythonTableDataService.java | 23 ++++++++++++ .../experimental/table_data_service.py | 21 ++++++++++- py/server/tests/test_table_data_service.py | 37 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java index 477dd05cc4e..7586ee831fc 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java @@ -14,9 +14,11 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.BasicDataIndex; import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.PartitionedTable; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.PartitionAwareSourceTable; +import io.deephaven.engine.table.impl.SourcePartitionedTable; import io.deephaven.engine.table.impl.TableUpdateMode; import io.deephaven.engine.table.impl.chunkboxer.ChunkBoxer; import io.deephaven.engine.table.impl.locations.*; @@ -45,6 +47,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; +import java.util.function.UnaryOperator; import java.util.stream.IntStream; import static io.deephaven.extensions.barrage.util.ArrowToTableConverter.parseArrowIpcMessage; @@ -112,6 +115,26 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live) live ? ExecutionContext.getContext().getUpdateGraph() : null); } + /** + * Get a Deephaven {@link PartitionedTable} for the supplied {@link TableKey}. + * + * @param tableKey The table key + * @param live Whether the result should update as new data becomes available + * @return The {@link PartitionedTable} + */ + @ScriptApi + public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) { + final TableLocationProviderImpl tableLocationProvider = + (TableLocationProviderImpl) getTableLocationProvider(tableKey); + return new SourcePartitionedTable( + tableLocationProvider.tableDefinition, + UnaryOperator.identity(), + tableLocationProvider, + live, + live, + tlk -> true); + } + /** * This Backend impl marries the Python TableDataService with the Deephaven TableDataService. By performing the * object translation here, we can keep the Python TableDataService implementation simple and focused on the Python diff --git a/py/server/deephaven/experimental/table_data_service.py b/py/server/deephaven/experimental/table_data_service.py index 1a2cc6a5295..fdf6d677294 100644 --- a/py/server/deephaven/experimental/table_data_service.py +++ b/py/server/deephaven/experimental/table_data_service.py @@ -15,7 +15,7 @@ from deephaven.dherror import DHError from deephaven._wrapper import JObjectWrapper -from deephaven.table import Table +from deephaven.table import Table, PartitionedTable _JPythonTableDataService = jpy.get_type("io.deephaven.extensions.barrage.util.PythonTableDataService") _JTableKeyImpl = jpy.get_type("io.deephaven.extensions.barrage.util.PythonTableDataService$TableKeyImpl") @@ -274,6 +274,25 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table: except Exception as e: raise DHError(e, message=f"failed to make a table for the key {table_key}") from e + def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable: + """ Creates a PartitionedTable backed by the backend service with the given table key. + + Args: + table_key (TableKey): the table key + refreshing (bool): whether the partitioned table is live or static + + Returns: + PartitionedTable: a new partitioned table + + Raises: + DHError + """ + j_table_key = _JTableKeyImpl(table_key) + try: + return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing)) + except Exception as e: + raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e + def _table_schema(self, table_key: TableKey, schema_cb: jpy.JType, failure_cb: jpy.JType) -> None: """ Provides the table data schema and the partitioning values schema for the table with the given table key as two serialized byte buffers to the PythonTableDataService (Java) via callbacks. Only called by the diff --git a/py/server/tests/test_table_data_service.py b/py/server/tests/test_table_data_service.py index 6c524edf263..28843d0a332 100644 --- a/py/server/tests/test_table_data_service.py +++ b/py/server/tests/test_table_data_service.py @@ -230,6 +230,14 @@ def test_make_table_without_partition_schema(self): self.assertIsNotNone(table) self.assertEqual(table.columns, self.test_table.columns) + def test_make_partitioned_table_without_partition_schema(self): + backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema) + data_service = TableDataService(backend) + partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=False) + self.assertIsNotNone(partitioned_table) + for constituent_table in partitioned_table.constituent_tables: + self.assertEqual(constituent_table.columns, self.test_table.columns) + def test_make_static_table_with_partition_schema(self): pc_schema = pa.schema( [pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())]) @@ -244,6 +252,19 @@ def test_make_static_table_with_partition_schema(self): self.assertEqual(backend.existing_partitions_called, 1) self.assertEqual(backend.partition_size_called, 1) + def test_make_static_partitioned_table_with_partition_schema(self): + pc_schema = pa.schema( + [pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())]) + backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema, pc_schema=pc_schema) + data_service = TableDataService(backend) + partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=False) + self.assertIsNotNone(partitioned_table) + merged = partitioned_table.merge() + self.assertEqual(merged.columns, self.test_table.columns) + self.assertEqual(merged.size, 2) + self.assertEqual(backend.existing_partitions_called, 1) + self.assertEqual(backend.partition_size_called, 1) + def test_make_live_table_with_partition_schema(self): pc_schema = pa.schema( [pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())]) @@ -261,6 +282,22 @@ def test_make_live_table_with_partition_schema(self): self.assertEqual(backend.existing_partitions_called, 0) self.assertEqual(backend.partition_size_called, 0) + def test_make_live_partitioned_table_with_partition_schema(self): + pc_schema = pa.schema( + [pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())]) + backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema, pc_schema=pc_schema) + data_service = TableDataService(backend) + partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=True) + self.assertIsNotNone(partitioned_table) + merged = partitioned_table.merge() + self.assertEqual(merged.columns, self.test_table.columns) + + self.wait_ticking_table_update(merged, 20, 5) + + self.assertGreaterEqual(merged.size, 20) + self.assertEqual(backend.existing_partitions_called, 0) + self.assertEqual(backend.partition_size_called, 0) + def test_make_live_table_with_partition_schema_ops(self): pc_schema = pa.schema( [pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])