Skip to content

Commit

Permalink
feat: Add SourcePartitionedTable support to PythonTableDataService (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy authored Jan 8, 2025
1 parent 92c1301 commit 1e4bb0f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.BasicDataIndex;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.PartitionAwareSourceTable;
import io.deephaven.engine.table.impl.SourcePartitionedTable;
import io.deephaven.engine.table.impl.TableUpdateMode;
import io.deephaven.engine.table.impl.chunkboxer.ChunkBoxer;
import io.deephaven.engine.table.impl.locations.*;
Expand Down Expand Up @@ -45,6 +47,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;

import static io.deephaven.extensions.barrage.util.ArrowToTableConverter.parseArrowIpcMessage;
Expand Down Expand Up @@ -112,6 +115,26 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live)
live ? ExecutionContext.getContext().getUpdateGraph() : null);
}

/**
* Get a Deephaven {@link PartitionedTable} for the supplied {@link TableKey}.
*
* @param tableKey The table key
* @param live Whether the result should update as new data becomes available
* @return The {@link PartitionedTable}
*/
@ScriptApi
public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) {
final TableLocationProviderImpl tableLocationProvider =
(TableLocationProviderImpl) getTableLocationProvider(tableKey);
return new SourcePartitionedTable(
tableLocationProvider.tableDefinition,
UnaryOperator.identity(),
tableLocationProvider,
live,
live,
tlk -> true);
}

/**
* This Backend impl marries the Python TableDataService with the Deephaven TableDataService. By performing the
* object translation here, we can keep the Python TableDataService implementation simple and focused on the Python
Expand Down
21 changes: 20 additions & 1 deletion py/server/deephaven/experimental/table_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from deephaven.dherror import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.table import Table
from deephaven.table import Table, PartitionedTable

_JPythonTableDataService = jpy.get_type("io.deephaven.extensions.barrage.util.PythonTableDataService")
_JTableKeyImpl = jpy.get_type("io.deephaven.extensions.barrage.util.PythonTableDataService$TableKeyImpl")
Expand Down Expand Up @@ -274,6 +274,25 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
except Exception as e:
raise DHError(e, message=f"failed to make a table for the key {table_key}") from e

def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable:
""" Creates a PartitionedTable backed by the backend service with the given table key.
Args:
table_key (TableKey): the table key
refreshing (bool): whether the partitioned table is live or static
Returns:
PartitionedTable: a new partitioned table
Raises:
DHError
"""
j_table_key = _JTableKeyImpl(table_key)
try:
return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing))
except Exception as e:
raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e

def _table_schema(self, table_key: TableKey, schema_cb: jpy.JType, failure_cb: jpy.JType) -> None:
""" Provides the table data schema and the partitioning values schema for the table with the given table key as
two serialized byte buffers to the PythonTableDataService (Java) via callbacks. Only called by the
Expand Down
37 changes: 37 additions & 0 deletions py/server/tests/test_table_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ def test_make_table_without_partition_schema(self):
self.assertIsNotNone(table)
self.assertEqual(table.columns, self.test_table.columns)

def test_make_partitioned_table_without_partition_schema(self):
backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema)
data_service = TableDataService(backend)
partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=False)
self.assertIsNotNone(partitioned_table)
for constituent_table in partitioned_table.constituent_tables:
self.assertEqual(constituent_table.columns, self.test_table.columns)

def test_make_static_table_with_partition_schema(self):
pc_schema = pa.schema(
[pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])
Expand All @@ -244,6 +252,19 @@ def test_make_static_table_with_partition_schema(self):
self.assertEqual(backend.existing_partitions_called, 1)
self.assertEqual(backend.partition_size_called, 1)

def test_make_static_partitioned_table_with_partition_schema(self):
pc_schema = pa.schema(
[pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])
backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema, pc_schema=pc_schema)
data_service = TableDataService(backend)
partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=False)
self.assertIsNotNone(partitioned_table)
merged = partitioned_table.merge()
self.assertEqual(merged.columns, self.test_table.columns)
self.assertEqual(merged.size, 2)
self.assertEqual(backend.existing_partitions_called, 1)
self.assertEqual(backend.partition_size_called, 1)

def test_make_live_table_with_partition_schema(self):
pc_schema = pa.schema(
[pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])
Expand All @@ -261,6 +282,22 @@ def test_make_live_table_with_partition_schema(self):
self.assertEqual(backend.existing_partitions_called, 0)
self.assertEqual(backend.partition_size_called, 0)

def test_make_live_partitioned_table_with_partition_schema(self):
pc_schema = pa.schema(
[pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])
backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema, pc_schema=pc_schema)
data_service = TableDataService(backend)
partitioned_table = data_service.make_partitioned_table(TableKeyImpl("test"), refreshing=True)
self.assertIsNotNone(partitioned_table)
merged = partitioned_table.merge()
self.assertEqual(merged.columns, self.test_table.columns)

self.wait_ticking_table_update(merged, 20, 5)

self.assertGreaterEqual(merged.size, 20)
self.assertEqual(backend.existing_partitions_called, 0)
self.assertEqual(backend.partition_size_called, 0)

def test_make_live_table_with_partition_schema_ops(self):
pc_schema = pa.schema(
[pa.field(name="Ticker", type=pa.string()), pa.field(name="Exchange", type=pa.string())])
Expand Down

0 comments on commit 1e4bb0f

Please sign in to comment.