From f949bb07b3e0a4f3d87fa626a3ed6e2a0627fc24 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Wed, 15 Nov 2023 10:55:09 -0500 Subject: [PATCH] Use chunks in DynamicWhereFilter, fix a chunk leak, and cleanup the code generally (#4826) * Use chunks in DynamicWhereFilter for all column reading * Combine the single-column and multi-column linear filter paths --- .../table/impl/select/DynamicWhereFilter.java | 177 +++++++++--------- .../table/impl/QueryTableWhereTest.java | 17 ++ 2 files changed, 109 insertions(+), 85 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java index 22e5d7ed7cd..29152e038d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java @@ -4,12 +4,15 @@ package io.deephaven.engine.table.impl.select; import io.deephaven.base.log.LogOutput; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.attributes.Values; import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.indexer.RowSetIndexer; +import io.deephaven.engine.table.iterators.ChunkedColumnIterator; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.DynamicNode; import io.deephaven.engine.table.impl.*; @@ -19,16 +22,14 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.engine.table.impl.TupleSourceFactory; import io.deephaven.engine.updategraph.UpdateGraph; -import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; -import 
org.apache.commons.lang3.mutable.MutableBoolean; import org.jetbrains.annotations.NotNull; import java.util.*; /** * A where filter that extracts a set of inclusion or exclusion keys from a set table. - * + * <p>
* Each time the set table ticks, the entire where filter is recalculated. */ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency { @@ -70,46 +71,83 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi if (setRefreshing) { this.setTable = setTable; setTupleSource = TupleSourceFactory.makeTupleSource(setColumns); - setTable.getRowSet().forAllRowKeys((final long v) -> addKey(makeKey(v))); + if (setTable.getRowSet().isNonempty()) { + try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( + setTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) { + initialKeysIterator.forEachRemaining(this::addKey); + } + } - final String[] columnNames = Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new); - final ModifiedColumnSet modTokenSet = setTable.newModifiedColumnSet(columnNames); + final String[] setColumnNames = + Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new); + final ModifiedColumnSet setColumnsMCS = setTable.newModifiedColumnSet(setColumnNames); setUpdateListener = new InstrumentedTableUpdateListenerAdapter( "DynamicWhereFilter(" + Arrays.toString(setColumnsNames) + ")", setTable, false) { @Override public void onUpdate(final TableUpdate upstream) { - if (upstream.added().isEmpty() && upstream.removed().isEmpty() - && !upstream.modifiedColumnSet().containsAny(modTokenSet)) { + final boolean hasAdds = upstream.added().isNonempty(); + final boolean hasRemoves = upstream.removed().isNonempty(); + final boolean hasModifies = upstream.modified().isNonempty() + && upstream.modifiedColumnSet().containsAny(setColumnsMCS); + if (!hasAdds && !hasRemoves && !hasModifies) { return; } - final MutableBoolean trueModification = new MutableBoolean(false); + // Remove removed keys + if (hasRemoves) { + try (final CloseableIterator removedKeysIterator = ChunkedColumnIterator.make( + 
setTupleSource.getPrevSource(), upstream.removed(), getChunkSize(upstream.removed()))) { + removedKeysIterator.forEachRemaining(DynamicWhereFilter.this::removeKey); + } + } - upstream.added().forAllRowKeys((final long v) -> addKey(makeKey(v))); - upstream.removed().forAllRowKeys((final long v) -> removeKey(makePrevKey(v))); + // Update modified keys + boolean trueModification = false; + if (hasModifies) { + // @formatter:off + try (final CloseableIterator preModifiedKeysIterator = ChunkedColumnIterator.make( + setTupleSource.getPrevSource(), upstream.getModifiedPreShift(), + getChunkSize(upstream.getModifiedPreShift())); + final CloseableIterator postModifiedKeysIterator = ChunkedColumnIterator.make( + setTupleSource, upstream.modified(), + getChunkSize(upstream.modified()))) { + // @formatter:on + while (preModifiedKeysIterator.hasNext()) { + Assert.assertion(postModifiedKeysIterator.hasNext(), + "Pre and post modified row sets must be the same size; post is exhausted, but pre is not"); + final Object oldKey = preModifiedKeysIterator.next(); + final Object newKey = postModifiedKeysIterator.next(); + if (!Objects.equals(oldKey, newKey)) { + trueModification = true; + removeKey(oldKey); + addKey(newKey); + } + } + Assert.assertion(!postModifiedKeysIterator.hasNext(), + "Pre and post modified row sets must be the same size; pre is exhausted, but post is not"); + } + } - upstream.forAllModified((preIndex, postIndex) -> { - final Object oldKey = makePrevKey(preIndex); - final Object newKey = makeKey(postIndex); - if (!Objects.equals(oldKey, newKey)) { - trueModification.setTrue(); - removeKey(oldKey); - addKey(newKey); + // Add added keys + if (hasAdds) { + try (final CloseableIterator addedKeysIterator = ChunkedColumnIterator.make( + setTupleSource, upstream.added(), getChunkSize(upstream.added()))) { + addedKeysIterator.forEachRemaining(DynamicWhereFilter.this::addKey); } - }); + } // Pretend every row of the original table was modified, this is essential so that the 
where clause // can be re-evaluated based on the updated live set. if (listener != null) { - if (upstream.added().isNonempty() || trueModification.booleanValue()) { + if (hasAdds || trueModification) { if (inclusion) { listener.requestRecomputeUnmatched(); } else { listener.requestRecomputeMatched(); } } - if (upstream.removed().isNonempty() || trueModification.booleanValue()) { + if (hasRemoves || trueModification) { if (inclusion) { listener.requestRecomputeMatched(); } else { @@ -132,8 +170,13 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { } else { this.setTable = null; setTupleSource = null; - final TupleSource temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns); - setTable.getRowSet().forAllRowKeys((final long v) -> addKeyUnchecked(makeKey(temporaryTupleSource, v))); + if (setTable.getRowSet().isNonempty()) { + final TupleSource temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns); + try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( + temporaryTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) { + initialKeysIterator.forEachRemaining(this::addKeyUnchecked); + } + } kernelValid = liveValuesArrayValid = false; setInclusionKernel = null; setUpdateListener = null; @@ -145,18 +188,6 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } - private Object makeKey(long index) { - return makeKey(setTupleSource, index); - } - - private static Object makeKey(TupleSource tupleSource, long index) { - return tupleSource.createTuple(index); - } - - private Object makePrevKey(long index) { - return setTupleSource.createPreviousTuple(index); - } - private void removeKey(Object key) { final boolean removed = liveValues.remove(key); if (!removed) { @@ -225,7 +256,7 @@ public WritableRowSet filter( if (selection.size() > (selectionIndexer.getGrouping(tupleSource).size() * 2L)) { return filterGrouping(trackingSelection, selectionIndexer, tupleSource); } 
else { - return filterLinear(selection, keyColumns, tupleSource); + return filterLinear(selection, tupleSource); } } final boolean allGrouping = Arrays.stream(keyColumns).allMatch(selectionIndexer::hasGrouping); @@ -241,80 +272,61 @@ public WritableRowSet filter( return filterGrouping(trackingSelection, selectionIndexer, tupleSource); } } - return filterLinear(selection, keyColumns, tupleSource); + return filterLinear(selection, tupleSource); } - private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer, + private WritableRowSet filterGrouping( + TrackingRowSet selection, + RowSetIndexer selectionIndexer, TupleSource tupleSource) { final RowSet matchingKeys = selectionIndexer.getSubSetForKeySet(liveValues, tupleSource); return (inclusion ? matchingKeys.copy() : selection.minus(matchingKeys)); } - private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer, Table table) { - final ColumnSource[] keyColumns = Arrays.stream(matchPairs) - .map(mp -> table.getColumnSource(mp.leftColumn())).toArray(ColumnSource[]::new); - final TupleSource tupleSource = TupleSourceFactory.makeTupleSource(keyColumns); - return filterGrouping(selection, selectionIndexer, tupleSource); - } - - private WritableRowSet filterLinear(RowSet selection, ColumnSource[] keyColumns, TupleSource tupleSource) { - if (keyColumns.length == 1) { - return filterLinearOne(selection, keyColumns[0]); - } else { - return filterLinearTuple(selection, tupleSource); - } - } - - private WritableRowSet filterLinearOne(RowSet selection, ColumnSource keyColumn) { + private WritableRowSet filterLinear(RowSet selection, TupleSource tupleSource) { if (selection.isEmpty()) { return RowSetFactory.empty(); } if (!kernelValid) { - setInclusionKernel = SetInclusionKernel.makeKernel(keyColumn.getChunkType(), liveValues, inclusion); + setInclusionKernel = SetInclusionKernel.makeKernel(tupleSource.getChunkType(), liveValues, inclusion); kernelValid = true; } 
final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential(); - try (final ColumnSource.GetContext getContext = keyColumn.makeGetContext(CHUNK_SIZE); - final RowSequence.Iterator rsIt = selection.getRowSequenceIterator()) { - final WritableLongChunk keyIndices = WritableLongChunk.makeWritableChunk(CHUNK_SIZE); - final WritableBooleanChunk matches = WritableBooleanChunk.makeWritableChunk(CHUNK_SIZE); + final int maxChunkSize = getChunkSize(selection); + // @formatter:off + try (final ColumnSource.GetContext keyGetContext = tupleSource.makeGetContext(maxChunkSize); + final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator(); + final WritableLongChunk selectionRowKeyChunk = + WritableLongChunk.makeWritableChunk(maxChunkSize); + final WritableBooleanChunk matches = WritableBooleanChunk.makeWritableChunk(maxChunkSize)) { + // @formatter:on - while (rsIt.hasMore()) { - final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE); + while (selectionIterator.hasMore()) { + final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize); - final Chunk chunk = Chunk.downcast(keyColumn.getChunk(getContext, chunkOk)); - setInclusionKernel.matchValues(chunk, matches); + final Chunk keyChunk = Chunk.downcast(tupleSource.getChunk(keyGetContext, selectionChunk)); + final int thisChunkSize = keyChunk.size(); + setInclusionKernel.matchValues(keyChunk, matches); - keyIndices.setSize(chunk.size()); - chunkOk.fillRowKeyChunk(keyIndices); + selectionRowKeyChunk.setSize(thisChunkSize); + selectionChunk.fillRowKeyChunk(selectionRowKeyChunk); - for (int ii = 0; ii < chunk.size(); ++ii) { + for (int ii = 0; ii < thisChunkSize; ++ii) { if (matches.get(ii)) { - indexBuilder.appendKey(keyIndices.get(ii)); + indexBuilder.appendKey(selectionRowKeyChunk.get(ii)); } } } } - return indexBuilder.build(); } - private WritableRowSet filterLinearTuple(RowSet selection, TupleSource tupleSource) { - final 
RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential(); - - for (final RowSet.Iterator it = selection.iterator(); it.hasNext();) { - final long row = it.nextLong(); - final Object tuple = tupleSource.createTuple(row); - if (liveValues.contains(tuple) == inclusion) { - indexBuilder.appendKey(row); - } - } - - return indexBuilder.build(); + private static int getChunkSize(@NotNull final RowSet selection) { + return (int) Math.min(selection.size(), CHUNK_SIZE); } @Override @@ -352,9 +364,4 @@ public LogOutput append(LogOutput logOutput) { return logOutput.append("DynamicWhereFilter(").append(MatchPair.MATCH_PAIR_ARRAY_FORMATTER, matchPairs) .append(")"); } - - @Override - public String toString() { - return new LogOutputStringImpl().append(this).toString(); - } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 4b2058bbeb3..5d6621d0eb0 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -47,7 +47,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntSupplier; import java.util.function.IntUnaryOperator; +import java.util.stream.IntStream; import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.testutil.testcase.RefreshingTableTestCase.printTableUpdates; @@ -417,6 +419,21 @@ public void testWhereDynamicIn() { asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect())); assertEquals(1, resultInverse.size()); assertEquals(asList("E"), asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect())); + + // Real modification to set table, followed by spurious modification to set table + IntStream.range(0, 2).forEach(ri 
-> { + updateGraph.runWithinUnitTestCycle(() -> { + addToTable(setTable, i(7), col("X", "C")); + setTable.notifyListeners(i(), i(), i(7)); + }); + showWithRowSet(result); + assertEquals(4, result.size()); + assertEquals(asList("A", "B", "C", "A"), + asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect())); + assertEquals(2, resultInverse.size()); + assertEquals(asList("D", "E"), + asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect())); + }); } @Test