From 893d644e2161c4e4ab9023d3ec24683e012f4e01 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Wed, 14 Aug 2024 15:53:20 -0600 Subject: [PATCH] Refactoring Based on Review and Discussion --- .../engine/table/impl/QueryTable.java | 95 +- .../table/impl/SelectOrUpdateListener.java | 18 +- .../table/impl/ShiftedColumnsFactory.java | 4 +- .../impl/select/analyzers/BaseLayer.java | 86 -- .../select/analyzers/ConstantColumnLayer.java | 42 +- .../select/analyzers/DependencyLayerBase.java | 47 +- .../select/analyzers/PreserveColumnLayer.java | 45 +- .../select/analyzers/RedirectionLayer.java | 49 +- .../analyzers/SelectAndViewAnalyzer.java | 969 +++++++++++------- .../SelectAndViewAnalyzerWrapper.java | 128 --- .../select/analyzers/SelectColumnLayer.java | 191 ++-- .../analyzers/SelectOrViewColumnLayer.java | 9 +- .../select/analyzers/StaticFlattenLayer.java | 124 --- .../select/analyzers/ViewColumnLayer.java | 29 +- 14 files changed, 796 insertions(+), 1040 deletions(-) delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 6b709ee24d3..c596c9fd2f0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -43,7 +43,6 @@ import io.deephaven.engine.table.impl.remote.ConstructSnapshot; import io.deephaven.engine.table.impl.select.*; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer; -import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; import io.deephaven.engine.table.impl.snapshot.SnapshotIncrementalListener; import io.deephaven.engine.table.impl.snapshot.SnapshotInternalListener; import io.deephaven.engine.table.impl.snapshot.SnapshotUtils; @@ -72,6 +71,7 @@ import io.deephaven.util.type.ArrayTypeUtils; import io.deephaven.vector.Vector; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -1498,10 +1498,10 @@ public Table update(final Collection newColumns) { */ public SelectValidationResult validateSelect(final SelectColumn... selectColumns) { final SelectColumn[] clones = SelectColumn.copyFrom(selectColumns); - SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, columns, rowSet, getModifiedColumnSetForUpdates(), true, + SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create( + this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, getModifiedColumnSetForUpdates(), true, false, clones); - return new SelectValidationResult(analyzerWrapper.getAnalyzer(), clones); + return new SelectValidationResult(analyzerContext.createAnalyzer(), clones); } private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) { @@ -1526,18 +1526,17 @@ private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) } } final boolean publishTheseSources = flavor == Flavor.Update; - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSources, true, + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create( + this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true, selectColumns); - final SelectAndViewAnalyzer analyzer = analyzerWrapper.getAnalyzer(); - final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer analyzer = analyzerContext.createAnalyzer(); + final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); // Init all the rows by cooking up a fake Update final TableUpdate fakeUpdate = new TableUpdateImpl( - analyzer.flatResult() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(), - RowSetFactory.empty(), RowSetFactory.empty(), + rowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.ALL); final CompletableFuture waitForResult = new CompletableFuture<>(); @@ -1558,8 +1557,23 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc new SelectAndViewAnalyzer.UpdateHelper(emptyRowSet, fakeUpdate)) { try { - analyzer.applyUpdate(fakeUpdate, emptyRowSet, updateHelper, jobScheduler, - liveResultCapture, analyzer.futureCompletionHandler(waitForResult)); + final MutableBoolean errorOccurred = new MutableBoolean(); + + analyzer.applyUpdate( + fakeUpdate, emptyRowSet, updateHelper, jobScheduler, liveResultCapture, + () -> { + if (errorOccurred.booleanValue()) { + return; + } + waitForResult.complete(null); + }, + err -> { + if (errorOccurred.booleanValue()) { + return; + } + errorOccurred.setTrue(); + waitForResult.completeExceptionally(err); + }); } catch (Exception e) { waitForResult.completeExceptionally(e); } @@ -1580,12 +1594,13 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc } } - final TrackingRowSet resultRowSet = - analyzer.flattenedResult() ? RowSetFactory.flat(rowSet.size()).toTracking() : rowSet; - resultTable = new QueryTable(resultRowSet, analyzerWrapper.getPublishedColumnResources()); + final TrackingRowSet resultRowSet = analyzer.flatResult() && !rowSet.isFlat() + ? RowSetFactory.flat(rowSet.size()).toTracking() + : rowSet; + resultTable = new QueryTable(resultRowSet, analyzerContext.getPublishedColumnSources()); if (liveResultCapture != null) { analyzer.startTrackingPrev(); - final Map effects = analyzerWrapper.calcEffects(); + final Map effects = analyzerContext.calcEffects(); final SelectOrUpdateListener soul = new SelectOrUpdateListener(updateDescription, this, resultTable, effects, analyzer); liveResultCapture.transferTo(soul); @@ -1596,7 +1611,8 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc resultTable.setFlat(); } propagateDataIndexes(processedColumns, resultTable); - for (final ColumnSource columnSource : analyzer.getNewColumnSources().values()) { + for (final ColumnSource columnSource : analyzerContext.getSelectedColumnSources() + .values()) { if (columnSource instanceof PossiblyImmutableColumnSource) { ((PossiblyImmutableColumnSource) columnSource).setImmutable(); } @@ -1610,10 +1626,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc } else { maybeCopyColumnDescriptions(resultTable); } - SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = flavor == Flavor.Update - ? SelectAndViewAnalyzerWrapper.UpdateFlavor.Update - : SelectAndViewAnalyzerWrapper.UpdateFlavor.Select; - return analyzerWrapper.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor); + SelectAndViewAnalyzer.UpdateFlavor updateFlavor = flavor == Flavor.Update + ? SelectAndViewAnalyzer.UpdateFlavor.Update + : SelectAndViewAnalyzer.UpdateFlavor.Select; + return analyzerContext.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor); })); } @@ -1761,15 +1777,17 @@ updateDescription, sizeForInstrumentation(), () -> { createSnapshotControlIfRefreshing(OperationSnapshotControl::new); initializeWithSnapshot(humanReadablePrefix, sc, (usePrev, beforeClockValue) -> { final boolean publishTheseSources = flavor == Flavor.UpdateView; - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, columns, rowSet, - getModifiedColumnSetForUpdates(), publishTheseSources, true, viewColumns); - final SelectColumn[] processedViewColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = + SelectAndViewAnalyzer.create( + this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, + getModifiedColumnSetForUpdates(), publishTheseSources, true, + viewColumns); + final SelectColumn[] processedViewColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); QueryTable queryTable = new QueryTable( - rowSet, analyzerWrapper.getPublishedColumnResources()); + rowSet, analyzerContext.getPublishedColumnSources()); if (sc != null) { - final Map effects = analyzerWrapper.calcEffects(); + final Map effects = analyzerContext.calcEffects(); final TableUpdateListener listener = new ViewOrUpdateViewListener(updateDescription, this, queryTable, effects); sc.setListenerAndResult(listener, queryTable); @@ -1786,11 +1804,11 @@ updateDescription, sizeForInstrumentation(), () -> { } else { maybeCopyColumnDescriptions(queryTable); } - final SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = + final SelectAndViewAnalyzer.UpdateFlavor updateFlavor = flavor == Flavor.UpdateView - ? SelectAndViewAnalyzerWrapper.UpdateFlavor.UpdateView - : SelectAndViewAnalyzerWrapper.UpdateFlavor.View; - queryTable = analyzerWrapper.applyShiftsAndRemainingColumns( + ? SelectAndViewAnalyzer.UpdateFlavor.UpdateView + : SelectAndViewAnalyzer.UpdateFlavor.View; + queryTable = analyzerContext.applyShiftsAndRemainingColumns( this, queryTable, updateFlavor); result.setValue(queryTable); @@ -1851,14 +1869,13 @@ public Table lazyUpdate(final Collection newColumns) { sizeForInstrumentation(), () -> { checkInitiateOperation(); - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, columns, rowSet, - getModifiedColumnSetForUpdates(), - true, true, selectColumns); - final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create( + this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, + getModifiedColumnSetForUpdates(), true, true, selectColumns); + final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); final QueryTable result = new QueryTable( - rowSet, analyzerWrapper.getPublishedColumnResources()); + rowSet, analyzerContext.getPublishedColumnSources()); if (isRefreshing()) { addUpdateListener(new ListenerImpl( "lazyUpdate(" + Arrays.deepToString(processedColumns) + ')', this, result)); @@ -1868,8 +1885,8 @@ public Table lazyUpdate(final Collection newColumns) { copySortableColumns(result, processedColumns); maybeCopyColumnDescriptions(result, processedColumns); - return analyzerWrapper.applyShiftsAndRemainingColumns( - this, result, SelectAndViewAnalyzerWrapper.UpdateFlavor.LazyUpdate); + return analyzerContext.applyShiftsAndRemainingColumns( + this, result, SelectAndViewAnalyzer.UpdateFlavor.LazyUpdate); }); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 6a1c0a653a6..f5a0bddc7db 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -15,7 +15,6 @@ import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; -import java.util.BitSet; import java.util.Map; /** @@ -29,8 +28,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl { private final SelectAndViewAnalyzer analyzer; private volatile boolean updateInProgress = false; - private final BitSet completedColumns = new BitSet(); - private final BitSet allNewColumns = new BitSet(); private final boolean enableParallelUpdate; /** @@ -61,7 +58,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl { (QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE && getUpdateGraph().parallelismFactor() > 1)) && analyzer.allowCrossColumnParallelization(); - analyzer.setAllNewColumns(allNewColumns); } @Override @@ -76,7 +72,6 @@ public void onUpdate(final TableUpdate upstream) { // - create parallel arrays of pre-shift-keys and post-shift-keys so we can move them in chunks updateInProgress = true; - completedColumns.clear(); final TableUpdate acquiredUpdate = upstream.acquire(); final WritableRowSet toClear = resultRowSet.copyPrev(); @@ -92,17 +87,8 @@ public void onUpdate(final TableUpdate upstream) { } analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, - new SelectAndViewAnalyzer.Layer.CompletionHandler(allNewColumns, completedColumns) { - @Override - public void onAllRequiredColumnsCompleted() { - completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper); - } - - @Override - protected void onError(Exception error) { - handleException(error); - } - }); + () -> completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper), + this::handleException); } private void handleException(Exception e) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java index 4ab1c70248c..c84deddd93e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java @@ -117,7 +117,7 @@ import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.engine.table.impl.select.WhereFilterFactory; -import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; +import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer; import io.deephaven.util.mutable.MutableInt; import org.jetbrains.annotations.NotNull; @@ -260,7 +260,7 @@ private static Pair getShiftedTableFilterPair( public static Table getShiftedColumnsTable( @NotNull final Table source, @NotNull FormulaColumn formulaColumn, - @NotNull SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor) { + @NotNull SelectAndViewAnalyzer.UpdateFlavor updateFlavor) { String nuggetName = "getShiftedColumnsTable( " + formulaColumn + ", " + updateFlavor + ") "; return QueryPerformanceRecorder.withNugget(nuggetName, source.sizeForInstrumentation(), () -> { Table tableSoFar = source; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java deleted file mode 100644 index 01b749efe5e..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java +++ /dev/null @@ -1,86 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.select.analyzers; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.engine.liveness.LivenessNode; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; - -import java.util.*; - -public class BaseLayer extends SelectAndViewAnalyzer.Layer { - private final Map> sources; - private final boolean publishTheseSources; - - BaseLayer(Map> sources, boolean publishTheseSources) { - super(BASE_LAYER_INDEX); - this.sources = sources; - this.publishTheseSources = publishTheseSources; - } - - @Override - Set getLayerColumnNames() { - return sources.keySet(); - } - - @Override - void populateModifiedColumnSetInReverse( - final ModifiedColumnSet mcsBuilder, - final Set remainingDepsToSatisfy) { - mcsBuilder.setAll(remainingDepsToSatisfy.toArray(String[]::new)); - } - - @Override - void populateColumnSources( - final Map> result, - final GetMode mode) { - // We specifically return a LinkedHashMap so the columns get populated in order - if (mode == GetMode.All || (mode == GetMode.Published && publishTheseSources)) { - result.putAll(sources); - } - } - - @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, - final RowSet toClear, - final SelectAndViewAnalyzer.UpdateHelper helper, - final JobScheduler jobScheduler, - @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { - return new CompletionHandler(new BitSet(), onCompletion) { - @Override - protected void onAllRequiredColumnsCompleted() { - // nothing to do at the base layer - onCompletion.onLayerCompleted(getLayerIndex()); - } - }; - } - - @Override - final void calcDependsOn( - final Map> result, - boolean forcePublishAllSources) { - if (publishTheseSources || forcePublishAllSources) { - for (final String col : sources.keySet()) { - result.computeIfAbsent(col, dummy -> new HashSet<>()).add(col); - } - } - } - - @Override - boolean allowCrossColumnParallelization() { - return true; - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("{BaseLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java index d385cbcfabe..4738d944825 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java @@ -5,35 +5,23 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSequence; -import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.WritableColumnSource; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.VectorChunkAdapter; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; - -import java.util.Arrays; -import java.util.BitSet; public class ConstantColumnLayer extends SelectOrViewColumnLayer { - private final BitSet dependencyBitSet; ConstantColumnLayer( - SelectAndViewAnalyzer analyzer, - String name, - SelectColumn sc, - WritableColumnSource ws, - String[] deps, - ModifiedColumnSet mcsBuilder) { - super(analyzer, name, sc, ws, null, deps, mcsBuilder); - this.dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set); + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final WritableColumnSource ws, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(context, sc, ws, null, deps, mcsBuilder); initialize(ws); } @@ -54,22 +42,8 @@ private void initialize(final WritableColumnSource writableSource) { } @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, - final RowSet toClear, - final SelectAndViewAnalyzer.UpdateHelper helper, - final JobScheduler jobScheduler, - @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { - // Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g. - // because they are SelectColumnLayers) - return new CompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here; our result value is constant - onCompletion.onLayerCompleted(getLayerIndex()); - } - }; + public boolean hasRefreshingLogic() { + return false; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java index 3c703382ea5..44e081451f6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java @@ -15,26 +15,24 @@ public abstract class DependencyLayerBase extends SelectAndViewAnalyzer.Layer { final SelectColumn selectColumn; final boolean selectColumnHoldsVector; final ColumnSource columnSource; - // probably don't need this any more - private final String[] dependencies; final ModifiedColumnSet myModifiedColumnSet; + final BitSet myLayerDependencySet; DependencyLayerBase( - final SelectAndViewAnalyzer analyzer, - final String name, + final SelectAndViewAnalyzer.AnalyzerContext context, final SelectColumn selectColumn, final ColumnSource columnSource, final String[] dependencies, final ModifiedColumnSet mcsBuilder) { - super(analyzer.getNextLayerIndex()); - this.name = name; + super(context.getNextLayerIndex()); + this.name = selectColumn.getName(); this.selectColumn = selectColumn; selectColumnHoldsVector = Vector.class.isAssignableFrom(selectColumn.getReturnedType()); this.columnSource = columnSource; - this.dependencies = dependencies; - final Set remainingDepsToSatisfy = new HashSet<>(Arrays.asList(dependencies)); - analyzer.populateModifiedColumnSet(mcsBuilder, remainingDepsToSatisfy); + context.populateModifiedColumnSet(mcsBuilder, dependencies); this.myModifiedColumnSet = mcsBuilder; + this.myLayerDependencySet = new BitSet(); + context.populateLayerDependencySet(myLayerDependencySet, dependencies); } @Override @@ -43,34 +41,7 @@ Set getLayerColumnNames() { } @Override - void populateModifiedColumnSetInReverse( - final ModifiedColumnSet mcsBuilder, - final Set remainingDepsToSatisfy) { - // Later-defined columns override earlier-defined columns. So we satisfy column dependencies "on the way - // down" the recursion. - if (remainingDepsToSatisfy.remove(name)) { - // Caller had a dependency on us, so caller gets our dependencies - mcsBuilder.setAll(myModifiedColumnSet); - } - } - - @Override - void calcDependsOn( - final Map> result, - final boolean forcePublishAllSources) { - - final Set thisResult = new HashSet<>(); - for (final String dep : dependencies) { - final Set innerDependencies = result.get(dep); - if (innerDependencies == null) { - // There are no further expansions of 'dep', so add it as a dependency. - thisResult.add(dep); - } else { - // Instead of adding 'dep', add what 'dep' expands to. - thisResult.addAll(innerDependencies); - } - } - - result.put(name, thisResult); + public ModifiedColumnSet getModifiedColumnSet() { + return myModifiedColumnSet; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java index b85ead29b3a..9b0f4b690f7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java @@ -4,17 +4,10 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.log.LogOutput; -import io.deephaven.engine.liveness.LivenessNode; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; -import java.util.Arrays; -import java.util.BitSet; import java.util.Map; /** @@ -23,50 +16,24 @@ * {@implNote This class is part of the Deephaven engine, and not intended for direct use.} */ final public class PreserveColumnLayer extends DependencyLayerBase { - private final BitSet dependencyBitSet; PreserveColumnLayer( - final SelectAndViewAnalyzer analyzer, - final String name, + final SelectAndViewAnalyzer.AnalyzerContext context, final SelectColumn sc, final ColumnSource cs, final String[] deps, final ModifiedColumnSet mcsBuilder) { - super(analyzer, name, sc, cs, deps, mcsBuilder); - this.dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set); + super(context, sc, cs, deps, mcsBuilder); } @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, - final RowSet toClear, - final SelectAndViewAnalyzer.UpdateHelper helper, - final JobScheduler jobScheduler, - @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { - return new CompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here - onCompletion.onLayerCompleted(getLayerIndex()); - } - }; + public boolean hasRefreshingLogic() { + return false; } @Override - void populateColumnSources( - final Map> result, - final GetMode mode) { - switch (mode) { - case New: - // we have no new sources - break; - case Published: - case All: - result.put(name, columnSource); - break; - } + void populateColumnSources(final Map> result) { + result.put(name, columnSource); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java index 1245898b46b..46fe6ea4c31 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java @@ -4,7 +4,6 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.log.LogOutput; -import io.deephaven.base.verify.Assert; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; @@ -17,6 +16,7 @@ import org.jetbrains.annotations.Nullable; import java.util.*; +import java.util.function.Consumer; /** * A layer that maintains the row redirection for future SelectColumnLayers. @@ -27,12 +27,14 @@ public final class RedirectionLayer extends SelectAndViewAnalyzer.Layer { private final TrackingRowSet resultRowSet; private final WritableRowRedirection rowRedirection; private final WritableRowSet freeValues = RowSetFactory.empty(); + private final BitSet layerDependencySet = new BitSet(); private long maxInnerIndex; - RedirectionLayer(SelectAndViewAnalyzer analyzer, TrackingRowSet resultRowSet, - WritableRowRedirection rowRedirection) { - super(REDIRECTION_LAYER_INDEX); - Assert.eq(analyzer.getNextLayerIndex(), "analyzer.getNextLayerIndex()", REDIRECTION_LAYER_INDEX); + RedirectionLayer( + final SelectAndViewAnalyzer.AnalyzerContext context, + final TrackingRowSet resultRowSet, + final WritableRowRedirection rowRedirection) { + super(context.getNextLayerIndex()); this.resultRowSet = resultRowSet; this.rowRedirection = rowRedirection; this.maxInnerIndex = -1; @@ -44,24 +46,18 @@ Set getLayerColumnNames() { } @Override - void populateModifiedColumnSetInReverse( - final ModifiedColumnSet mcsBuilder, - final Set remainingDepsToSatisfy) { + void populateColumnSources(final Map> result) { // we don't generate any column sources, so we don't need to do anything here } @Override - void populateColumnSources( - final Map> result, - final GetMode mode) { - // we don't generate any column sources, so we don't need to do anything here + ModifiedColumnSet getModifiedColumnSet() { + return ModifiedColumnSet.EMPTY; } @Override - void calcDependsOn( - final Map> result, - final boolean forcePublishAllSources) { - // we don't generate any column sources, so we don't need to do anything here + BitSet getLayerDependencySet() { + return layerDependencySet; } @Override @@ -70,28 +66,21 @@ boolean allowCrossColumnParallelization() { } @Override - public CompletionHandler createUpdateHandler( + public Runnable createUpdateHandler( final TableUpdate upstream, final RowSet toClear, final SelectAndViewAnalyzer.UpdateHelper helper, final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { - final BitSet baseLayerBitSet = new BitSet(); - baseLayerBitSet.set(BASE_LAYER_INDEX); - return new CompletionHandler(baseLayerBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we only have a base layer underneath us, so we do not care about the bitSet; it is always - // empty - doApplyUpdate(upstream, onCompletion); - } - }; + final Runnable onSuccess, + final Consumer onError) { + // note that we process this layer directly because all subsequent layers depend on it + return () -> doApplyUpdate(upstream, onSuccess); } private void doApplyUpdate( final TableUpdate upstream, - final CompletionHandler onCompletion) { + final Runnable onSuccess) { // we need to remove the removed values from our row redirection, and add them to our free RowSet; so that // updating tables will not consume more space over the course of a day for abandoned rows final RowSetBuilderRandom innerToFreeBuilder = RowSetFactory.builderRandom(); @@ -162,7 +151,7 @@ private void doApplyUpdate( freeValues.removeRange(0, lastAllocated.get()); } - onCompletion.onLayerCompleted(getLayerIndex()); + onSuccess.run(); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index b1dce4e6e27..570b4412916 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -15,28 +15,32 @@ import io.deephaven.engine.table.impl.MatchPair; import io.deephaven.engine.table.impl.QueryCompilerRequestProcessor; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.ShiftedColumnsFactory; import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.SourceColumn; import io.deephaven.engine.table.impl.select.SwitchColumn; import io.deephaven.engine.table.impl.sources.InMemoryColumnSource; +import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; import io.deephaven.engine.table.impl.sources.SingleValueColumnSource; import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource; import io.deephaven.engine.table.impl.util.InverseWrappedRowSetRowRedirection; import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.RowRedirection; +import io.deephaven.engine.table.impl.util.WrappedRowSetRowRedirection; import io.deephaven.engine.table.impl.util.WritableRowRedirection; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.SafeCloseablePair; import io.deephaven.vector.Vector; -import org.apache.commons.lang3.mutable.MutableObject; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Stream; @@ -47,6 +51,9 @@ public class SelectAndViewAnalyzer implements LogOutputAppendable { public enum Mode { VIEW_LAZY, VIEW_EAGER, SELECT_STATIC, SELECT_REFRESHING, SELECT_REDIRECTED_REFRESHING, SELECT_REDIRECTED_STATIC } + public enum UpdateFlavor { + Select, View, Update, UpdateView, LazyUpdate + } public static void initializeSelectColumns( final Map> parentColumnMap, @@ -69,36 +76,22 @@ public static void initializeSelectColumns( } } - public static SelectAndViewAnalyzerWrapper create( - QueryTable sourceTable, Mode mode, Map> columnSources, - TrackingRowSet rowSet, ModifiedColumnSet parentMcs, boolean publishTheseSources, boolean useShiftedColumns, - SelectColumn... selectColumns) { - return create(sourceTable, mode, columnSources, rowSet, parentMcs, publishTheseSources, useShiftedColumns, - true, selectColumns); - } - - public static SelectAndViewAnalyzerWrapper create( + public static AnalyzerContext create( final QueryTable sourceTable, final Mode mode, - final Map> columnSources, - TrackingRowSet rowSet, final ModifiedColumnSet parentMcs, final boolean publishTheseSources, boolean useShiftedColumns, - final boolean allowInternalFlatten, final SelectColumn... selectColumns) { final UpdateGraph updateGraph = sourceTable.getUpdateGraph(); - final Map> allColumnSources = new HashMap<>(columnSources); - final SelectAndViewAnalyzer analyzer = new SelectAndViewAnalyzer() { - @Override - void addLayer(final Layer layer) { - super.addLayer(layer); - layer.populateColumnSources(allColumnSources, Layer.GetMode.All); - } - }; + final Map> columnSources = sourceTable.getColumnSourceMap(); + final TrackingRowSet rowSet = sourceTable.getRowSet(); - analyzer.addLayer(new BaseLayer(columnSources, publishTheseSources)); + final boolean flatResult = !sourceTable.isFlat() + && (columnSources.isEmpty() || !publishTheseSources) + && mode == Mode.SELECT_STATIC; + final AnalyzerContext context = new AnalyzerContext(sourceTable, publishTheseSources, flatResult); final Map> columnDefinitions = new LinkedHashMap<>(); final RowRedirection rowRedirection; @@ -107,19 +100,12 @@ void addLayer(final Layer layer) { } else if (mode == Mode.SELECT_REDIRECTED_REFRESHING && rowSet.size() < Integer.MAX_VALUE) { final WritableRowRedirection writableRowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(rowSet.intSize()); - analyzer.addLayer(new RedirectionLayer(analyzer, rowSet, writableRowRedirection)); + context.addLayer(new RedirectionLayer(context, rowSet, writableRowRedirection)); rowRedirection = writableRowRedirection; } else { rowRedirection = null; } - List processedCols = new LinkedList<>(); - List remainingCols = null; - FormulaColumn shiftColumn = null; - boolean shiftColumnHasPositiveOffset = false; - - final HashSet resultColumns = new HashSet<>(); - // First pass to initialize all columns and to compile formulas in one batch. final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch(); for (Map.Entry> entry : columnSources.entrySet()) { @@ -129,9 +115,10 @@ void addLayer(final Layer layer) { columnDefinitions.put(name, cd); } + final Set resultColumnNames = new HashSet<>(); for (final SelectColumn sc : selectColumns) { - if (remainingCols != null) { - remainingCols.add(sc); + if (context.remainingCols != null) { + context.remainingCols.add(sc); continue; } @@ -141,49 +128,43 @@ void addLayer(final Layer layer) { columnDefinitions.put(sc.getName(), cd); if (useShiftedColumns && hasConstantArrayAccess(sc)) { - remainingCols = new LinkedList<>(); - shiftColumn = sc instanceof FormulaColumn + context.remainingCols = new LinkedList<>(); + context.shiftColumn = sc instanceof FormulaColumn ? (FormulaColumn) sc : (FormulaColumn) ((SwitchColumn) sc).getRealColumn(); - shiftColumnHasPositiveOffset = hasPositiveOffsetConstantArrayAccess(sc); + context.shiftColumnHasPositiveOffset = hasPositiveOffsetConstantArrayAccess(sc); continue; } - processedCols.add(sc); + // In our first pass, determine whether any columns will be preserved so that we don't prematurely flatten. + final SourceColumn realColumn = tryToGetSourceColumn(sc); + + if (realColumn != null && !resultColumnNames.contains(realColumn.getSourceName())) { + // if we are preserving a column, then we cannot change key space + context.flatResult &= !shouldPreserve(sc, columnSources.get(realColumn.getSourceName())); + } + + // TODO (deephaven#5760): If layers may define more than one column, we'll need to add all of them here. + resultColumnNames.add(sc.getName()); + + context.processedCols.add(sc); } compilationProcessor.compile(); // Second pass builds the analyzer and destination columns - final TrackingRowSet originalRowSet = rowSet; - analyzer.flatResult = rowSet.isFlat(); - // if we preserve a column, we set this to false - analyzer.flattenedResult = !analyzer.flatResult - && allowInternalFlatten - && (columnSources.isEmpty() || !publishTheseSources) - && mode == Mode.SELECT_STATIC; - int numberOfInternallyFlattenedColumns = 0; - final HashMap> resultAlias = new HashMap<>(); - for (final SelectColumn sc : processedCols) { - sc.initInputs(rowSet, allColumnSources); + for (final SelectColumn sc : context.processedCols) { - // When flattening the result, intermediate columns generate results in position space. When we discover - // that a select column depends on an intermediate result, then we must flatten all parent columns so - // that all dependent columns are in the same result-key space. - if (!analyzer.flatResult && analyzer.flattenedResult + // if this select column depends on result column then its updates must happen in result-key-space + final boolean useResultKeySpace = !sourceTable.isFlat() && context.flatResult && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) - .anyMatch(resultColumns::contains)) { - analyzer.addLayer(new StaticFlattenLayer(analyzer, rowSet, allColumnSources)); - rowSet = RowSetFactory.flat(rowSet.size()).toTracking(); - analyzer.flatResult = true; + .anyMatch(context.selectedSources::containsKey); - // we must re-initialize the column inputs as they may have changed post-flatten - sc.initInputs(rowSet, allColumnSources); - } + sc.initInputs(rowSet, useResultKeySpace ? context.allSourcesInResultKeySpace : context.allSources); - final boolean isNewResultColumn = resultColumns.add(sc.getName()); - // this shadows any known alias + // TODO (deephaven-core#5760): If layers may define more than one column, we'll need to fix resultAlias. + // new columns shadow known aliases resultAlias.remove(sc.getName()); final Stream allDependencies = @@ -205,43 +186,14 @@ void addLayer(final Layer layer) { if (hasConstantValue(sc)) { final WritableColumnSource constViewSource = SingleValueColumnSource.getSingleValueColumnSource(sc.getReturnedType()); - final String name = sc.getName(); - analyzer.addLayer( - new ConstantColumnLayer(analyzer, name, sc, constViewSource, distinctDeps, mcsBuilder)); + context.addLayer(new ConstantColumnLayer(context, sc, constViewSource, distinctDeps, mcsBuilder)); continue; } - final SourceColumn realColumn; - if (sc instanceof SourceColumn) { - realColumn = (SourceColumn) sc; - } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { - realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); - } else { - realColumn = null; - } - - if (realColumn != null && shouldPreserve(sc)) { - boolean sourceIsNew = resultColumns.contains(realColumn.getSourceName()); - if (realColumn.getSourceName().equals(sc.getName()) && isNewResultColumn) { - // If this is a "COL_NAME = COL_NAME" identity mapping then it is new iff the result column is new - sourceIsNew = false; - } - if (!sourceIsNew) { - if (numberOfInternallyFlattenedColumns > 0) { - // we must preserve this column, but have already created an analyzer for the internally - // flattened column, therefore must start over without permitting internal flattening - return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources, - useShiftedColumns, false, selectColumns); - } else { - // we can not flatten future columns because we are preserving a column that may not be flat - analyzer.flattenedResult = false; - } - } - - final String name = sc.getName(); - final ColumnSource cs = sc.getDataView(); - analyzer.addLayer(new PreserveColumnLayer(analyzer, name, sc, cs, distinctDeps, mcsBuilder)); - + final SourceColumn realColumn = tryToGetSourceColumn(sc); + if (realColumn != null && shouldPreserve(sc, sc.getDataView())) { + context.addLayer(new PreserveColumnLayer( + context, sc, sc.getDataView(), distinctDeps, mcsBuilder)); continue; } @@ -249,8 +201,8 @@ void addLayer(final Layer layer) { if (realColumn != null) { final ColumnSource alias = resultAlias.get(realColumn.getSourceName()); if (alias != null) { - final String name = sc.getName(); - analyzer.addLayer(new PreserveColumnLayer(analyzer, name, sc, alias, distinctDeps, mcsBuilder)); + context.addLayer(new PreserveColumnLayer( + context, sc, alias, distinctDeps, mcsBuilder)); continue; } } @@ -260,35 +212,30 @@ void addLayer(final Layer layer) { : cs -> resultAlias.put(realColumn.getSourceName(), cs); final long targetDestinationCapacity = - rowSet.isEmpty() ? 0 : (analyzer.flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); + rowSet.isEmpty() ? 0 : (context.flatResult ? rowSet.size() : rowSet.lastRowKey() + 1); switch (mode) { case VIEW_LAZY: { final ColumnSource viewCs = sc.getLazyView(); maybeCreateAlias.accept(viewCs); - final String name = sc.getName(); - analyzer.addLayer(new ViewColumnLayer(analyzer, name, sc, viewCs, distinctDeps, mcsBuilder)); + context.addLayer(new ViewColumnLayer(context, sc, viewCs, distinctDeps, mcsBuilder)); break; } case VIEW_EAGER: { final ColumnSource viewCs = sc.getDataView(); maybeCreateAlias.accept(viewCs); - final String name = sc.getName(); - analyzer.addLayer(new ViewColumnLayer(analyzer, name, sc, viewCs, distinctDeps, mcsBuilder)); + context.addLayer(new ViewColumnLayer(context, sc, viewCs, distinctDeps, mcsBuilder)); break; } case SELECT_STATIC: { // We need to call newDestInstance because only newDestInstance has the knowledge to endow our // created array with the proper componentType (in the case of Vectors). - final WritableColumnSource scs = analyzer.flatResult || analyzer.flattenedResult + final WritableColumnSource scs = sourceTable.isFlat() || context.flatResult ? sc.newFlatDestInstance(targetDestinationCapacity) : sc.newDestInstance(targetDestinationCapacity); maybeCreateAlias.accept(scs); - final String name = sc.getName(); - analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, null, - distinctDeps, mcsBuilder, false, analyzer.flattenedResult)); - if (analyzer.flattenedResult) { - numberOfInternallyFlattenedColumns++; - } + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, null, distinctDeps, mcsBuilder, false, + useResultKeySpace)); break; } case SELECT_REDIRECTED_STATIC: { @@ -296,16 +243,15 @@ void addLayer(final Layer layer) { final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); maybeCreateAlias.accept(scs); - final String name = sc.getName(); - analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, - underlyingSource, distinctDeps, mcsBuilder, true, false)); + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, underlyingSource, distinctDeps, mcsBuilder, true, + useResultKeySpace)); break; } case SELECT_REDIRECTED_REFRESHING: case SELECT_REFRESHING: { // We need to call newDestInstance because only newDestInstance has the knowledge to endow our // created array with the proper componentType (in the case of Vectors). - // TODO: use DeltaAwareColumnSource WritableColumnSource scs = sc.newDestInstance(targetDestinationCapacity); WritableColumnSource underlyingSource = null; if (rowRedirection != null) { @@ -314,9 +260,9 @@ void addLayer(final Layer layer) { rowRedirection, underlyingSource, rowSet.intSize()); } maybeCreateAlias.accept(scs); - final String name = sc.getName(); - analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, - underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false)); + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, underlyingSource, distinctDeps, mcsBuilder, + rowRedirection != null, useResultKeySpace)); break; } default: @@ -324,8 +270,19 @@ void addLayer(final Layer layer) { } } - return new SelectAndViewAnalyzerWrapper(analyzer, shiftColumn, shiftColumnHasPositiveOffset, remainingCols, - processedCols); + return context; + } + + private static @Nullable SourceColumn tryToGetSourceColumn(final SelectColumn sc) { + final SourceColumn realColumn; + if (sc instanceof SourceColumn) { + realColumn = (SourceColumn) sc; + } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { + realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); + } else { + realColumn = null; + } + return realColumn; } private static boolean hasConstantArrayAccess(final SelectColumn sc) { @@ -369,97 +326,419 @@ private static boolean hasConstantValue(final SelectColumn sc) { return false; } - private static boolean shouldPreserve(final SelectColumn sc) { - // we already know sc is a SourceColumn or switches to a SourceColumn - final ColumnSource sccs = sc.getDataView(); - return sccs instanceof InMemoryColumnSource && ((InMemoryColumnSource) sccs).isInMemory() + private static boolean shouldPreserve( + final SelectColumn sc, + final ColumnSource columnSource) { + return columnSource instanceof InMemoryColumnSource && ((InMemoryColumnSource) columnSource).isInMemory() && !Vector.class.isAssignableFrom(sc.getReturnedType()); } - /** - * Set the bits in bitset that represent all the new columns. This is used to identify when the select or update - * operation is complete. - * - * @param bitset the bitset to manipulate. - */ - public void setAllNewColumns(BitSet bitset) { - bitset.set(0, layers.size()); - } - - private final List layers = new ArrayList<>(); - private final Map columnToLayerIndex = new HashMap<>(); + /** The layers that make up this analyzer. */ + private final Layer[] layers; - /** Whether the result is already flat. */ - private boolean flatResult = false; /** Whether the result should be flat. */ - private boolean flattenedResult = false; + private final boolean flatResult; - SelectAndViewAnalyzer() { + private final BitSet requiredLayers = new BitSet(); + private final BitSet remainingLayers = new BitSet(); + private SelectAndViewAnalyzer( + final Layer[] layers, + final boolean flatResult) { + this.layers = layers; + this.flatResult = flatResult; + for (final Layer layer : layers) { + if (layer.hasRefreshingLogic()) { + requiredLayers.set(layer.getLayerIndex()); + } else { + this.layers[layer.getLayerIndex()] = null; + } + } } - void addLayer(final Layer layer) { - layers.add(layer); + public final static class AnalyzerContext { + + /** The analyzer that we are building. */ + private final List layers = new ArrayList<>(); + /** The sources that are available to the analyzer, including parent columns. */ + private final Map> allSources = new LinkedHashMap<>(); + /** The sources that are available to the analyzer, including parent columns, in result key space. */ + private final Map> allSourcesInResultKeySpace; + /** The sources that are explicitly defined to the analyzer, including preserved parent columns. */ + private final Map> selectedSources = new HashMap<>(); + /** The sources that are published to the child table. */ + private final Map> publishedSources = new LinkedHashMap<>(); + /** A mapping from result column name to the layer index that created it. */ + private final Map columnToLayerIndex = new HashMap<>(); + /** The select columns that have been processed so far. */ + private final List processedCols = new ArrayList<>(); + + /** A holder for the shift column, if any. */ + private FormulaColumn shiftColumn; + /** Whether the shift column has a positive offset. */ + private boolean shiftColumnHasPositiveOffset; + /** The columns that will need to be processed after the shift column. */ + private List remainingCols; + /** Whether the result should be flat. */ + private boolean flatResult; + /** The layer that will be used to process redirection, if we have one. */ + private int redirectionLayer = -1; + + AnalyzerContext( + final QueryTable sourceTable, + final boolean publishTheseSources, + final boolean flatResult) { + final Map> sources = sourceTable.getColumnSourceMap(); + + this.flatResult = flatResult; + + allSources.putAll(sources); + for (final String columnName : allSources.keySet()) { + columnToLayerIndex.put(columnName, -1); + } + + if (publishTheseSources) { + publishedSources.putAll(sources); + } + + if (!flatResult) { + // result key space is the same as parent key space + allSourcesInResultKeySpace = allSources; + } else { + allSourcesInResultKeySpace = new HashMap<>(); - for (final String columnName : layer.getLayerColumnNames()) { - columnToLayerIndex.put(columnName, layer.getLayerIndex()); + final RowRedirection rowRedirection = new WrappedRowSetRowRedirection(sourceTable.getRowSet()); + allSources.forEach((name, cs) -> allSourcesInResultKeySpace.put(name, + RedirectedColumnSource.maybeRedirect(rowRedirection, cs))); + } } - } - public int getNextLayerIndex() { - return layers.size(); - } + /** + * Add a layer to the analyzer. + * + * @param layer the layer to add + */ + void addLayer(final Layer layer) { + if (layer instanceof RedirectionLayer) { + if (redirectionLayer != -1) { + throw new IllegalStateException("Cannot have more than one redirection layer"); + } + redirectionLayer = layers.size(); + } + + layer.populateColumnSources(allSources); + if (flatResult) { + layer.populateColumnSources(allSourcesInResultKeySpace); + } + layer.populateColumnSources(selectedSources); + layer.populateColumnSources(publishedSources); + + layers.add(layer); + + for (final String columnName : layer.getLayerColumnNames()) { + columnToLayerIndex.put(columnName, layer.getLayerIndex()); + } + } + + /** + * @return the next layerIndex to use + */ + int getNextLayerIndex() { + return layers.size(); + } + + /** + * Return the layerIndex for a given string column. + * + * @param column the name of the column + * + * @return the layerIndex + */ + int getLayerIndexFor(String column) { + final Integer layerIndex = columnToLayerIndex.get(column); + if (layerIndex == null) { + throw new IllegalStateException("Column " + column + " not found in any layer of the analyzer"); + } + return layerIndex; + } + + /** + * Populate the ModifiedColumnSet with all indirect/direct dependencies on the parent table. + * + * @param mcsBuilder the result ModifiedColumnSet to populate + * @param dependencies the immediate dependencies + */ + void populateModifiedColumnSet( + final ModifiedColumnSet mcsBuilder, + final String[] dependencies) { + for (final String dep : dependencies) { + final int layerIndex = getLayerIndexFor(dep); + if (layerIndex != -1) { + mcsBuilder.setAll(layers.get(layerIndex).getModifiedColumnSet()); + } else if (!allSources.containsKey(dep)) { + // we should have blown up during initDef if this is the case + throw new IllegalStateException("Column " + dep + " not found in any layer of the analyzer"); + } else if (!selectedSources.containsKey(dep)) { + // this is a preserved parent column + mcsBuilder.setAll(dep); + } + } + } + + /** + * Populate the layer dependency set with the layer indices that the dependencies are in. + * + * @param layerDependencySet the result bitset to populate + * @param dependencies the dependencies + */ + void populateLayerDependencySet( + final BitSet layerDependencySet, + final String[] dependencies) { + for (final String dep : dependencies) { + final int layerIndex = getLayerIndexFor(dep); + if (layerIndex != -1) { + layerDependencySet.or(layers.get(layerIndex).getLayerDependencySet()); + } else if (!allSources.containsKey(dep)) { + // we should have blown up during initDef if this is the case + throw new IllegalStateException("Column " + dep + " not found in any layer of the analyzer"); + } + // Note that preserved columns do not belong to a layer. + } + } + + /** + * Set the redirection layer in the bitset if the analyzer has any redirection. + * + * @param layerDependencies the result bitset to populate + */ + void setRedirectionLayer(final BitSet layerDependencies) { + if (redirectionLayer != -1) { + layerDependencies.set(redirectionLayer); + } + } + + /** + * @return the column sources explicitly created by the analyzer + */ + public Map> getSelectedColumnSources() { + return selectedSources; + } + + /** + * @return the column sources that are published to the child table + */ + public Map> getPublishedColumnSources() { + // Note that if we have a shift column that we forcefully publish all columns. + return shiftColumn == null ? publishedSources : allSources; + } + + /** + * @return the final analyzer + */ + public SelectAndViewAnalyzer createAnalyzer() { + return new SelectAndViewAnalyzer(layers.toArray(Layer[]::new), flatResult); + } + + /** + * @return which select columns were included in the result (not including the shift, or post-shift, columns) + */ + public List getProcessedColumns() { + return processedCols; + } + + /** + * @return whether the result should be flat + */ + public boolean isFlatResult() { + return flatResult; + } + + /** + * Our job here is to calculate the effects: a map from incoming column to a list of columns that it effects. We + * do this in two stages. In the first stage we create a map from column to (set of dependent columns). In the + * second stage we reverse that map. + * + * @return the effects map + */ + public Map calcEffects() { + final Map> resultMap = getPublishedColumnSources(); + + // Create the mapping from result column to dependent source columns. + final Map dependsOn = new HashMap<>(); + for (final String columnName : resultMap.keySet()) { + final int layerIndex = getLayerIndexFor(columnName); + final String[] dependencies; + if (layerIndex == -1) { + dependencies = new String[] {columnName}; + } else { + dependencies = layers.get(layerIndex).getModifiedColumnSet().dirtyColumnNames(); + } + dependsOn.put(columnName, dependencies); + } + + // Now create the mapping from source column to result columns. + final Map> effects = new HashMap<>(); + for (Map.Entry entry : dependsOn.entrySet()) { + final String depender = entry.getKey(); + for (final String dependee : entry.getValue()) { + effects.computeIfAbsent(dependee, dummy -> new ArrayList<>()).add(depender); + } + } + + // Convert effects type into result type + final Map result = new HashMap<>(); + for (Map.Entry> entry : effects.entrySet()) { + final String[] value = entry.getValue().toArray(String[]::new); + result.put(entry.getKey(), value); + } + return result; + } + + /** + * Shift columns introduce intermediary table operations. This method applies remaining work to the result built + * so far. + * + * @param sourceTable the source table + * @param resultSoFar the intermediate result + * @param updateFlavor the update flavor + * @return the final result + */ + public QueryTable applyShiftsAndRemainingColumns( + final @NotNull QueryTable sourceTable, + @NotNull QueryTable resultSoFar, + final UpdateFlavor updateFlavor) { + if (shiftColumn != null) { + resultSoFar = (QueryTable) ShiftedColumnsFactory.getShiftedColumnsTable( + resultSoFar, shiftColumn, updateFlavor); + } + + // shift columns may introduce modifies that are not present in the original table; set these before using + if (sourceTable.isRefreshing()) { + if (shiftColumn == null && sourceTable.isAddOnly()) { + resultSoFar.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true); + } + if ((shiftColumn == null || !shiftColumnHasPositiveOffset) && sourceTable.isAppendOnly()) { + // note if the shift offset is non-positive, then this result is still append-only + resultSoFar.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); + } + if (sourceTable.hasAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE)) { + // be convenient for test authors by propagating the test source table attribute + resultSoFar.setAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE, true); + } + if (sourceTable.isBlink()) { + // blink tables, although possibly not useful, can have shift columns + resultSoFar.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); + } + } - public static abstract class Layer implements LogOutputAppendable { + boolean isMultiStateSelect = shiftColumn != null || remainingCols != null; + if (isMultiStateSelect && (updateFlavor == UpdateFlavor.Select || updateFlavor == UpdateFlavor.View)) { + List newResultColumns = new LinkedList<>(); + for (SelectColumn processed : processedCols) { + newResultColumns.add(new SourceColumn(processed.getName())); + } + if (shiftColumn != null) { + newResultColumns.add(new SourceColumn(shiftColumn.getName())); + } + if (remainingCols != null) { + newResultColumns.addAll(remainingCols); + } - static final int BASE_LAYER_INDEX = 0; - static final int REDIRECTION_LAYER_INDEX = 1; + if (updateFlavor == UpdateFlavor.Select) { + resultSoFar = (QueryTable) resultSoFar.select(newResultColumns); + } else { + resultSoFar = (QueryTable) resultSoFar.view(newResultColumns); + } + } else if (remainingCols != null) { + switch (updateFlavor) { + case Update: { + resultSoFar = (QueryTable) resultSoFar.update(remainingCols); + break; + } + case UpdateView: { + resultSoFar = (QueryTable) resultSoFar.updateView(remainingCols); + break; + } + case LazyUpdate: { + resultSoFar = (QueryTable) resultSoFar.lazyUpdate(remainingCols); + break; + } + default: + throw new IllegalStateException("Unexpected update flavor: " + updateFlavor); + } + } - enum GetMode { - All, New, Published + return resultSoFar; } + } + + static abstract class Layer implements LogOutputAppendable { /** * The layerIndex is used to identify each layer uniquely within the bitsets for completion. */ private final int layerIndex; - public Layer(int layerIndex) { + Layer(int layerIndex) { this.layerIndex = layerIndex; } + /** + * @return which index in the layer stack this layer is + */ int getLayerIndex() { return layerIndex; } + /** + * @return whether this layer has refreshing logic and needs to be updated + */ + boolean hasRefreshingLogic() { + return true; + } + + /** + * @return the modified column set of the parent table that this layer indirectly depends on + */ + ModifiedColumnSet getModifiedColumnSet() { + return failNoRefreshingLogic(); + } + + /** + * @return the layer dependency set indicating which layers this layer depends on + */ + BitSet getLayerDependencySet() { + return new BitSet(); + } + @Override public String toString() { return new LogOutputStringImpl().append(this).toString(); } - public void startTrackingPrev() { + void startTrackingPrev() { // default is that there is nothing to do } + /** + * @return the column names created by this layer + */ abstract Set getLayerColumnNames(); - abstract void populateModifiedColumnSetInReverse( - ModifiedColumnSet mcsBuilder, - Set remainingDepsToSatisfy); - - abstract void populateColumnSources( - Map> result, - GetMode mode); - - abstract void calcDependsOn( - final Map> result, - boolean forcePublishAllSources); - + /** + * Populate the column sources for this layer. + * + * @param result the map to populate + */ + abstract void populateColumnSources(Map> result); + /** + * @return true if this layer allows parallelization across columns + */ abstract boolean allowCrossColumnParallelization(); /** - * Apply this update to this SelectAndViewAnalyzer. + * Apply this update to this Layer. * * @param upstream the upstream update * @param toClear rows that used to exist and no longer exist @@ -467,120 +746,29 @@ abstract void calcDependsOn( * @param jobScheduler scheduler for parallel sub-tasks * @param liveResultOwner {@link LivenessNode node} to be used to manage/unmanage results that happen to be * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} - * @param onCompletion called when the inner column is complete + * @param onSuccess called when the update completed successfully + * @param onError called when the update failed */ - public abstract CompletionHandler createUpdateHandler( + Runnable createUpdateHandler( TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, @Nullable LivenessNode liveResultOwner, - CompletionHandler onCompletion); - - /** - * A class that handles the completion of one select column. - */ - public static abstract class CompletionHandler { - /** - * Note that the completed columns are shared among the entire operation's completion handlers. - */ - private final BitSet completedColumns; - private final BitSet requiredColumns; - private volatile boolean fired = false; - - /** - * Create a completion handler for a column. Reuses the completedColumns from the provided handler. - * - * @param requiredColumns the columns required for this layer - * @param handler the handler orchestrating when other columns are fired - */ - CompletionHandler(BitSet requiredColumns, CompletionHandler handler) { - this.requiredColumns = requiredColumns; - this.completedColumns = handler.completedColumns; - } - - /** - * Create the final completion handler. - * - * @param requiredColumns the columns required for this handler to fire - * @param completedColumns the set of completed columns, shared with all the other handlers - */ - public CompletionHandler(BitSet requiredColumns, BitSet completedColumns) { - this.requiredColumns = requiredColumns; - this.completedColumns = completedColumns; - } - - /** - * Called when a single column is completed. - *

