diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java index 6d757803b84..feea7caa5ec 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java @@ -7,6 +7,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.chunk.*; import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; @@ -18,9 +19,10 @@ import io.deephaven.engine.table.impl.select.setinclusion.SetInclusionKernel; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.iterators.ChunkedColumnIterator; -import io.deephaven.engine.updategraph.DynamicNode; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.annotations.ReferentialIntegrity; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -32,12 +34,12 @@ * Each time the set table ticks, the entire where filter is recalculated. */ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency { + private static final int CHUNK_SIZE = 1 << 16; - private final boolean setRefreshing; private final MatchPair[] matchPairs; - private final ChunkSource.WithPrev setKeySource; private final boolean inclusion; + private final ChunkSource.WithPrev setKeySource; private final DataIndexKeySet liveValues; private boolean liveValuesArrayValid = false; @@ -45,14 +47,19 @@ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implemen private Object[] liveValuesArray = null; private SetInclusionKernel setInclusionKernel = null; - // this reference must be maintained for reachability - @SuppressWarnings({"FieldCanBeLocal", "unused"}) - private final QueryTable setTable; + @SuppressWarnings("FieldCanBeLocal") - // this reference must be maintained for reachability + @ReferentialIntegrity private final InstrumentedTableUpdateListener setUpdateListener; - /** Stores the optimal data index for this filter. */ + /** + * The source table to be filtered. Recorded when initializing data indexes + */ + @ReferentialIntegrity + private Table sourceTable; + /** + * The optimal data index for this filter. + */ @Nullable private DataIndex sourceDataIndex; @@ -60,11 +67,9 @@ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implemen private QueryTable resultTable; public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, final MatchPair... setColumnsNames) { - setRefreshing = setTable.isRefreshing(); - if (setRefreshing) { + if (setTable.isRefreshing()) { updateGraph.checkInitiateSerialTableOperation(); } - this.matchPairs = setColumnsNames; this.inclusion = inclusion; @@ -73,8 +78,7 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi final ColumnSource[] setColumns = Arrays.stream(matchPairs) .map(mp -> setTable.getColumnSource(mp.rightColumn())).toArray(ColumnSource[]::new); - if (setRefreshing) { - this.setTable = setTable; + if (setTable.isRefreshing()) { setKeySource = DataIndexUtils.makeBoxedKeySource(setColumns); if (setTable.getRowSet().isNonempty()) { try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( @@ -173,9 +177,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { manage(setUpdateListener); } else { - this.setTable = null; setKeySource = null; - if (setTable.getRowSet().isNonempty()) { final ChunkSource.WithPrev tmpKeySource = DataIndexUtils.makeBoxedKeySource(setColumns); try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( @@ -216,13 +218,38 @@ private void addKeyUnchecked(Object key) { liveValues.add(key); } + /** + * Initializes the data index for this filter, if not already initialized. It is an error to call this method with + * more than one distinct {@code sourceTable}. If {@sourceTable} is refreshing, this method must only be invoked + * when it's {@link UpdateGraph#checkInitiateSerialTableOperation() safe} to initialize serial table operations. + * + * @param sourceTable The table that this DynamicWhereFilter will be used to filter + */ + public void initializeDataIndex(@NotNull final Table sourceTable) { + if (this.sourceTable != null) { + if (this.sourceTable == sourceTable) { + return; + } + throw new IllegalStateException(String.format( + "Data indexes already initialized with source table %s, cannot reinitialize with %s", + this.sourceTable, sourceTable)); + } + this.sourceTable = sourceTable; + try (final SafeCloseable ignored = sourceTable.isRefreshing() ? LivenessScopeStack.open() : null) { + sourceDataIndex = optimalIndex(matchPairs, sourceTable); + if (sourceDataIndex != null && sourceDataIndex.isRefreshing()) { + manage(sourceDataIndex); + } + } + } + /** * Returns the optimal data index for the supplied table, or null if no index is available. The ideal index would * contain all key columns but a partial match is also acceptable. */ @Nullable - private DataIndex optimalIndex(final Table inputTable) { - final String[] keyColumnNames = MatchPair.getLeftColumns(matchPairs); + private static DataIndex optimalIndex(final MatchPair[] filterPairs, final Table inputTable) { + final String[] keyColumnNames = MatchPair.getLeftColumns(filterPairs); final DataIndex fullIndex = DataIndexer.getDataIndex(inputTable, keyColumnNames); if (fullIndex != null) { @@ -244,13 +271,8 @@ public List getColumnArrays() { @Override public List getDataIndexes(final Table sourceTable) { - if (sourceDataIndex == null) { - sourceDataIndex = optimalIndex(sourceTable); - } - if (sourceDataIndex == null) { - return Collections.emptyList(); - } - return List.of(sourceDataIndex); + initializeDataIndex(sourceTable); + return sourceDataIndex == null ? List.of() : List.of(sourceDataIndex); } @Override @@ -492,21 +514,21 @@ public boolean isSimpleFilter() { @Override public boolean isRefreshing() { - return setRefreshing; + return setUpdateListener != null; } @Override public void setRecomputeListener(RecomputeListener listener) { this.listener = listener; this.resultTable = listener.getTable(); - if (DynamicNode.isDynamicAndIsRefreshing(setTable)) { + if (isRefreshing()) { listener.setIsRefreshing(true); } } @Override public DynamicWhereFilter copy() { - return new DynamicWhereFilter(setTable, inclusion, matchPairs); + throw new UnsupportedOperationException("DynamicWhereFilter cannot be copied"); } @Override diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java index 350664789a8..4eb7261f312 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestConcurrentInstantiation.java @@ -418,8 +418,13 @@ private void testWhereDynamicInternal(final boolean sourceIndexed, final boolean DataIndexer.getOrCreateDataIndex(whereTable, "z"); } - final DynamicWhereFilter filter = updateGraph.exclusiveLock().computeLocked( - () -> new DynamicWhereFilter(whereTable, true, MatchPairFactory.getExpressions("z"))); + final DynamicWhereFilter filter = updateGraph.sharedLock().computeLocked( + () -> { + final DynamicWhereFilter result = + new DynamicWhereFilter(whereTable, true, MatchPairFactory.getExpressions("z")); + result.initializeDataIndex(table); + return result; + }); updateGraph.startCycleForUnitTests(false);