diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 7ef10f4e31a..c39dc289f1b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -54,6 +54,7 @@ public class SourcePartitionedTable extends PartitionedTableImpl { * @param refreshSizes Whether the locations found should be refreshed * @param locationKeyMatcher Function to filter desired location keys */ + @Deprecated(forRemoval = true) public SourcePartitionedTable( @NotNull final TableDefinition constituentDefinition, @NotNull final UnaryOperator applyTablePermissions, @@ -61,13 +62,44 @@ public SourcePartitionedTable( final boolean refreshLocations, final boolean refreshSizes, @NotNull final Predicate locationKeyMatcher) { + this(constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes, + locationKeyMatcher, true); + } + + /** + * Construct a {@link SourcePartitionedTable} from the supplied parameters. + *

+ * Note that refreshLocations and refreshSizes are distinct because there are use cases that supply an external + * RowSet and hence don't require size refreshes. Others might care for size refreshes, but only the + * initially-available set of locations. + * + * @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables} + * @param applyTablePermissions Function to apply in order to correctly restrict the visible result rows + * @param tableLocationProvider Source for table locations + * @param refreshLocations Whether the set of locations should be refreshed + * @param refreshSizes Whether the locations found should be refreshed + * @param locationKeyMatcher Function to filter desired location keys + * @param preCheckExistence Whether to pre-check the existence (non-null, non-zero size) of locations before + * including them in the result SourcePartitionedTable as constituents. It is recommended to set this to + * {@code false} if you will do subsequent filtering on the result, or if you are confident that all + * locations are valid. + */ + public SourcePartitionedTable( + @NotNull final TableDefinition constituentDefinition, + @NotNull final UnaryOperator

applyTablePermissions, + @NotNull final TableLocationProvider tableLocationProvider, + final boolean refreshLocations, + final boolean refreshSizes, + @NotNull final Predicate locationKeyMatcher, + final boolean preCheckExistence) { super(new UnderlyingTableMaintainer( constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes, - locationKeyMatcher).result(), + locationKeyMatcher, + preCheckExistence).result(), Set.of(KEY_COLUMN_NAME), true, CONSTITUENT_COLUMN_NAME, @@ -86,6 +118,8 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv private final Predicate locationKeyMatcher; private final TrackingWritableRowSet resultRows; + private final String[] partitioningColumnNames; + private final WritableColumnSource[] resultPartitionValues; private final WritableColumnSource resultTableLocationKeys; private final WritableColumnSource
resultLocationTables; private final QueryTable result; @@ -106,7 +140,8 @@ private UnderlyingTableMaintainer( @NotNull final TableLocationProvider tableLocationProvider, final boolean refreshLocations, final boolean refreshSizes, - @NotNull final Predicate locationKeyMatcher) { + @NotNull final Predicate locationKeyMatcher, + final boolean preCheckExistence) { super(false); this.constituentDefinition = constituentDefinition; @@ -116,10 +151,20 @@ private UnderlyingTableMaintainer( this.locationKeyMatcher = locationKeyMatcher; resultRows = RowSetFactory.empty().toTracking(); + final List> partitioningColumns = constituentDefinition.getPartitioningColumns(); + partitioningColumnNames = partitioningColumns.stream() + .map(ColumnDefinition::getName) + .toArray(String[]::new); + resultPartitionValues = partitioningColumns.stream() + .map(cd -> ArrayBackedColumnSource.getMemoryColumnSource(cd.getDataType(), cd.getComponentType())) + .toArray(WritableColumnSource[]::new); resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null); resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null); - final Map> resultSources = new LinkedHashMap<>(2); + final Map> resultSources = new LinkedHashMap<>(partitioningColumns.size() + 2); + for (int pci = 0; pci < partitioningColumns.size(); ++pci) { + resultSources.put(partitioningColumnNames[pci], resultPartitionValues[pci]); + } resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys); resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables); result = new QueryTable(resultRows, resultSources); @@ -135,14 +180,17 @@ private UnderlyingTableMaintainer( } if (needToRefreshLocations) { + Arrays.stream(resultPartitionValues).forEach(ColumnSource::startTrackingPrevValues); resultTableLocationKeys.startTrackingPrevValues(); resultLocationTables.startTrackingPrevValues(); subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider); manage(subscriptionBuffer); - pendingLocationStates = new IntrusiveDoublyLinkedQueue<>( - IntrusiveDoublyLinkedNode.Adapter.getInstance()); + pendingLocationStates = preCheckExistence + ? new IntrusiveDoublyLinkedQueue<>( + IntrusiveDoublyLinkedNode.Adapter.getInstance()) + : null; readyLocationStates = new IntrusiveDoublyLinkedQueue<>( IntrusiveDoublyLinkedNode.Adapter.getInstance()); processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource( @@ -206,12 +254,17 @@ private RowSet sortAndAddLocations(@NotNull final Stream location // Note that makeConstituentTable expects us to subsequently unmanage the TableLocations unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> { final long constituentRowKey = lastInsertedRowKey.incrementAndGet(); - final Table constituentTable = makeConstituentTable(tl); + + for (int pci = 0; pci < resultPartitionValues.length; ++pci) { + addPartitionValue(tl.getKey(), partitioningColumnNames[pci], resultPartitionValues[pci], + constituentRowKey); + } resultTableLocationKeys.ensureCapacity(constituentRowKey + 1); resultTableLocationKeys.set(constituentRowKey, tl.getKey()); resultLocationTables.ensureCapacity(constituentRowKey + 1); + final Table constituentTable = makeConstituentTable(tl); resultLocationTables.set(constituentRowKey, constituentTable); if (result.isRefreshing()) { @@ -223,6 +276,15 @@ private RowSet sortAndAddLocations(@NotNull final Stream location : RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get()); } + private static void addPartitionValue( + @NotNull final TableLocationKey tableLocationKey, + @NotNull final String partitioningColumnName, + @NotNull final WritableColumnSource partitionValueColumn, + final long rowKey) { + partitionValueColumn.ensureCapacity(rowKey + 1); + partitionValueColumn.set(rowKey, tableLocationKey.getPartitionValue(partitioningColumnName)); + } + private Table makeConstituentTable(@NotNull final TableLocation tableLocation) { final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable( constituentDefinition, @@ -280,19 +342,24 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp * population in STM ColumnSources. */ // TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table - locationUpdate.getPendingAddedLocationKeys().stream() + final Stream newPendingLocations = locationUpdate.getPendingAddedLocationKeys() + .stream() .map(LiveSupplier::get) .filter(locationKeyMatcher) .map(tableLocationProvider::getTableLocation) .peek(this::manage) - .map(PendingLocationState::new) - .forEach(pendingLocationStates::offer); - for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { - final PendingLocationState pendingLocationState = iter.next(); - if (pendingLocationState.exists()) { - iter.remove(); - readyLocationStates.offer(pendingLocationState); + .map(PendingLocationState::new); + if (pendingLocationStates != null) { + newPendingLocations.forEach(pendingLocationStates::offer); + for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { + final PendingLocationState pendingLocationState = iter.next(); + if (pendingLocationState.exists()) { + iter.remove(); + readyLocationStates.offer(pendingLocationState); + } } + } else { + newPendingLocations.forEach(readyLocationStates::offer); } if (readyLocationStates.isEmpty()) { @@ -317,22 +384,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd } // Iterate through the pending locations and remove any that are in the removed set. - List toUnmanage = null; - for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { - final PendingLocationState pendingLocationState = iter.next(); - if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) { - iter.remove(); - // Release the state and plan to unmanage the location - if (toUnmanage == null) { - toUnmanage = new ArrayList<>(); + if (pendingLocationStates != null) { + List toUnmanage = null; + for (final Iterator iter = pendingLocationStates.iterator(); iter.hasNext();) { + final PendingLocationState pendingLocationState = iter.next(); + if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) { + iter.remove(); + // Release the state and plan to unmanage the location + if (toUnmanage == null) { + toUnmanage = new ArrayList<>(); + } + toUnmanage.add(pendingLocationState.release()); } - toUnmanage.add(pendingLocationState.release()); } - } - if (toUnmanage != null) { - unmanage(toUnmanage.stream()); - // noinspection UnusedAssignment - toUnmanage = null; + if (toUnmanage != null) { + unmanage(toUnmanage.stream()); + // noinspection UnusedAssignment + toUnmanage = null; + } } // At the end of the cycle we need to make sure we unmanage any removed constituents. @@ -367,6 +436,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd this.removedLocationsCommitter.maybeActivate(); final WritableRowSet deletedRows = deleteBuilder.build(); + Arrays.stream(resultPartitionValues).forEach(cs -> cs.setNull(deletedRows)); resultTableLocationKeys.setNull(deletedRows); resultLocationTables.setNull(deletedRows); return deletedRows; diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java index b040d3a7a5f..68906192f8c 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java @@ -104,7 +104,8 @@ private SourcePartitionedTable setUpData() { tlp, true, true, - l -> true); + l -> true, + true); } private void verifyStringColumnContents(Table table, String columnName, String... expectedValues) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java index 7586ee831fc..e0bc8127aec 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java @@ -120,10 +120,14 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live) * * @param tableKey The table key * @param live Whether the result should update as new data becomes available + * @param preCheckExistence Whether to include only locations observed to have non-empty data * @return The {@link PartitionedTable} */ @ScriptApi - public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) { + public PartitionedTable makePartitionedTable( + @NotNull final TableKeyImpl tableKey, + final boolean live, + final boolean preCheckExistence) { final TableLocationProviderImpl tableLocationProvider = (TableLocationProviderImpl) getTableLocationProvider(tableKey); return new SourcePartitionedTable( @@ -132,7 +136,8 @@ public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKe tableLocationProvider, live, live, - tlk -> true); + tlk -> true, + preCheckExistence); } /** diff --git a/py/server/deephaven/experimental/table_data_service.py b/py/server/deephaven/experimental/table_data_service.py index fdf6d677294..9d171b21881 100644 --- a/py/server/deephaven/experimental/table_data_service.py +++ b/py/server/deephaven/experimental/table_data_service.py @@ -260,7 +260,7 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table: Args: table_key (TableKey): the table key - refreshing (bool): whether the table is live or static + refreshing (bool): whether the table is live (True) or static (False) Returns: Table: a new table @@ -274,13 +274,15 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table: except Exception as e: raise DHError(e, message=f"failed to make a table for the key {table_key}") from e - def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable: + def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool, + pre_check_existence: bool = False) -> PartitionedTable: """ Creates a PartitionedTable backed by the backend service with the given table key. Args: table_key (TableKey): the table key - refreshing (bool): whether the partitioned table is live or static - + refreshing (bool): whether the partitioned table is live (True) or static (False) + pre_check_existence (bool): whether the partitioned table should verify that locations exist and are + non-empty before including them in the table Returns: PartitionedTable: a new partitioned table @@ -289,7 +291,8 @@ def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> Pa """ j_table_key = _JTableKeyImpl(table_key) try: - return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing)) + return PartitionedTable( + self._j_tbl_service.makePartitionedTable(j_table_key, refreshing, pre_check_existence)) except Exception as e: raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e