Use chunks in DynamicWhereFilter, fix a chunk leak, and cleanup the code generally (#4826)

* Use chunks in DynamicWhereFilter for all column reading

* Combine the single-column and multi-column linear filter paths
rcaudy authored Nov 15, 2023
1 parent e60cb54 commit f949bb0
Showing 2 changed files with 109 additions and 85 deletions.
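
The two bullets boil down to one pattern, visible throughout the diff below: instead of materializing one tuple per row key via createTuple/createPreviousTuple, the filter now reads key columns a chunk at a time through ChunkedColumnIterator, sized by getChunkSize(rowSet) so no chunk is allocated larger than the row set being read. A minimal sketch of that pattern follows; the helper class ChunkedKeyReader, its readKeysInto method, the Set<Object> target, and the 1 << 16 chunk cap are illustrative assumptions, while the ChunkedColumnIterator/CloseableIterator calls mirror the ones the commit adds.

import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.TupleSource;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;

import java.util.Set;

// Hypothetical helper (not part of the commit) showing the chunk-at-a-time key read
// that the constructor and the added/removed/modified listener branches now share.
final class ChunkedKeyReader {

    // Illustrative cap; DynamicWhereFilter defines its own CHUNK_SIZE constant.
    private static final int CHUNK_SIZE = 1 << 16;

    static void readKeysInto(final TupleSource<?> tupleSource, final RowSet rows, final Set<Object> liveValues) {
        if (rows.isEmpty()) {
            // Nothing to read; mirrors the isNonempty() guards the commit adds.
            return;
        }
        // Never size chunk storage beyond the number of rows actually being read.
        final int chunkSize = (int) Math.min(rows.size(), CHUNK_SIZE);
        // One chunked read per chunkSize rows replaces one createTuple(rowKey) call per row;
        // try-with-resources closes the iterator when iteration is done.
        try (final CloseableIterator<?> keys = ChunkedColumnIterator.make(tupleSource, rows, chunkSize)) {
            keys.forEachRemaining(liveValues::add);
        }
    }
}

The second bullet falls out of the same move: once filterLinear pulls chunks from the TupleSource rather than a single ColumnSource, one code path covers both the single-column and multi-column cases, with the SetInclusionKernel built from tupleSource.getChunkType() where it previously used keyColumn.getChunkType(). The chunk-leak fix is also visible there: the WritableLongChunk and WritableBooleanChunk are now declared as try-with-resources, where the old filterLinearOne allocated them inside the block without ever closing them.
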
@@ -4,12 +4,15 @@
package io.deephaven.engine.table.impl.select;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.indexer.RowSetIndexer;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.table.impl.*;
@@ -19,16 +22,14 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.engine.table.impl.TupleSourceFactory;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;

import java.util.*;

/**
* A where filter that extracts a set of inclusion or exclusion keys from a set table.
*
* <p>
* Each time the set table ticks, the entire where filter is recalculated.
*/
public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency {
@@ -70,46 +71,83 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi
if (setRefreshing) {
this.setTable = setTable;
setTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
setTable.getRowSet().forAllRowKeys((final long v) -> addKey(makeKey(v)));
if (setTable.getRowSet().isNonempty()) {
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
setTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) {
initialKeysIterator.forEachRemaining(this::addKey);
}
}

final String[] columnNames = Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet modTokenSet = setTable.newModifiedColumnSet(columnNames);
final String[] setColumnNames =
Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet setColumnsMCS = setTable.newModifiedColumnSet(setColumnNames);
setUpdateListener = new InstrumentedTableUpdateListenerAdapter(
"DynamicWhereFilter(" + Arrays.toString(setColumnsNames) + ")", setTable, false) {

@Override
public void onUpdate(final TableUpdate upstream) {
if (upstream.added().isEmpty() && upstream.removed().isEmpty()
&& !upstream.modifiedColumnSet().containsAny(modTokenSet)) {
final boolean hasAdds = upstream.added().isNonempty();
final boolean hasRemoves = upstream.removed().isNonempty();
final boolean hasModifies = upstream.modified().isNonempty()
&& upstream.modifiedColumnSet().containsAny(setColumnsMCS);
if (!hasAdds && !hasRemoves && !hasModifies) {
return;
}

final MutableBoolean trueModification = new MutableBoolean(false);
// Remove removed keys
if (hasRemoves) {
try (final CloseableIterator<?> removedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.removed(), getChunkSize(upstream.removed()))) {
removedKeysIterator.forEachRemaining(DynamicWhereFilter.this::removeKey);
}
}

upstream.added().forAllRowKeys((final long v) -> addKey(makeKey(v)));
upstream.removed().forAllRowKeys((final long v) -> removeKey(makePrevKey(v)));
// Update modified keys
boolean trueModification = false;
if (hasModifies) {
// @formatter:off
try (final CloseableIterator<?> preModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.getModifiedPreShift(),
getChunkSize(upstream.getModifiedPreShift()));
final CloseableIterator<?> postModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.modified(),
getChunkSize(upstream.modified()))) {
// @formatter:on
while (preModifiedKeysIterator.hasNext()) {
Assert.assertion(postModifiedKeysIterator.hasNext(),
"Pre and post modified row sets must be the same size; post is exhausted, but pre is not");
final Object oldKey = preModifiedKeysIterator.next();
final Object newKey = postModifiedKeysIterator.next();
if (!Objects.equals(oldKey, newKey)) {
trueModification = true;
removeKey(oldKey);
addKey(newKey);
}
}
Assert.assertion(!postModifiedKeysIterator.hasNext(),
"Pre and post modified row sets must be the same size; pre is exhausted, but post is not");
}
}

upstream.forAllModified((preIndex, postIndex) -> {
final Object oldKey = makePrevKey(preIndex);
final Object newKey = makeKey(postIndex);
if (!Objects.equals(oldKey, newKey)) {
trueModification.setTrue();
removeKey(oldKey);
addKey(newKey);
// Add added keys
if (hasAdds) {
try (final CloseableIterator<?> addedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.added(), getChunkSize(upstream.added()))) {
addedKeysIterator.forEachRemaining(DynamicWhereFilter.this::addKey);
}
});
}

// Pretend every row of the original table was modified, this is essential so that the where clause
// can be re-evaluated based on the updated live set.
if (listener != null) {
if (upstream.added().isNonempty() || trueModification.booleanValue()) {
if (hasAdds || trueModification) {
if (inclusion) {
listener.requestRecomputeUnmatched();
} else {
listener.requestRecomputeMatched();
}
}
if (upstream.removed().isNonempty() || trueModification.booleanValue()) {
if (hasRemoves || trueModification) {
if (inclusion) {
listener.requestRecomputeMatched();
} else {
@@ -132,8 +170,13 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
} else {
this.setTable = null;
setTupleSource = null;
final TupleSource<?> temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
setTable.getRowSet().forAllRowKeys((final long v) -> addKeyUnchecked(makeKey(temporaryTupleSource, v)));
if (setTable.getRowSet().isNonempty()) {
final TupleSource<?> temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
temporaryTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) {
initialKeysIterator.forEachRemaining(this::addKeyUnchecked);
}
}
kernelValid = liveValuesArrayValid = false;
setInclusionKernel = null;
setUpdateListener = null;
@@ -145,18 +188,6 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

private Object makeKey(long index) {
return makeKey(setTupleSource, index);
}

private static Object makeKey(TupleSource<?> tupleSource, long index) {
return tupleSource.createTuple(index);
}

private Object makePrevKey(long index) {
return setTupleSource.createPreviousTuple(index);
}

private void removeKey(Object key) {
final boolean removed = liveValues.remove(key);
if (!removed) {
@@ -225,7 +256,7 @@ public WritableRowSet filter(
if (selection.size() > (selectionIndexer.getGrouping(tupleSource).size() * 2L)) {
return filterGrouping(trackingSelection, selectionIndexer, tupleSource);
} else {
return filterLinear(selection, keyColumns, tupleSource);
return filterLinear(selection, tupleSource);
}
}
final boolean allGrouping = Arrays.stream(keyColumns).allMatch(selectionIndexer::hasGrouping);
@@ -241,80 +272,61 @@ public WritableRowSet filter(
return filterGrouping(trackingSelection, selectionIndexer, tupleSource);
}
}
return filterLinear(selection, keyColumns, tupleSource);
return filterLinear(selection, tupleSource);
}

private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer,
private WritableRowSet filterGrouping(
TrackingRowSet selection,
RowSetIndexer selectionIndexer,
TupleSource<?> tupleSource) {
final RowSet matchingKeys = selectionIndexer.getSubSetForKeySet(liveValues, tupleSource);
return (inclusion ? matchingKeys.copy() : selection.minus(matchingKeys));
}

private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer, Table table) {
final ColumnSource<?>[] keyColumns = Arrays.stream(matchPairs)
.map(mp -> table.getColumnSource(mp.leftColumn())).toArray(ColumnSource[]::new);
final TupleSource<?> tupleSource = TupleSourceFactory.makeTupleSource(keyColumns);
return filterGrouping(selection, selectionIndexer, tupleSource);
}

private WritableRowSet filterLinear(RowSet selection, ColumnSource<?>[] keyColumns, TupleSource<?> tupleSource) {
if (keyColumns.length == 1) {
return filterLinearOne(selection, keyColumns[0]);
} else {
return filterLinearTuple(selection, tupleSource);
}
}

private WritableRowSet filterLinearOne(RowSet selection, ColumnSource<?> keyColumn) {
private WritableRowSet filterLinear(RowSet selection, TupleSource<?> tupleSource) {
if (selection.isEmpty()) {
return RowSetFactory.empty();
}

if (!kernelValid) {
setInclusionKernel = SetInclusionKernel.makeKernel(keyColumn.getChunkType(), liveValues, inclusion);
setInclusionKernel = SetInclusionKernel.makeKernel(tupleSource.getChunkType(), liveValues, inclusion);
kernelValid = true;
}

final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

try (final ColumnSource.GetContext getContext = keyColumn.makeGetContext(CHUNK_SIZE);
final RowSequence.Iterator rsIt = selection.getRowSequenceIterator()) {
final WritableLongChunk<OrderedRowKeys> keyIndices = WritableLongChunk.makeWritableChunk(CHUNK_SIZE);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(CHUNK_SIZE);
final int maxChunkSize = getChunkSize(selection);
// @formatter:off
try (final ColumnSource.GetContext keyGetContext = tupleSource.makeGetContext(maxChunkSize);
final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator();
final WritableLongChunk<OrderedRowKeys> selectionRowKeyChunk =
WritableLongChunk.makeWritableChunk(maxChunkSize);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(maxChunkSize)) {
// @formatter:on

while (rsIt.hasMore()) {
final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE);
while (selectionIterator.hasMore()) {
final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize);

final Chunk<Values> chunk = Chunk.downcast(keyColumn.getChunk(getContext, chunkOk));
setInclusionKernel.matchValues(chunk, matches);
final Chunk<Values> keyChunk = Chunk.downcast(tupleSource.getChunk(keyGetContext, selectionChunk));
final int thisChunkSize = keyChunk.size();
setInclusionKernel.matchValues(keyChunk, matches);

keyIndices.setSize(chunk.size());
chunkOk.fillRowKeyChunk(keyIndices);
selectionRowKeyChunk.setSize(thisChunkSize);
selectionChunk.fillRowKeyChunk(selectionRowKeyChunk);

for (int ii = 0; ii < chunk.size(); ++ii) {
for (int ii = 0; ii < thisChunkSize; ++ii) {
if (matches.get(ii)) {
indexBuilder.appendKey(keyIndices.get(ii));
indexBuilder.appendKey(selectionRowKeyChunk.get(ii));
}
}
}
}


return indexBuilder.build();
}

private WritableRowSet filterLinearTuple(RowSet selection, TupleSource<?> tupleSource) {
final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

for (final RowSet.Iterator it = selection.iterator(); it.hasNext();) {
final long row = it.nextLong();
final Object tuple = tupleSource.createTuple(row);
if (liveValues.contains(tuple) == inclusion) {
indexBuilder.appendKey(row);
}
}

return indexBuilder.build();
private static int getChunkSize(@NotNull final RowSet selection) {
return (int) Math.min(selection.size(), CHUNK_SIZE);
}

@Override
@@ -352,9 +364,4 @@ public LogOutput append(LogOutput logOutput) {
return logOutput.append("DynamicWhereFilter(").append(MatchPair.MATCH_PAIR_ARRAY_FORMATTER, matchPairs)
.append(")");
}

@Override
public String toString() {
return new LogOutputStringImpl().append(this).toString();
}
}
@@ -47,7 +47,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.testutil.testcase.RefreshingTableTestCase.printTableUpdates;
@@ -417,6 +419,21 @@ public void testWhereDynamicIn() {
asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect()));
assertEquals(1, resultInverse.size());
assertEquals(asList("E"), asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect()));

// Real modification to set table, followed by spurious modification to set table
IntStream.range(0, 2).forEach(ri -> {
updateGraph.runWithinUnitTestCycle(() -> {
addToTable(setTable, i(7), col("X", "C"));
setTable.notifyListeners(i(), i(), i(7));
});
showWithRowSet(result);
assertEquals(4, result.size());
assertEquals(asList("A", "B", "C", "A"),
asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect()));
assertEquals(2, resultInverse.size());
assertEquals(asList("D", "E"),
asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect()));
});
}

@Test
