Skip to content

Commit

Permalink
Clean up DynamicWhereFilter a bit, and make TestConcurrentInstantiati…
Browse files Browse the repository at this point in the history
…on pass
  • Loading branch information
rcaudy committed Mar 14, 2024
1 parent de6c8c0 commit 0e3a802
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.*;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
Expand All @@ -18,9 +19,10 @@
import io.deephaven.engine.table.impl.select.setinclusion.SetInclusionKernel;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.updategraph.DynamicNode;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReferentialIntegrity;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -32,39 +34,42 @@
* Each time the set table ticks, the entire where filter is recalculated.
*/
public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency {

private static final int CHUNK_SIZE = 1 << 16;

private final boolean setRefreshing;
private final MatchPair[] matchPairs;
private final ChunkSource.WithPrev<Values> setKeySource;
private final boolean inclusion;
private final ChunkSource.WithPrev<Values> setKeySource;

private final DataIndexKeySet liveValues;
private boolean liveValuesArrayValid = false;
private boolean kernelValid = false;
private Object[] liveValuesArray = null;
private SetInclusionKernel setInclusionKernel = null;

// this reference must be maintained for reachability
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private final QueryTable setTable;

@SuppressWarnings("FieldCanBeLocal")
// this reference must be maintained for reachability
@ReferentialIntegrity
private final InstrumentedTableUpdateListener setUpdateListener;

/** Stores the optimal data index for this filter. */
/**
* The source table to be filtered. Recorded when initializing data indexes
*/
@ReferentialIntegrity
private Table sourceTable;
/**
* The optimal data index for this filter.
*/
@Nullable
private DataIndex sourceDataIndex;

private RecomputeListener listener;
private QueryTable resultTable;

public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, final MatchPair... setColumnsNames) {
setRefreshing = setTable.isRefreshing();
if (setRefreshing) {
if (setTable.isRefreshing()) {
updateGraph.checkInitiateSerialTableOperation();
}

this.matchPairs = setColumnsNames;
this.inclusion = inclusion;

Expand All @@ -73,8 +78,7 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi
final ColumnSource<?>[] setColumns = Arrays.stream(matchPairs)
.map(mp -> setTable.getColumnSource(mp.rightColumn())).toArray(ColumnSource[]::new);

if (setRefreshing) {
this.setTable = setTable;
if (setTable.isRefreshing()) {
setKeySource = DataIndexUtils.makeBoxedKeySource(setColumns);
if (setTable.getRowSet().isNonempty()) {
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
Expand Down Expand Up @@ -173,9 +177,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {

manage(setUpdateListener);
} else {
this.setTable = null;
setKeySource = null;

if (setTable.getRowSet().isNonempty()) {
final ChunkSource.WithPrev<Values> tmpKeySource = DataIndexUtils.makeBoxedKeySource(setColumns);
try (final CloseableIterator<?> initialKeysIterator = ChunkedColumnIterator.make(
Expand Down Expand Up @@ -216,13 +218,38 @@ private void addKeyUnchecked(Object key) {
liveValues.add(key);
}

/**
* Initializes the data index for this filter, if not already initialized. It is an error to call this method with
* more than one distinct {@code sourceTable}. If {@sourceTable} is refreshing, this method must only be invoked
* when it's {@link UpdateGraph#checkInitiateSerialTableOperation() safe} to initialize serial table operations.
*
* @param sourceTable The table that this DynamicWhereFilter will be used to filter
*/
public void initializeDataIndex(@NotNull final Table sourceTable) {
if (this.sourceTable != null) {
if (this.sourceTable == sourceTable) {
return;
}
throw new IllegalStateException(String.format(
"Data indexes already initialized with source table %s, cannot reinitialize with %s",
this.sourceTable, sourceTable));
}
this.sourceTable = sourceTable;
try (final SafeCloseable ignored = sourceTable.isRefreshing() ? LivenessScopeStack.open() : null) {
sourceDataIndex = optimalIndex(matchPairs, sourceTable);
if (sourceDataIndex != null && sourceDataIndex.isRefreshing()) {
manage(sourceDataIndex);
}
}
}

/**
* Returns the optimal data index for the supplied table, or null if no index is available. The ideal index would
* contain all key columns but a partial match is also acceptable.
*/
@Nullable
private DataIndex optimalIndex(final Table inputTable) {
final String[] keyColumnNames = MatchPair.getLeftColumns(matchPairs);
private static DataIndex optimalIndex(final MatchPair[] filterPairs, final Table inputTable) {
final String[] keyColumnNames = MatchPair.getLeftColumns(filterPairs);

final DataIndex fullIndex = DataIndexer.getDataIndex(inputTable, keyColumnNames);
if (fullIndex != null) {
Expand All @@ -244,13 +271,8 @@ public List<String> getColumnArrays() {

@Override
public List<DataIndex> getDataIndexes(final Table sourceTable) {
if (sourceDataIndex == null) {
sourceDataIndex = optimalIndex(sourceTable);
}
if (sourceDataIndex == null) {
return Collections.emptyList();
}
return List.of(sourceDataIndex);
initializeDataIndex(sourceTable);
return sourceDataIndex == null ? List.of() : List.of(sourceDataIndex);
}

@Override
Expand Down Expand Up @@ -492,21 +514,21 @@ public boolean isSimpleFilter() {

@Override
public boolean isRefreshing() {
return setRefreshing;
return setUpdateListener != null;
}

@Override
public void setRecomputeListener(RecomputeListener listener) {
this.listener = listener;
this.resultTable = listener.getTable();
if (DynamicNode.isDynamicAndIsRefreshing(setTable)) {
if (isRefreshing()) {
listener.setIsRefreshing(true);
}
}

@Override
public DynamicWhereFilter copy() {
return new DynamicWhereFilter(setTable, inclusion, matchPairs);
throw new UnsupportedOperationException("DynamicWhereFilter cannot be copied");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,13 @@ private void testWhereDynamicInternal(final boolean sourceIndexed, final boolean
DataIndexer.getOrCreateDataIndex(whereTable, "z");
}

final DynamicWhereFilter filter = updateGraph.exclusiveLock().computeLocked(
() -> new DynamicWhereFilter(whereTable, true, MatchPairFactory.getExpressions("z")));
final DynamicWhereFilter filter = updateGraph.sharedLock().computeLocked(
() -> {
final DynamicWhereFilter result =
new DynamicWhereFilter(whereTable, true, MatchPairFactory.getExpressions("z"));
result.initializeDataIndex(table);
return result;
});

updateGraph.startCycleForUnitTests(false);

Expand Down

0 comments on commit 0e3a802

Please sign in to comment.