Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chunks in DynamicWhereFilter, fix a chunk leak, and cleanup some code generally #4826

Merged
merged 7 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
package io.deephaven.engine.table.impl.select;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.indexer.RowSetIndexer;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.table.impl.*;
Expand All @@ -19,16 +22,14 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.engine.table.impl.TupleSourceFactory;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;

import java.util.*;

/**
* A where filter that extracts a set of inclusion or exclusion keys from a set table.
*
* <p>
* Each time the set table ticks, the entire where filter is recalculated.
*/
public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency {
Expand Down Expand Up @@ -70,46 +71,83 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi
if (setRefreshing) {
this.setTable = setTable;
setTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
setTable.getRowSet().forAllRowKeys((final long v) -> addKey(makeKey(v)));
if (setTable.getRowSet().isNonempty()) {
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
setTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) {
initialKeysIterator.forEachRemaining(this::addKey);
}
}

final String[] columnNames = Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet modTokenSet = setTable.newModifiedColumnSet(columnNames);
final String[] setColumnNames =
Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet setColumnsMCS = setTable.newModifiedColumnSet(setColumnNames);
setUpdateListener = new InstrumentedTableUpdateListenerAdapter(
"DynamicWhereFilter(" + Arrays.toString(setColumnsNames) + ")", setTable, false) {

@Override
public void onUpdate(final TableUpdate upstream) {
if (upstream.added().isEmpty() && upstream.removed().isEmpty()
&& !upstream.modifiedColumnSet().containsAny(modTokenSet)) {
final boolean hasAdds = upstream.added().isNonempty();
final boolean hasRemoves = upstream.removed().isNonempty();
final boolean hasModifies = upstream.modified().isNonempty()
&& upstream.modifiedColumnSet().containsAny(setColumnsMCS);
if (!hasAdds && !hasRemoves && !hasModifies) {
return;
}

final MutableBoolean trueModification = new MutableBoolean(false);
// Remove removed keys
if (hasRemoves) {
try (final CloseableIterator<?> removedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.removed(), getChunkSize(upstream.removed()))) {
removedKeysIterator.forEachRemaining(DynamicWhereFilter.this::removeKey);
}
}

upstream.added().forAllRowKeys((final long v) -> addKey(makeKey(v)));
upstream.removed().forAllRowKeys((final long v) -> removeKey(makePrevKey(v)));
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
// Update modified keys
boolean trueModification = false;
if (hasModifies) {
// @formatter:off
try (final CloseableIterator<?> preModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.getModifiedPreShift(),
getChunkSize(upstream.getModifiedPreShift()));
final CloseableIterator<?> postModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.modified(),
getChunkSize(upstream.modified()))) {
// @formatter:on
while (preModifiedKeysIterator.hasNext()) {
Assert.assertion(postModifiedKeysIterator.hasNext(),
"Pre and post modified row sets must be the same size; post is exhausted, but pre is not");
final Object oldKey = preModifiedKeysIterator.next();
final Object newKey = postModifiedKeysIterator.next();
if (!Objects.equals(oldKey, newKey)) {
trueModification = true;
removeKey(oldKey);
addKey(newKey);
}
}
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertion(!postModifiedKeysIterator.hasNext(),
"Pre and post modified row sets must be the same size; pre is exhausted, but post is not");
}
}

upstream.forAllModified((preIndex, postIndex) -> {
final Object oldKey = makePrevKey(preIndex);
final Object newKey = makeKey(postIndex);
if (!Objects.equals(oldKey, newKey)) {
trueModification.setTrue();
removeKey(oldKey);
addKey(newKey);
// Add added keys
if (hasAdds) {
try (final CloseableIterator<?> addedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.added(), getChunkSize(upstream.added()))) {
addedKeysIterator.forEachRemaining(DynamicWhereFilter.this::addKey);
}
});
}

// Pretend every row of the original table was modified, this is essential so that the where clause
// can be re-evaluated based on the updated live set.
if (listener != null) {
if (upstream.added().isNonempty() || trueModification.booleanValue()) {
if (hasAdds || trueModification) {
if (inclusion) {
listener.requestRecomputeUnmatched();
} else {
listener.requestRecomputeMatched();
}
}
if (upstream.removed().isNonempty() || trueModification.booleanValue()) {
if (hasRemoves || trueModification) {
if (inclusion) {
listener.requestRecomputeMatched();
} else {
Expand All @@ -132,8 +170,13 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
} else {
this.setTable = null;
setTupleSource = null;
final TupleSource<?> temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
setTable.getRowSet().forAllRowKeys((final long v) -> addKeyUnchecked(makeKey(temporaryTupleSource, v)));
if (setTable.getRowSet().isNonempty()) {
final TupleSource<?> temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
temporaryTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) {
initialKeysIterator.forEachRemaining(this::addKeyUnchecked);
}
}
kernelValid = liveValuesArrayValid = false;
setInclusionKernel = null;
setUpdateListener = null;
Expand All @@ -145,18 +188,6 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

/**
 * Creates the key tuple for {@code index} from this filter's {@code setTupleSource}.
 *
 * @param index the row key handed to the tuple source
 * @return the tuple produced by {@code setTupleSource.createTuple(index)}
 */
private Object makeKey(long index) {
    // Same call the two-argument makeKey overload performs, written inline.
    return setTupleSource.createTuple(index);
}

/**
 * Creates the key tuple for {@code index} from an explicitly supplied tuple source.
 *
 * @param tupleSource the source asked to create the tuple
 * @param index the row key handed to {@code tupleSource}
 * @return the result of {@code tupleSource.createTuple(index)}
 */
private static Object makeKey(TupleSource<?> tupleSource, long index) {
    final Object tuple = tupleSource.createTuple(index);
    return tuple;
}

/**
 * Creates the key tuple for {@code index} by calling {@code createPreviousTuple} on this filter's
 * {@code setTupleSource}.
 *
 * @param index the row key handed to the tuple source
 * @return the result of {@code setTupleSource.createPreviousTuple(index)}
 */
private Object makePrevKey(long index) {
    final Object previousTuple = setTupleSource.createPreviousTuple(index);
    return previousTuple;
}

private void removeKey(Object key) {
final boolean removed = liveValues.remove(key);
if (!removed) {
Expand Down Expand Up @@ -225,7 +256,7 @@ public WritableRowSet filter(
if (selection.size() > (selectionIndexer.getGrouping(tupleSource).size() * 2L)) {
return filterGrouping(trackingSelection, selectionIndexer, tupleSource);
} else {
return filterLinear(selection, keyColumns, tupleSource);
return filterLinear(selection, tupleSource);
}
}
final boolean allGrouping = Arrays.stream(keyColumns).allMatch(selectionIndexer::hasGrouping);
Expand All @@ -241,80 +272,61 @@ public WritableRowSet filter(
return filterGrouping(trackingSelection, selectionIndexer, tupleSource);
}
}
return filterLinear(selection, keyColumns, tupleSource);
return filterLinear(selection, tupleSource);
}

private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer,
private WritableRowSet filterGrouping(
TrackingRowSet selection,
RowSetIndexer selectionIndexer,
TupleSource<?> tupleSource) {
final RowSet matchingKeys = selectionIndexer.getSubSetForKeySet(liveValues, tupleSource);
return (inclusion ? matchingKeys.copy() : selection.minus(matchingKeys));
}

/**
 * Convenience overload: looks up the left-hand column of every match pair in {@code table}, builds a tuple
 * source over those columns, and delegates to the {@code TupleSource}-based {@code filterGrouping}.
 *
 * @param selection the row set being filtered
 * @param selectionIndexer the indexer passed through to the delegate
 * @param table the table whose column sources supply the key columns
 * @return whatever the {@code TupleSource}-based overload returns
 */
private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer, Table table) {
    final int pairCount = matchPairs.length;
    final ColumnSource<?>[] leftSources = new ColumnSource<?>[pairCount];
    for (int pi = 0; pi < pairCount; ++pi) {
        leftSources[pi] = table.getColumnSource(matchPairs[pi].leftColumn());
    }
    return filterGrouping(selection, selectionIndexer, TupleSourceFactory.makeTupleSource(leftSources));
}

private WritableRowSet filterLinear(RowSet selection, ColumnSource<?>[] keyColumns, TupleSource<?> tupleSource) {
if (keyColumns.length == 1) {
return filterLinearOne(selection, keyColumns[0]);
} else {
return filterLinearTuple(selection, tupleSource);
}
}

private WritableRowSet filterLinearOne(RowSet selection, ColumnSource<?> keyColumn) {
private WritableRowSet filterLinear(RowSet selection, TupleSource<?> tupleSource) {
if (selection.isEmpty()) {
return RowSetFactory.empty();
}

if (!kernelValid) {
setInclusionKernel = SetInclusionKernel.makeKernel(keyColumn.getChunkType(), liveValues, inclusion);
setInclusionKernel = SetInclusionKernel.makeKernel(tupleSource.getChunkType(), liveValues, inclusion);
kernelValid = true;
}

final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

try (final ColumnSource.GetContext getContext = keyColumn.makeGetContext(CHUNK_SIZE);
final RowSequence.Iterator rsIt = selection.getRowSequenceIterator()) {
final WritableLongChunk<OrderedRowKeys> keyIndices = WritableLongChunk.makeWritableChunk(CHUNK_SIZE);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(CHUNK_SIZE);
final int maxChunkSize = getChunkSize(selection);
// @formatter:off
try (final ColumnSource.GetContext keyGetContext = tupleSource.makeGetContext(maxChunkSize);
final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator();
final WritableLongChunk<OrderedRowKeys> selectionRowKeyChunk =
WritableLongChunk.makeWritableChunk(maxChunkSize);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(maxChunkSize)) {
// @formatter:on

while (rsIt.hasMore()) {
final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE);
while (selectionIterator.hasMore()) {
final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize);

final Chunk<Values> chunk = Chunk.downcast(keyColumn.getChunk(getContext, chunkOk));
setInclusionKernel.matchValues(chunk, matches);
final Chunk<Values> keyChunk = Chunk.downcast(tupleSource.getChunk(keyGetContext, selectionChunk));
final int thisChunkSize = keyChunk.size();
setInclusionKernel.matchValues(keyChunk, matches);
Comment on lines +310 to +312
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't anything "special" about SetInclusionKernel; do we always know it will be an object chunk now?

There doesn't seem to be any value in copying into matches only to use it later as a filter. I think we can simply check against liveValues as we iterate and build the index.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I combined the single-column case and the tuple case, we do not know it will always be an object chunk. Set inclusion lets us do non-boxing inclusion tests. That is, it is special.


keyIndices.setSize(chunk.size());
chunkOk.fillRowKeyChunk(keyIndices);
selectionRowKeyChunk.setSize(thisChunkSize);
selectionChunk.fillRowKeyChunk(selectionRowKeyChunk);

for (int ii = 0; ii < chunk.size(); ++ii) {
for (int ii = 0; ii < thisChunkSize; ++ii) {
if (matches.get(ii)) {
indexBuilder.appendKey(keyIndices.get(ii));
indexBuilder.appendKey(selectionRowKeyChunk.get(ii));
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}


return indexBuilder.build();
}

private WritableRowSet filterLinearTuple(RowSet selection, TupleSource<?> tupleSource) {
final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

for (final RowSet.Iterator it = selection.iterator(); it.hasNext();) {
final long row = it.nextLong();
final Object tuple = tupleSource.createTuple(row);
if (liveValues.contains(tuple) == inclusion) {
indexBuilder.appendKey(row);
}
}

return indexBuilder.build();
private static int getChunkSize(@NotNull final RowSet selection) {
return (int) Math.min(selection.size(), CHUNK_SIZE);
}

@Override
Expand Down Expand Up @@ -352,9 +364,4 @@ public LogOutput append(LogOutput logOutput) {
return logOutput.append("DynamicWhereFilter(").append(MatchPair.MATCH_PAIR_ARRAY_FORMATTER, matchPairs)
.append(")");
}

/**
 * Renders this filter as a string by appending it to a fresh {@link LogOutputStringImpl}.
 *
 * @return the string form of the log output after this filter has been appended
 */
@Override
public String toString() {
    final LogOutputStringImpl buffer = new LogOutputStringImpl();
    return buffer.append(this).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.testutil.testcase.RefreshingTableTestCase.printTableUpdates;
Expand Down Expand Up @@ -417,6 +419,21 @@ public void testWhereDynamicIn() {
asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect()));
assertEquals(1, resultInverse.size());
assertEquals(asList("E"), asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect()));

// Real modification to set table, followed by spurious modification to set table
IntStream.range(0, 2).forEach(ri -> {
updateGraph.runWithinUnitTestCycle(() -> {
addToTable(setTable, i(7), col("X", "C"));
setTable.notifyListeners(i(), i(), i(7));
});
showWithRowSet(result);
assertEquals(4, result.size());
assertEquals(asList("A", "B", "C", "A"),
asList((String[]) DataAccessHelpers.getColumn(result, "X").getDirect()));
assertEquals(2, resultInverse.size());
assertEquals(asList("D", "E"),
asList((String[]) DataAccessHelpers.getColumn(resultInverse, "X").getDirect()));
});
}

@Test
Expand Down
Loading