- * If we are ready, then we call {@link #onAllRequiredColumnsCompleted()}. - *

- * We may not be ready, but other columns downstream of us may be ready, so they are also notified (the - * nextHandler). - * - * @param completedColumn the layerIndex of the completedColumn - */ - void onLayerCompleted(int completedColumn) { - if (!fired) { - boolean readyToFire = false; - synchronized (completedColumns) { - if (!fired) { - completedColumns.set(completedColumn); - if (requiredColumns.get(completedColumn) || requiredColumns.isEmpty()) { - readyToFire = requiredColumns.stream().allMatch(completedColumns::get); - if (readyToFire) { - fired = true; - } - } - } - } - if (readyToFire) { - onAllRequiredColumnsCompleted(); - } - } - } - - protected void onError(Exception error) { - - } - - /** - * Called when all required columns are completed. - */ - protected abstract void onAllRequiredColumnsCompleted(); + Runnable onSuccess, + Consumer onError) { + return failNoRefreshingLogic(); } - } - - public final void populateModifiedColumnSet( - final ModifiedColumnSet mcsBuilder, - final Set remainingDepsToSatisfy) { - for (int ii = layers.size() - 1; ii >= 0; --ii) { - layers.get(ii).populateModifiedColumnSetInReverse(mcsBuilder, remainingDepsToSatisfy); - } - } - public final Map> getAllColumnSources() { - return getColumnSources(Layer.GetMode.All); - } - - public final Map> getNewColumnSources() { - return getColumnSources(Layer.GetMode.New); - } - - public final Map> getPublishedColumnSources() { - return getColumnSources(Layer.GetMode.Published); - } - - private Map> getColumnSources(final Layer.GetMode mode) { - final Map> result = new LinkedHashMap<>(); - for (final Layer layer : layers) { - layer.populateColumnSources(result, mode); + private T failNoRefreshingLogic() { + throw new UnsupportedOperationException(String.format( + "%s does not have any refreshing logic", this.getClass().getSimpleName())); } - return result; } public static class UpdateHelper implements SafeCloseable { private RowSet existingRows; + private TableUpdate upstreamInResultSpace; private SafeCloseablePair shiftedWithModifies; private SafeCloseablePair shiftedWithoutModifies; @@ -592,6 +780,15 @@ public UpdateHelper(RowSet parentRowSet, TableUpdate upstream) { this.upstream = upstream; } + TableUpdate resultKeySpaceUpdate() { + if (upstreamInResultSpace == null) { + upstreamInResultSpace = new TableUpdateImpl( + RowSetFactory.flat(upstream.added().size()), RowSetFactory.empty(), RowSetFactory.empty(), + RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY); + } + return upstreamInResultSpace; + } + private RowSet getExisting() { if (existingRows == null) { existingRows = parentRowSet.minus(upstream.added()); @@ -641,6 +838,10 @@ public void close() { shiftedWithoutModifies.close(); shiftedWithoutModifies = null; } + if (upstreamInResultSpace != null) { + upstreamInResultSpace.release(); + upstreamInResultSpace = null; + } } } @@ -653,7 +854,8 @@ public void close() { * @param jobScheduler scheduler for parallel sub-tasks * @param liveResultOwner {@link LivenessNode node} to be used to manage/unmanage results that happen to be * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} - * @param onCompletion Called when an inner column is complete. The outer layer should pass the {@code onCompletion} + * @param onSuccess called when the update completed successfully + * @param onError called when the update failed */ public void applyUpdate( final TableUpdate upstream, @@ -661,171 +863,141 @@ public void applyUpdate( final UpdateHelper helper, final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final Layer.CompletionHandler onCompletion) { + final Runnable onSuccess, + final Consumer onError) { - TableUpdateImpl postFlatten = null; - final MutableObject postFlattenHolder = new MutableObject<>(); - final Layer.CompletionHandler[] handlers = new Layer.CompletionHandler[layers.size()]; + remainingLayers.or(requiredLayers); - final BitSet allLayers = new BitSet(); - setAllNewColumns(allLayers); + final Runnable[] runners = new Runnable[layers.length]; + final UpdateScheduler scheduler = new UpdateScheduler(runners, onSuccess, onError); - final Layer.CompletionHandler innerHandler = new Layer.CompletionHandler( - allLayers, onCompletion) { - @Override - protected void onError(Exception error) { - // propagate the error upstream - onCompletion.onError(error); + for (int ii = 0; ii < layers.length; ++ii) { + final Layer layer = layers[ii]; + if (layer != null) { + runners[ii] = layer.createUpdateHandler( + upstream, toClear, helper, jobScheduler, liveResultOwner, + () -> scheduler.onLayerComplete(layer.getLayerIndex()), onError); } + } - @Override - void onLayerCompleted(int completedColumn) { - super.onLayerCompleted(completedColumn); + scheduler.tryToKickOffWork(); + } - for (int ii = 1; ii < layers.size(); ++ii) { - handlers[ii].onLayerCompleted(completedColumn); - } + private class UpdateScheduler { + private final ReentrantLock runLock = new ReentrantLock(); + private final AtomicBoolean needsRun = new AtomicBoolean(); - onCompletion.onLayerCompleted(completedColumn); - } + private final Runnable[] runners; + private final Runnable onSuccess; + private final Consumer onError; - @Override - protected void onAllRequiredColumnsCompleted() { - final TableUpdateImpl update = postFlattenHolder.getValue(); - if (update != null) { - update.release(); - } - } - }; + private volatile boolean updateComplete; - for (int ii = layers.size() - 1; ii >= 0; --ii) { - final Layer currentLayer = layers.get(ii); - handlers[ii] = currentLayer.createUpdateHandler( - postFlatten != null ? postFlatten : upstream, - toClear, helper, jobScheduler, liveResultOwner, innerHandler); + public UpdateScheduler( + final Runnable[] runners, + final Runnable onSuccess, + final Consumer onError) { + this.runners = runners; + this.onSuccess = onSuccess; + this.onError = onError; + } - if (currentLayer instanceof StaticFlattenLayer) { - postFlatten = new TableUpdateImpl(); - postFlatten.added = ((StaticFlattenLayer) currentLayer).getParentRowSetCopy(); - postFlatten.removed = RowSetFactory.empty(); - postFlatten.modified = RowSetFactory.empty(); - postFlatten.modifiedColumnSet = ModifiedColumnSet.EMPTY; - postFlatten.shifted = RowSetShiftData.EMPTY; + public void onLayerComplete(final int layerIndex) { + synchronized (remainingLayers) { + remainingLayers.set(layerIndex, false); } + + tryToKickOffWork(); } - // base layer is invoked manually - handlers[0].onAllRequiredColumnsCompleted(); - } + private void tryToKickOffWork() { + needsRun.set(true); + while (true) { + if (runLock.isHeldByCurrentThread() || !runLock.tryLock()) { + // do not permit re-entry or waiting on another thread doing exactly this work + return; + } - /** - * Our job here is to calculate the effects: a map from incoming column to a list of columns that it effects. We do - * this in two stages. In the first stage we create a map from column to (set of dependent columns). In the second - * stage we reverse that map. - */ - public final Map calcEffects(boolean forcePublishAllResources) { - final Map> dependsOn = calcDependsOn(forcePublishAllResources); + try { + if (needsRun.compareAndSet(true, false)) { + doKickOffWork(); + } + } catch (final Exception exception) { + try { + onError.accept(exception); + } catch (final Exception ignored) { + } + } finally { + runLock.unlock(); + } - // Now create effects, which is the inverse of dependsOn: - // An entry W -> [X, Y, Z] in effects means that W affects X, Y, and Z - final Map> effects = new HashMap<>(); - for (Map.Entry> entry : dependsOn.entrySet()) { - final String depender = entry.getKey(); - for (final String dependee : entry.getValue()) { - effects.computeIfAbsent(dependee, dummy -> new ArrayList<>()).add(depender); + if (!needsRun.get()) { + return; + } } } - // Convert effects type into result type - final Map result = new HashMap<>(); - for (Map.Entry> entry : effects.entrySet()) { - final String[] value = entry.getValue().toArray(String[]::new); - result.put(entry.getKey(), value); - } - return result; - } - final Map> calcDependsOn(boolean forcePublishAllResources) { - final Map> result = new HashMap<>(); - for (final Layer layer : layers) { - layer.calcDependsOn(result, forcePublishAllResources); + private void doKickOffWork() { + if (updateComplete) { + // we may have already completed the update, but are checking again due to the potential of a race + return; + } + + int nextLayer = 0; + while (nextLayer >= 0) { + boolean complete; + boolean readyToFire = false; + Runnable runner = null; + synchronized (remainingLayers) { + complete = remainingLayers.isEmpty(); + nextLayer = remainingLayers.nextSetBit(nextLayer); + + if (nextLayer != -1) { + if ((runner = runners[nextLayer]) != null) { + readyToFire = !layers[nextLayer].getLayerDependencySet().intersects(remainingLayers); + } + + if (readyToFire) { + runners[nextLayer] = null; + } else { + ++nextLayer; + } + } + } + + if (readyToFire) { + runner.run(); + } else if (complete) { + updateComplete = true; + onSuccess.run(); + return; + } + } } - return result; } public void startTrackingPrev() { for (final Layer layer : layers) { - layer.startTrackingPrev(); + if (layer != null) { + layer.startTrackingPrev(); + } } } /** - * Have the column sources already been flattened? Only the STATIC_SELECT case flattens the result. A static flatten - * layer is only added if SelectColumn depends on an intermediate result. + * Is the result of this select/view flat? */ public boolean flatResult() { return flatResult; } - /** - * Was the result internally flattened? Only the STATIC_SELECT case flattens the result. If the result preserves any - * columns, then flattening is not permitted. Because all the other layers cannot internally flatten, the default - * implementation returns false. - */ - public boolean flattenedResult() { - return flattenedResult; - } - - /** - * Return the layerIndex for a given string column. - * - * @param column the name of the column - * - * @return the layerIndex - */ - int getLayerIndexFor(String column) { - return columnToLayerIndex.getOrDefault(column, -1); - } - /** * Can all of our columns permit parallel updates? */ public boolean allowCrossColumnParallelization() { - return layers.stream().allMatch(Layer::allowCrossColumnParallelization); - } - - /** - * Create a completion handler that signals a future when the update is completed. - * - * @param waitForResult a void future indicating success or failure - * - * @return a completion handler that will signal the future - */ - public Layer.CompletionHandler futureCompletionHandler(CompletableFuture waitForResult) { - final BitSet completedColumns = new BitSet(); - final BitSet requiredColumns = new BitSet(); - - setAllNewColumns(requiredColumns); - - return new Layer.CompletionHandler(requiredColumns, completedColumns) { - boolean errorOccurred = false; - - @Override - public void onAllRequiredColumnsCompleted() { - if (errorOccurred) { - return; - } - waitForResult.complete(null); - } - - @Override - protected void onError(Exception error) { - if (errorOccurred) { - return; - } - errorOccurred = true; - waitForResult.completeExceptionally(error); - } - }; + return Arrays.stream(layers) + .filter(Objects::nonNull) + .allMatch(Layer::allowCrossColumnParallelization); } @Override @@ -833,6 +1005,9 @@ public LogOutput append(LogOutput logOutput) { logOutput = logOutput.append("SelectAndViewAnalyzer{"); boolean first = true; for (final Layer layer : layers) { + if (layer == null) { + continue; + } if (first) { first = false; } else { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java deleted file mode 100644 index ec4fcf6f534..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java +++ /dev/null @@ -1,128 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.select.analyzers; - -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.ShiftedColumnsFactory; -import io.deephaven.engine.table.impl.select.FormulaColumn; -import io.deephaven.engine.table.impl.select.SelectColumn; -import io.deephaven.engine.table.impl.select.SourceColumn; -import org.jetbrains.annotations.NotNull; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SelectAndViewAnalyzerWrapper { - public enum UpdateFlavor { - Select, View, Update, UpdateView, LazyUpdate - } - - private final SelectAndViewAnalyzer analyzer; - private final FormulaColumn shiftColumn; - private final boolean shiftColumnHasPositiveOffset; - private final List remainingCols; - private final List processedColumns; - - SelectAndViewAnalyzerWrapper( - SelectAndViewAnalyzer analyzer, - FormulaColumn shiftColumn, - boolean shiftColumnHasPositiveOffset, - List remainingCols, - List processedColumns) { - this.analyzer = analyzer; - this.shiftColumn = shiftColumn; - this.shiftColumnHasPositiveOffset = shiftColumnHasPositiveOffset; - this.remainingCols = remainingCols; - this.processedColumns = processedColumns; - } - - public final Map> getPublishedColumnResources() { - if (shiftColumn == null) { - return analyzer.getPublishedColumnSources(); - } else { - return analyzer.getAllColumnSources(); - } - } - - public final Map calcEffects() { - return analyzer.calcEffects(shiftColumn != null); - } - - public SelectAndViewAnalyzer getAnalyzer() { - return analyzer; - } - - public List getProcessedColumns() { - return processedColumns; - } - - public QueryTable applyShiftsAndRemainingColumns( - @NotNull QueryTable sourceTable, @NotNull QueryTable queryTable, UpdateFlavor updateFlavor) { - if (shiftColumn != null) { - queryTable = (QueryTable) ShiftedColumnsFactory.getShiftedColumnsTable( - queryTable, shiftColumn, updateFlavor); - } - - // shift columns may introduce modifies that are not present in the original table; set these before using - if (sourceTable.isRefreshing()) { - if (shiftColumn == null && sourceTable.isAddOnly()) { - queryTable.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true); - } - if ((shiftColumn == null || !shiftColumnHasPositiveOffset) && sourceTable.isAppendOnly()) { - // note if the shift offset is non-positive, then this result is still append-only - queryTable.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - } - if (sourceTable.hasAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE)) { - // be convenient for test authors by propagating the test source table attribute - queryTable.setAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE, true); - } - if (sourceTable.isBlink()) { - // blink tables, although possibly not useful, can have shift columns - queryTable.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); - } - } - - boolean isMultiStateSelect = shiftColumn != null || remainingCols != null; - if (isMultiStateSelect && (updateFlavor == UpdateFlavor.Select || updateFlavor == UpdateFlavor.View)) { - List newResultColumns = new LinkedList<>(); - for (SelectColumn processed : processedColumns) { - newResultColumns.add(new SourceColumn(processed.getName())); - } - if (shiftColumn != null) { - newResultColumns.add(new SourceColumn(shiftColumn.getName())); - } - if (remainingCols != null) { - newResultColumns.addAll(remainingCols); - } - - if (updateFlavor == UpdateFlavor.Select) { - queryTable = (QueryTable) queryTable.select(newResultColumns); - } else { - queryTable = (QueryTable) queryTable.view(newResultColumns); - } - } else if (remainingCols != null) { - switch (updateFlavor) { - case Update: { - queryTable = (QueryTable) queryTable.update(remainingCols); - break; - } - case UpdateView: { - queryTable = (QueryTable) queryTable.updateView(remainingCols); - break; - } - case LazyUpdate: { - queryTable = (QueryTable) queryTable.lazyUpdate(remainingCols); - break; - } - default: - throw new IllegalStateException("Unexpected update flavor: " + updateFlavor); - } - } - - return queryTable; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index b881e981217..431d00ee7e7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -40,23 +40,22 @@ import static io.deephaven.chunk.util.pools.ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY; final public class SelectColumnLayer extends SelectOrViewColumnLayer { - /** - * The same reference as super.columnSource, but as a WritableColumnSource and maybe reinterpreted - */ + /** The same reference as super.columnSource, but as a WritableColumnSource and maybe reinterpreted */ private final WritableColumnSource writableSource; - /** - * The execution context the select column layer was constructed in - */ + /** The execution context the select column layer was constructed in */ private final ExecutionContext executionContext; private final UpdateGraph updateGraph; - /** - * Our parent row set, used for ensuring capacity. - */ + /** Our parent row set, used for ensuring capacity */ private final RowSet parentRowSet; + /** Whether our result is redirected */ private final boolean isRedirected; + /** Whether our result is flattened */ private final boolean flattenedResult; + /** Whether our dependencies are in the result key space instead of parent key space */ + private final boolean sourcesAreInResultKeySpace; + /** Which layers we depend on */ private final BitSet dependencyBitSet; private final boolean canParallelizeThisColumn; private final boolean isSystemic; @@ -72,14 +71,22 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { private ChunkSource.WithPrev chunkSource; SelectColumnLayer( - UpdateGraph updateGraph, RowSet parentRowSet, SelectAndViewAnalyzer inner, String name, SelectColumn sc, - WritableColumnSource ws, WritableColumnSource underlying, String[] deps, ModifiedColumnSet mcsBuilder, - boolean isRedirected, boolean flattenedResult) { - super(inner, name, sc, ws, underlying, deps, mcsBuilder); + final UpdateGraph updateGraph, + final RowSet parentRowSet, + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final WritableColumnSource ws, + final WritableColumnSource underlying, + final String[] deps, + final ModifiedColumnSet mcsBuilder, + final boolean isRedirected, + final boolean sourcesAreInResultKeySpace) { + super(context, sc, ws, underlying, deps, mcsBuilder); this.updateGraph = updateGraph; this.parentRowSet = parentRowSet; this.writableSource = ReinterpretUtils.maybeConvertToWritablePrimitive(ws); this.isRedirected = isRedirected; + this.sourcesAreInResultKeySpace = sourcesAreInResultKeySpace; final ExecutionContext userSuppliedContext = ExecutionContext.getContextToRecord(); if (userSuppliedContext != null) { @@ -90,9 +97,16 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { } dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + Arrays.stream(deps) + .mapToInt(context::getLayerIndexFor) + .filter(layerIndex -> layerIndex >= 0) + .forEach(dependencyBitSet::set); + if (isRedirected) { + // we cannot write to the redirected column until after the redirection has been updated + context.setRedirectionLayer(dependencyBitSet); + } - this.flattenedResult = flattenedResult; + this.flattenedResult = context.isFlatResult(); // We can only parallelize this column if we are not redirected, our destination provides ensure previous, and // the select column is stateless @@ -134,13 +148,33 @@ private ChunkSource getChunkSource() { } @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, + public BitSet getLayerDependencySet() { + return dependencyBitSet; + } + + @Override + public Runnable createUpdateHandler( + final TableUpdate originalUpdate, final RowSet toClear, final SelectAndViewAnalyzer.UpdateHelper helper, final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { + final Runnable onSuccess, + final Consumer onError) { + final TableUpdate upstream; + if (!sourcesAreInResultKeySpace) { + upstream = originalUpdate; + } else { + // This better be the static fake update. + Assert.eqTrue(originalUpdate.added().size() == parentRowSet.size(), + "originalUpdate.added().size() == parentRowSet.size()"); + Assert.eqTrue(originalUpdate.removed().isEmpty(), "originalUpdate.removed.isEmpty()"); + Assert.eqTrue(originalUpdate.modified().isEmpty(), "originalUpdate.modified.isEmpty()"); + Assert.eqTrue(originalUpdate.shifted().empty(), "originalUpdate.shifted.empty()"); + + upstream = helper.resultKeySpaceUpdate(); + } + if (upstream.removed().isNonempty()) { if (isRedirected) { clearObjectsAtThisLevel(upstream.removed()); @@ -150,71 +184,66 @@ public CompletionHandler createUpdateHandler( } } - // recurse so that dependent intermediate columns are already updated - return new CompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // We don't want to bother with threads if we are going to process a small update - final long totalSize = upstream.added().size() + upstream.modified().size(); - - // If we have shifts, that makes everything nasty; so we do not want to deal with it - final boolean hasShifts = upstream.shifted().nonempty(); - - final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() - || updateGraph.sharedLock().isHeldByCurrentThread() - || updateGraph.exclusiveLock().isHeldByCurrentThread(); - - if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && - ((resultTypeIsTableOrRowSet && totalSize > 0) - || totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { - final long divisionSize = resultTypeIsTableOrRowSet ? 1 - : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, - (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); - final List updates = new ArrayList<>(); - // divide up the additions and modifications - try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); - final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { - while (rsAddIt.hasMore() || rsModIt.hasMore()) { - final TableUpdateImpl update = new TableUpdateImpl(); - update.modifiedColumnSet = upstream.modifiedColumnSet(); - update.shifted = RowSetShiftData.EMPTY; - update.removed = RowSetFactory.empty(); - - if (rsAddIt.hasMore()) { - update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); - } else { - update.added = RowSetFactory.empty(); - } - - if (update.added.size() < divisionSize && rsModIt.hasMore()) { - update.modified = rsModIt - .getNextRowSequenceWithLength(divisionSize - update.added().size()) - .asRowSet(); - } else { - update.modified = RowSetFactory.empty(); - } + return () -> { + // We don't want to bother with threads if we are going to process a small update + final long totalSize = upstream.added().size() + upstream.modified().size(); + + // If we have shifts, that makes everything nasty; so we do not want to deal with it + final boolean hasShifts = upstream.shifted().nonempty(); + + final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() + || updateGraph.sharedLock().isHeldByCurrentThread() + || updateGraph.exclusiveLock().isHeldByCurrentThread(); + + if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && + ((resultTypeIsTableOrRowSet && totalSize > 0) + || totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { + final long divisionSize = resultTypeIsTableOrRowSet ? 1 + : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, + (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); + final List updates = new ArrayList<>(); + // divide up the additions and modifications + try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); + final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { + while (rsAddIt.hasMore() || rsModIt.hasMore()) { + final TableUpdateImpl update = new TableUpdateImpl(); + update.modifiedColumnSet = upstream.modifiedColumnSet(); + update.shifted = RowSetShiftData.EMPTY; + update.removed = RowSetFactory.empty(); + + if (rsAddIt.hasMore()) { + update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); + } else { + update.added = RowSetFactory.empty(); + } - updates.add(update); + if (update.added.size() < divisionSize && rsModIt.hasMore()) { + update.modified = rsModIt + .getNextRowSequenceWithLength(divisionSize - update.added().size()) + .asRowSet(); + } else { + update.modified = RowSetFactory.empty(); } - } - if (updates.isEmpty()) { - throw new IllegalStateException(); + updates.add(update); } + } - jobScheduler.submit( - executionContext, - () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, - liveResultOwner, onCompletion, onCompletion::onError, updates, - serialTableOperationsSafe), - SelectColumnLayer.this, onCompletion::onError); - } else { - jobScheduler.submit( - executionContext, - () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion, - serialTableOperationsSafe), - SelectColumnLayer.this, onCompletion::onError); + if (updates.isEmpty()) { + throw new IllegalStateException(); } + + jobScheduler.submit( + executionContext, + () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, liveResultOwner, onSuccess, + onError, updates, serialTableOperationsSafe), + SelectColumnLayer.this, onError); + } else { + jobScheduler.submit( + executionContext, + () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onSuccess, + serialTableOperationsSafe), + SelectColumnLayer.this, onError); } }; } @@ -225,7 +254,7 @@ private void prepareParallelUpdate( final RowSet toClear, final SelectAndViewAnalyzer.UpdateHelper helper, @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion, + final Runnable onSuccess, final Consumer onError, final List splitUpdates, final boolean serialTableOperationsSafe) { @@ -257,7 +286,7 @@ private void prepareParallelUpdate( if (!isRedirected) { clearObjectsAtThisLevel(toClear); } - onCompletion.onLayerCompleted(getLayerIndex()); + onSuccess.run(); }, onError); } @@ -267,7 +296,7 @@ private void doSerialApplyUpdate( final RowSet toClear, final SelectAndViewAnalyzer.UpdateHelper helper, @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion, + final Runnable onSuccess, final boolean serialTableOperationsSafe) { doEnsureCapacity(); final boolean oldSafe = updateGraph.setSerialTableOperationsSafe(serialTableOperationsSafe); @@ -280,7 +309,7 @@ private void doSerialApplyUpdate( if (!isRedirected) { clearObjectsAtThisLevel(toClear); } - onCompletion.onLayerCompleted(getLayerIndex()); + onSuccess.run(); } private void doParallelApplyUpdate( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java index 5a052dc006a..ebf2ac05ec2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java @@ -13,21 +13,18 @@ public abstract class SelectOrViewColumnLayer extends DependencyLayerBase { private final ColumnSource optionalUnderlying; SelectOrViewColumnLayer( - final SelectAndViewAnalyzer analyzer, - final String name, + final SelectAndViewAnalyzer.AnalyzerContext context, final SelectColumn sc, final ColumnSource ws, final ColumnSource optionalUnderlying, final String[] deps, final ModifiedColumnSet mcsBuilder) { - super(analyzer, name, sc, ws, deps, mcsBuilder); + super(context, sc, ws, deps, mcsBuilder); this.optionalUnderlying = optionalUnderlying; } @Override - void populateColumnSources( - final Map> result, - final GetMode mode) { + void populateColumnSources(final Map> result) { result.put(name, columnSource); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java deleted file mode 100644 index d4002a89702..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java +++ /dev/null @@ -1,124 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.select.analyzers; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.base.verify.Assert; -import io.deephaven.engine.liveness.LivenessNode; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.TrackingRowSet; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; -import io.deephaven.engine.table.impl.util.RowRedirection; -import io.deephaven.engine.table.impl.util.WrappedRowSetRowRedirection; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; - -import java.util.BitSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -final public class StaticFlattenLayer extends SelectAndViewAnalyzer.Layer { - private final TrackingRowSet parentRowSet; - private final Map> overriddenColumns; - - StaticFlattenLayer( - final SelectAndViewAnalyzer analyzer, - final TrackingRowSet parentRowSet, - final Map> allColumnSources) { - super(analyzer.getNextLayerIndex()); - this.parentRowSet = parentRowSet; - final HashSet alreadyFlattenedColumns = new HashSet<>(); - analyzer.getNewColumnSources().forEach((name, cs) -> { - alreadyFlattenedColumns.add(name); - }); - - final RowRedirection rowRedirection = new WrappedRowSetRowRedirection(parentRowSet); - overriddenColumns = new HashMap<>(); - allColumnSources.forEach((name, cs) -> { - if (alreadyFlattenedColumns.contains(name)) { - return; - } - - overriddenColumns.put(name, RedirectedColumnSource.maybeRedirect(rowRedirection, cs)); - }); - } - - @Override - Set getLayerColumnNames() { - return Set.of(); - } - - @Override - void populateModifiedColumnSetInReverse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - // we don't have any dependencies, so we don't need to do anything here - } - - @Override - boolean allowCrossColumnParallelization() { - return true; - } - - @Override - void populateColumnSources( - final Map> result, - final GetMode mode) { - // for each overridden column replace it in the result map - for (Map.Entry> entry : overriddenColumns.entrySet()) { - final String columnName = entry.getKey(); - if (result.containsKey(columnName)) { - result.put(columnName, entry.getValue()); - } - } - } - - @Override - void calcDependsOn( - final Map> result, - final boolean forcePublishAllSources) { - // we don't have any dependencies, so we don't need to do anything here - } - - @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, - final RowSet toClear, - final SelectAndViewAnalyzer.UpdateHelper helper, - final JobScheduler jobScheduler, - @Nullable final LivenessNode liveResultOwner, - final CompletionHandler onCompletion) { - // this must be the fake update used to initialize the result table - Assert.eqTrue(upstream.added().isFlat(), "upstream.added.isFlat()"); - Assert.eq(upstream.added().size(), "upstream.added.size()", parentRowSet.size(), "parentRowSet.size()"); - Assert.eqTrue(upstream.removed().isEmpty(), "upstream.removed.isEmpty()"); - Assert.eqTrue(upstream.modified().isEmpty(), "upstream.modified.isEmpty()"); - - final BitSet baseLayerBitSet = new BitSet(); - baseLayerBitSet.set(BASE_LAYER_INDEX); - return new CompletionHandler(baseLayerBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - onCompletion.onLayerCompleted(getLayerIndex()); - } - }; - } - - public RowSet getParentRowSetCopy() { - return parentRowSet.copy(); - } - - @Override - public void startTrackingPrev() { - throw new UnsupportedOperationException("StaticFlattenLayer supports only non-refreshing scenarios"); - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("{StaticFlattenLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java index 23fdb1f270d..3019b5277b1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java @@ -4,41 +4,30 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.log.LogOutput; -import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; -import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.liveness.LivenessReferent; -import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.select.SelectColumn; -import io.deephaven.engine.table.impl.util.JobScheduler; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; final public class ViewColumnLayer extends SelectOrViewColumnLayer { private static final boolean ALLOW_LIVENESS_REFERENT_RESULTS = Configuration.getInstance() .getBooleanForClassWithDefault(ViewColumnLayer.class, "allowLivenessReferentResults", false); - ViewColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource cs, String[] deps, - ModifiedColumnSet mcsBuilder) { - super(inner, name, sc, checkResultType(cs), null, deps, mcsBuilder); + ViewColumnLayer( + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final ColumnSource cs, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(context, sc, checkResultType(cs), null, deps, mcsBuilder); } @Override - public CompletionHandler createUpdateHandler( - final TableUpdate upstream, - final RowSet toClear, - final SelectAndViewAnalyzer.UpdateHelper helper, - final JobScheduler jobScheduler, - @Nullable final LivenessNode liveResultOwner, - final CompletionHandler completionHandler) { - // There should be nothing to do here. - Assert.eqNull(completionHandler, "completionHandler"); - - return null; + public boolean hasRefreshingLogic() { + return false; } @Override