Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use chunks in DynamicWhereFilter, fix a chunk leak, and cleanup some code generally #4826

Merged
merged 7 commits into from
Nov 15, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
package io.deephaven.engine.table.impl.select;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.indexer.RowSetIndexer;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.table.impl.*;
Expand All @@ -19,16 +23,14 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.engine.table.impl.TupleSourceFactory;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;

import java.util.*;

/**
* A where filter that extracts a set of inclusion or exclusion keys from a set table.
*
* <p>
* Each time the set table ticks, the entire where filter is recalculated.
*/
public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency {
Expand Down Expand Up @@ -70,46 +72,77 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi
if (setRefreshing) {
this.setTable = setTable;
setTupleSource = TupleSourceFactory.makeTupleSource(setColumns);
setTable.getRowSet().forAllRowKeys((final long v) -> addKey(makeKey(v)));
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
setTupleSource, setTable.getRowSet())) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
initialKeysIterator.forEachRemaining(this::addKey);
}

final String[] columnNames = Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet modTokenSet = setTable.newModifiedColumnSet(columnNames);
final String[] setColumnNames =
Arrays.stream(matchPairs).map(MatchPair::rightColumn).toArray(String[]::new);
final ModifiedColumnSet setColumnsMCS = setTable.newModifiedColumnSet(setColumnNames);
setUpdateListener = new InstrumentedTableUpdateListenerAdapter(
"DynamicWhereFilter(" + Arrays.toString(setColumnsNames) + ")", setTable, false) {

@Override
public void onUpdate(final TableUpdate upstream) {
if (upstream.added().isEmpty() && upstream.removed().isEmpty()
&& !upstream.modifiedColumnSet().containsAny(modTokenSet)) {
final boolean hasAdds = upstream.added().isNonempty();
final boolean hasRemoves = upstream.removed().isNonempty();
final boolean hasModifies = upstream.modified().isNonempty()
&& upstream.modifiedColumnSet().containsAny(setColumnsMCS);
if (!hasAdds && !hasRemoves && !hasModifies) {
return;
}

final MutableBoolean trueModification = new MutableBoolean(false);
// Remove removed keys
if (hasRemoves) {
try (final CloseableIterator<?> removedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.removed())) {
removedKeysIterator.forEachRemaining(DynamicWhereFilter.this::removeKey);
}
}

upstream.added().forAllRowKeys((final long v) -> addKey(makeKey(v)));
upstream.removed().forAllRowKeys((final long v) -> removeKey(makePrevKey(v)));
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
// Update modified keys
boolean trueModification = false;
if (hasModifies) {
// @formatter:off
try (final CloseableIterator<?> preModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource.getPrevSource(), upstream.getModifiedPreShift());
final CloseableIterator<?> postModifiedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.modified())) {
// @formatter:on
while (preModifiedKeysIterator.hasNext()) {
Assert.assertion(postModifiedKeysIterator.hasNext(),
"Pre and post modified row sets must be the same size");
final Object oldKey = preModifiedKeysIterator.next();
final Object newKey = postModifiedKeysIterator.next();
if (!Objects.equals(oldKey, newKey)) {
trueModification = true;
removeKey(oldKey);
addKey(newKey);
}
}
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
}

upstream.forAllModified((preIndex, postIndex) -> {
final Object oldKey = makePrevKey(preIndex);
final Object newKey = makeKey(postIndex);
if (!Objects.equals(oldKey, newKey)) {
trueModification.setTrue();
removeKey(oldKey);
addKey(newKey);
// Add added keys
if (hasAdds) {
try (final CloseableIterator<?> addedKeysIterator = ChunkedColumnIterator.make(
setTupleSource, upstream.added())) {
addedKeysIterator.forEachRemaining(DynamicWhereFilter.this::addKey);
}
});
}

// Pretend every row of the original table was modified, this is essential so that the where clause
// can be re-evaluated based on the updated live set.
if (listener != null) {
if (upstream.added().isNonempty() || trueModification.booleanValue()) {
if (hasAdds || trueModification) {
if (inclusion) {
listener.requestRecomputeUnmatched();
} else {
listener.requestRecomputeMatched();
}
}
if (upstream.removed().isNonempty() || trueModification.booleanValue()) {
if (hasRemoves || trueModification) {
if (inclusion) {
listener.requestRecomputeMatched();
} else {
Expand Down Expand Up @@ -244,7 +277,9 @@ public WritableRowSet filter(
return filterLinear(selection, keyColumns, tupleSource);
}

private WritableRowSet filterGrouping(TrackingRowSet selection, RowSetIndexer selectionIndexer,
private WritableRowSet filterGrouping(
TrackingRowSet selection,
RowSetIndexer selectionIndexer,
TupleSource<?> tupleSource) {
final RowSet matchingKeys = selectionIndexer.getSubSetForKeySet(liveValues, tupleSource);
return (inclusion ? matchingKeys.copy() : selection.minus(matchingKeys));
Expand Down Expand Up @@ -277,23 +312,28 @@ private WritableRowSet filterLinearOne(RowSet selection, ColumnSource<?> keyColu

final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

try (final ColumnSource.GetContext getContext = keyColumn.makeGetContext(CHUNK_SIZE);
final RowSequence.Iterator rsIt = selection.getRowSequenceIterator()) {
final WritableLongChunk<OrderedRowKeys> keyIndices = WritableLongChunk.makeWritableChunk(CHUNK_SIZE);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(CHUNK_SIZE);
final int maxChunkSize = (int) Math.min(CHUNK_SIZE, selection.size());
// @formatter:off
try (final ColumnSource.GetContext keyGetContext = keyColumn.makeGetContext(maxChunkSize);
final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator();
final WritableLongChunk<OrderedRowKeys> selectionRowKeyChunk =
WritableLongChunk.makeWritableChunk(maxChunkSize);
final WritableBooleanChunk<Values> matches = WritableBooleanChunk.makeWritableChunk(maxChunkSize)) {
// @formatter:on

while (rsIt.hasMore()) {
final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE);
while (selectionIterator.hasMore()) {
final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize);

final Chunk<Values> chunk = Chunk.downcast(keyColumn.getChunk(getContext, chunkOk));
setInclusionKernel.matchValues(chunk, matches);
final Chunk<Values> keyChunk = Chunk.downcast(keyColumn.getChunk(keyGetContext, selectionChunk));
final int thisChunkSize = keyChunk.size();
setInclusionKernel.matchValues(keyChunk, matches);

keyIndices.setSize(chunk.size());
chunkOk.fillRowKeyChunk(keyIndices);
selectionRowKeyChunk.setSize(thisChunkSize);
selectionChunk.fillRowKeyChunk(selectionRowKeyChunk);

for (int ii = 0; ii < chunk.size(); ++ii) {
for (int ii = 0; ii < thisChunkSize; ++ii) {
if (matches.get(ii)) {
indexBuilder.appendKey(keyIndices.get(ii));
indexBuilder.appendKey(selectionRowKeyChunk.get(ii));
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -306,11 +346,28 @@ private WritableRowSet filterLinearOne(RowSet selection, ColumnSource<?> keyColu
private WritableRowSet filterLinearTuple(RowSet selection, TupleSource<?> tupleSource) {
final RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();

for (final RowSet.Iterator it = selection.iterator(); it.hasNext();) {
final long row = it.nextLong();
final Object tuple = tupleSource.createTuple(row);
if (liveValues.contains(tuple) == inclusion) {
indexBuilder.appendKey(row);
final int maxChunkSize = (int) Math.min(CHUNK_SIZE, selection.size());
// @formatter:off
try (final ColumnSource.GetContext keyGetContext = tupleSource.makeGetContext(maxChunkSize);
final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator();
final WritableLongChunk<OrderedRowKeys> selectionRowKeyChunk =
WritableLongChunk.makeWritableChunk(maxChunkSize)) {
// @formatter:on

while (selectionIterator.hasMore()) {
final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize);

final ObjectChunk<?, ?> keyChunk = tupleSource.getChunk(keyGetContext, selectionChunk).asObjectChunk();
final int thisChunkSize = keyChunk.size();

selectionRowKeyChunk.setSize(thisChunkSize);
selectionChunk.fillRowKeyChunk(selectionRowKeyChunk);

for (int ii = 0; ii < thisChunkSize; ++ii) {
if (liveValues.contains(keyChunk.get(ii)) == inclusion) {
indexBuilder.appendKey(selectionRowKeyChunk.get(ii));
}
}
}
}

Expand Down Expand Up @@ -352,9 +409,4 @@ public LogOutput append(LogOutput logOutput) {
return logOutput.append("DynamicWhereFilter(").append(MatchPair.MATCH_PAIR_ARRAY_FORMATTER, matchPairs)
.append(")");
}

@Override
public String toString() {
return new LogOutputStringImpl().append(this).toString();
}
}
Loading