diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index be12408a63b..6b709ee24d3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1536,7 +1536,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc // Init all the rows by cooking up a fake Update final TableUpdate fakeUpdate = new TableUpdateImpl( - analyzer.alreadyFlattenedSources() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(), + analyzer.flatResult() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.ALL); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 285cdeabb94..6a1c0a653a6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -92,7 +92,7 @@ public void onUpdate(final TableUpdate upstream) { } analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, - new SelectAndViewAnalyzer.SelectLayerCompletionHandler(allNewColumns, completedColumns) { + new SelectAndViewAnalyzer.Layer.CompletionHandler(allNewColumns, completedColumns) { @Override public void onAllRequiredColumnsCompleted() { completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java index fb79fc32e01..01b749efe5e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java @@ -14,7 +14,7 @@ import java.util.*; -public class BaseLayer extends SelectAndViewAnalyzer { +public class BaseLayer extends SelectAndViewAnalyzer.Layer { private final Map> sources; private final boolean publishTheseSources; @@ -25,73 +25,62 @@ public class BaseLayer extends SelectAndViewAnalyzer { } @Override - int getLayerIndexFor(String column) { - if (sources.containsKey(column)) { - return BASE_LAYER_INDEX; - } - throw new IllegalArgumentException("Unknown column: " + column); - } - - @Override - void setBaseBits(BitSet bitset) { - bitset.set(BASE_LAYER_INDEX); + Set getLayerColumnNames() { + return sources.keySet(); } @Override - public void setAllNewColumns(BitSet bitset) { - bitset.set(BASE_LAYER_INDEX); - } - - @Override - void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { + void populateModifiedColumnSetInReverse( + final ModifiedColumnSet mcsBuilder, + final Set remainingDepsToSatisfy) { mcsBuilder.setAll(remainingDepsToSatisfy.toArray(String[]::new)); } @Override - final Map> getColumnSourcesRecurse(GetMode mode) { + void populateColumnSources( + final Map> result, + final GetMode mode) { // We specifically return a LinkedHashMap so the columns get populated in order - final Map> result = new LinkedHashMap<>(); if (mode == GetMode.All || (mode == GetMode.Published && publishTheseSources)) { result.putAll(sources); } - return result; } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { - // nothing to do at the base layer - onCompletion.onLayerCompleted(BASE_LAYER_INDEX); + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { + return new CompletionHandler(new BitSet(), onCompletion) { + @Override + protected void onAllRequiredColumnsCompleted() { + // nothing to do at the base layer + onCompletion.onLayerCompleted(getLayerIndex()); + } + }; } @Override - final Map> calcDependsOnRecurse(boolean forcePublishAllSources) { - final Map> result = new HashMap<>(); + final void calcDependsOn( + final Map> result, + boolean forcePublishAllSources) { if (publishTheseSources || forcePublishAllSources) { for (final String col : sources.keySet()) { result.computeIfAbsent(col, dummy -> new HashSet<>()).add(col); } } - return result; } @Override - public SelectAndViewAnalyzer getInner() { - return null; - } - - @Override - public void startTrackingPrev() { - // nothing to do + boolean allowCrossColumnParallelization() { + return true; } @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("{BaseLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); } - - @Override - public boolean allowCrossColumnParallelization() { - return true; - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java index c5eff7f3132..d385cbcfabe 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java @@ -23,23 +23,17 @@ public class ConstantColumnLayer extends SelectOrViewColumnLayer { private final BitSet dependencyBitSet; - private final boolean flattenedResult; - private final boolean alreadyFlattenedSources; ConstantColumnLayer( - SelectAndViewAnalyzer inner, + SelectAndViewAnalyzer analyzer, String name, SelectColumn sc, WritableColumnSource ws, String[] deps, - ModifiedColumnSet mcsBuilder, - boolean flattenedResult, - boolean alreadyFlattenedSources) { - super(inner, name, sc, ws, null, deps, mcsBuilder); + ModifiedColumnSet mcsBuilder) { + super(analyzer, name, sc, ws, null, deps, mcsBuilder); this.dependencyBitSet = new BitSet(); - this.flattenedResult = flattenedResult; - this.alreadyFlattenedSources = alreadyFlattenedSources; - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set); initialize(ws); } @@ -60,38 +54,31 @@ private void initialize(final WritableColumnSource writableSource) { } @Override - public void applyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper, - final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion) { + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { // Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g. // because they are SelectColumnLayers) - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here; our result value is constant - onCompletion.onLayerCompleted(getLayerIndex()); - } - }); + return new CompletionHandler(dependencyBitSet, onCompletion) { + @Override + public void onAllRequiredColumnsCompleted() { + // we don't need to do anything specific here; our result value is constant + onCompletion.onLayerCompleted(getLayerIndex()); + } + }; } @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}"); - } - - @Override - public boolean flattenedResult() { - return flattenedResult; + boolean allowCrossColumnParallelization() { + return true; } @Override - public boolean alreadyFlattenedSources() { - return alreadyFlattenedSources; - } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); + public LogOutput append(LogOutput logOutput) { + return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}"); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java index 67fa89b424a..3c703382ea5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java @@ -3,7 +3,6 @@ // package io.deephaven.engine.table.impl.select.analyzers; -import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.vector.Vector; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.select.SelectColumn; @@ -11,8 +10,7 @@ import java.util.*; -public abstract class DependencyLayerBase extends SelectAndViewAnalyzer { - final SelectAndViewAnalyzer inner; +public abstract class DependencyLayerBase extends SelectAndViewAnalyzer.Layer { final String name; final SelectColumn selectColumn; final boolean selectColumnHoldsVector; @@ -21,35 +19,46 @@ public abstract class DependencyLayerBase extends SelectAndViewAnalyzer { private final String[] dependencies; final ModifiedColumnSet myModifiedColumnSet; - DependencyLayerBase(SelectAndViewAnalyzer inner, String name, SelectColumn selectColumn, - ColumnSource columnSource, - String[] dependencies, ModifiedColumnSet mcsBuilder) { - super(inner.getLayerIndex() + 1); - this.inner = inner; + DependencyLayerBase( + final SelectAndViewAnalyzer analyzer, + final String name, + final SelectColumn selectColumn, + final ColumnSource columnSource, + final String[] dependencies, + final ModifiedColumnSet mcsBuilder) { + super(analyzer.getNextLayerIndex()); this.name = name; this.selectColumn = selectColumn; selectColumnHoldsVector = Vector.class.isAssignableFrom(selectColumn.getReturnedType()); this.columnSource = columnSource; this.dependencies = dependencies; final Set remainingDepsToSatisfy = new HashSet<>(Arrays.asList(dependencies)); - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); + analyzer.populateModifiedColumnSet(mcsBuilder, remainingDepsToSatisfy); this.myModifiedColumnSet = mcsBuilder; } @Override - void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { + Set getLayerColumnNames() { + return Set.of(name); + } + + @Override + void populateModifiedColumnSetInReverse( + final ModifiedColumnSet mcsBuilder, + final Set remainingDepsToSatisfy) { // Later-defined columns override earlier-defined columns. So we satisfy column dependencies "on the way // down" the recursion. if (remainingDepsToSatisfy.remove(name)) { // Caller had a dependency on us, so caller gets our dependencies mcsBuilder.setAll(myModifiedColumnSet); } - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); } @Override - final Map> calcDependsOnRecurse(boolean forcePublishAllResources) { - final Map> result = inner.calcDependsOnRecurse(forcePublishAllResources); + void calcDependsOn( + final Map> result, + final boolean forcePublishAllSources) { + final Set thisResult = new HashSet<>(); for (final String dep : dependencies) { final Set innerDependencies = result.get(dep); @@ -61,25 +70,7 @@ final Map> calcDependsOnRecurse(boolean forcePublishAllResou thisResult.addAll(innerDependencies); } } - result.put(name, thisResult); - return result; - } - @Override - public SelectAndViewAnalyzer getInner() { - return inner; - } - - @Override - int getLayerIndexFor(String column) { - if (name.equals(column)) { - return getLayerIndex(); - } - return inner.getLayerIndexFor(column); - } - - @Override - void setBaseBits(BitSet bitset) { - inner.setBaseBits(bitset); + result.put(name, thisResult); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java index 8d687cdc8c8..b36ccd054d8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java @@ -19,38 +19,47 @@ /** * A layer that copies a column from our input to our output. - * + *

* {@implNote This class is part of the Deephaven engine, and not intended for direct use.} */ final public class PreserveColumnLayer extends DependencyLayerBase { private final BitSet dependencyBitSet; - PreserveColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource cs, String[] deps, - ModifiedColumnSet mcsBuilder) { - super(inner, name, sc, cs, deps, mcsBuilder); + PreserveColumnLayer( + final SelectAndViewAnalyzer analyzer, + final String name, + final SelectColumn sc, + final ColumnSource cs, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(analyzer, name, sc, cs, deps, mcsBuilder); this.dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set); } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { // Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g. // because they are SelectColumnLayers) - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here - onCompletion.onLayerCompleted(getLayerIndex()); - } - }); + return new CompletionHandler(dependencyBitSet, onCompletion) { + @Override + public void onAllRequiredColumnsCompleted() { + // we don't need to do anything specific here + onCompletion.onLayerCompleted(getLayerIndex()); + } + }; } @Override - Map> getColumnSourcesRecurse(GetMode mode) { - // our column is not a new column, so we need to make sure that we do not double enable previous tracking - final Map> result = inner.getColumnSourcesRecurse(mode); + void populateColumnSources( + final Map> result, + final GetMode mode) { switch (mode) { case New: // we have no new sources @@ -60,13 +69,11 @@ Map> getColumnSourcesRecurse(GetMode mode) { result.put(name, columnSource); break; } - return result; } @Override - public void startTrackingPrev() { - // nothing to do, here but the inner needs to be called - inner.startTrackingPrev(); + boolean allowCrossColumnParallelization() { + return true; } @Override @@ -74,24 +81,4 @@ public LogOutput append(LogOutput logOutput) { return logOutput.append("{PreserveColumnLayer: ").append(name).append(", layerIndex=").append(getLayerIndex()) .append("}"); } - - @Override - public boolean flattenedResult() { - // preserve layer is only flattened if the inner is flattened - // the "flattenedResult" means that we are flattening the table as part of select. For a pre-existing column, we - // could not preserve a layer while flattening, but if we are preserving a newly generated column; it is valid - // for the result to have been flattened as part of select. - return inner.flattenedResult(); - } - - @Override - public boolean alreadyFlattenedSources() { - // a preserve layer is only already flattened if the inner is already flattened - return inner.alreadyFlattenedSources(); - } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java index 8a33acfa00d..df498076cbf 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java @@ -8,9 +8,9 @@ import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.util.*; import io.deephaven.util.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; @@ -23,63 +23,77 @@ * * {@implNote This class is part of the Deephaven engine, and not intended for direct use.} */ -public final class RedirectionLayer extends SelectAndViewAnalyzer { - private final SelectAndViewAnalyzer inner; +public final class RedirectionLayer extends SelectAndViewAnalyzer.Layer { private final TrackingRowSet resultRowSet; private final WritableRowRedirection rowRedirection; private final WritableRowSet freeValues = RowSetFactory.empty(); private long maxInnerIndex; - RedirectionLayer(SelectAndViewAnalyzer inner, TrackingRowSet resultRowSet, WritableRowRedirection rowRedirection) { + RedirectionLayer(SelectAndViewAnalyzer analyzer, TrackingRowSet resultRowSet, + WritableRowRedirection rowRedirection) { super(REDIRECTION_LAYER_INDEX); - Assert.eq(inner.getLayerIndex(), "inner.getLayerIndex()", BASE_LAYER_INDEX); - this.inner = inner; + Assert.eq(analyzer.getNextLayerIndex(), "analyzer.getNextLayerIndex()", REDIRECTION_LAYER_INDEX); this.resultRowSet = resultRowSet; this.rowRedirection = rowRedirection; this.maxInnerIndex = -1; } @Override - int getLayerIndexFor(String column) { - // Result columns' applyUpdate depend on the result of the redirection. - Assert.eq(inner.getLayerIndexFor(column), "inner.getLayerIndexFor(column)", BASE_LAYER_INDEX); - return REDIRECTION_LAYER_INDEX; + Set getLayerColumnNames() { + return Set.of(); + } + + @Override + void populateModifiedColumnSetInReverse( + final ModifiedColumnSet mcsBuilder, + final Set remainingDepsToSatisfy) { + // we don't generate any column sources, so we don't need to do anything here } @Override - void setBaseBits(BitSet bitset) { - inner.setBaseBits(bitset); - bitset.set(REDIRECTION_LAYER_INDEX); + void populateColumnSources( + final Map> result, + final GetMode mode) { + // we don't generate any column sources, so we don't need to do anything here } @Override - public void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); + void calcDependsOn( + final Map> result, + final boolean forcePublishAllSources) { + // we don't generate any column sources, so we don't need to do anything here } @Override - public Map> getColumnSourcesRecurse(GetMode mode) { - return inner.getColumnSourcesRecurse(mode); + boolean allowCrossColumnParallelization() { + return true; } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { final BitSet baseLayerBitSet = new BitSet(); - inner.setBaseBits(baseLayerBitSet); - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(baseLayerBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we only have a base layer underneath us, so we do not care about the bitSet; it is always - // empty - doApplyUpdate(upstream, toClear, helper, onCompletion); - } - }); + baseLayerBitSet.set(BASE_LAYER_INDEX); + return new CompletionHandler(baseLayerBitSet, onCompletion) { + @Override + public void onAllRequiredColumnsCompleted() { + // we only have a base layer underneath us, so we do not care about the bitSet; it is always + // empty + doApplyUpdate(upstream, toClear, helper, onCompletion); + } + }; } - private void doApplyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, - SelectLayerCompletionHandler onCompletion) { + private void doApplyUpdate( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final CompletionHandler onCompletion) { // we need to remove the removed values from our row redirection, and add them to our free RowSet; so that // updating tables will not consume more space over the course of a day for abandoned rows final RowSetBuilderRandom innerToFreeBuilder = RowSetFactory.builderRandom(); @@ -153,29 +167,13 @@ private void doApplyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper he onCompletion.onLayerCompleted(REDIRECTION_LAYER_INDEX); } - @Override - public Map> calcDependsOnRecurse(boolean forcePublishAllResources) { - return inner.calcDependsOnRecurse(forcePublishAllResources); - } - - @Override - public SelectAndViewAnalyzer getInner() { - return inner; - } - @Override public void startTrackingPrev() { rowRedirection.startTrackingPrevValues(); - inner.startTrackingPrev(); } @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("{RedirectionLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index 099d70d4eb6..b9b3ed1fb8f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -4,15 +4,18 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.Pair; +import io.deephaven.base.log.LogOutput; import io.deephaven.base.log.LogOutputAppendable; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MatchPair; import io.deephaven.engine.table.impl.QueryCompilerRequestProcessor; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.SourceColumn; @@ -36,7 +39,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; -public abstract class SelectAndViewAnalyzer implements LogOutputAppendable { +public class SelectAndViewAnalyzer implements LogOutputAppendable { private static final Consumer> NOOP = ignore -> { }; @@ -84,7 +87,9 @@ public static SelectAndViewAnalyzerWrapper create( final boolean allowInternalFlatten, final SelectColumn... selectColumns) { final UpdateGraph updateGraph = sourceTable.getUpdateGraph(); - SelectAndViewAnalyzer analyzer = createBaseLayer(columnSources, publishTheseSources); + final SelectAndViewAnalyzer analyzer = new SelectAndViewAnalyzer(); + analyzer.addLayer(new BaseLayer(columnSources, publishTheseSources)); + final Map> columnDefinitions = new LinkedHashMap<>(); final RowRedirection rowRedirection; if (mode == Mode.SELECT_REDIRECTED_STATIC) { @@ -92,7 +97,7 @@ public static SelectAndViewAnalyzerWrapper create( } else if (mode == Mode.SELECT_REDIRECTED_REFRESHING && rowSet.size() < Integer.MAX_VALUE) { final WritableRowRedirection writableRowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(rowSet.intSize()); - analyzer = analyzer.createRedirectionLayer(rowSet, writableRowRedirection); + analyzer.addLayer(new RedirectionLayer(analyzer, rowSet, writableRowRedirection)); rowRedirection = writableRowRedirection; } else { rowRedirection = null; @@ -141,9 +146,9 @@ public static SelectAndViewAnalyzerWrapper create( // Second pass builds the analyzer and destination columns final TrackingRowSet originalRowSet = rowSet; - boolean flatResult = rowSet.isFlat(); + analyzer.flatResult = rowSet.isFlat(); // if we preserve a column, we set this to false - boolean flattenedResult = !flatResult + analyzer.flattenedResult = !analyzer.flatResult && allowInternalFlatten && (columnSources.isEmpty() || !publishTheseSources) && mode == Mode.SELECT_STATIC; @@ -157,11 +162,12 @@ public static SelectAndViewAnalyzerWrapper create( // When flattening the result, intermediate columns generate results in position space. When we discover // that a select column depends on an intermediate result, then we must flatten all parent columns so // that all dependent columns are in the same result-key space. - if (!flatResult && flattenedResult && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) - .anyMatch(resultColumns::contains)) { - analyzer = analyzer.createStaticFlattenLayer(rowSet); + if (!analyzer.flatResult && analyzer.flattenedResult + && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) + .anyMatch(resultColumns::contains)) { + analyzer.addLayer(new StaticFlattenLayer(analyzer, rowSet)); rowSet = RowSetFactory.flat(rowSet.size()).toTracking(); - flatResult = true; + analyzer.flatResult = true; // we must re-initialize the column inputs as they may have changed post-flatten sc.initInputs(rowSet, analyzer.getAllColumnSources()); @@ -190,9 +196,9 @@ public static SelectAndViewAnalyzerWrapper create( if (hasConstantValue(sc)) { final WritableColumnSource constViewSource = SingleValueColumnSource.getSingleValueColumnSource(sc.getReturnedType()); - analyzer = analyzer.createLayerForConstantView( - sc.getName(), sc, constViewSource, distinctDeps, mcsBuilder, flattenedResult, - flatResult && flattenedResult); + final String name = sc.getName(); + analyzer.addLayer( + new ConstantColumnLayer(analyzer, name, sc, constViewSource, distinctDeps, mcsBuilder)); continue; } @@ -215,12 +221,13 @@ public static SelectAndViewAnalyzerWrapper create( useShiftedColumns, false, selectColumns); } else { // we can not flatten future columns because we are preserving a column that may not be flat - flattenedResult = false; + analyzer.flattenedResult = false; } } - analyzer = analyzer.createLayerForPreserve( - sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder); + final String name = sc.getName(); + final ColumnSource cs = sc.getDataView(); + analyzer.addLayer(new PreserveColumnLayer(analyzer, name, sc, cs, distinctDeps, mcsBuilder)); continue; } @@ -229,7 +236,8 @@ public static SelectAndViewAnalyzerWrapper create( if (realColumn != null) { final ColumnSource alias = resultAlias.get(realColumn.getSourceName()); if (alias != null) { - analyzer = analyzer.createLayerForPreserve(sc.getName(), sc, alias, distinctDeps, mcsBuilder); + final String name = sc.getName(); + analyzer.addLayer(new PreserveColumnLayer(analyzer, name, sc, alias, distinctDeps, mcsBuilder)); continue; } } @@ -239,30 +247,33 @@ public static SelectAndViewAnalyzerWrapper create( : cs -> resultAlias.put(realColumn.getSourceName(), cs); final long targetDestinationCapacity = - rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); + rowSet.isEmpty() ? 0 : (analyzer.flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); switch (mode) { case VIEW_LAZY: { final ColumnSource viewCs = sc.getLazyView(); maybeCreateAlias.accept(viewCs); - analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); + final String name = sc.getName(); + analyzer.addLayer(new ViewColumnLayer(analyzer, name, sc, viewCs, distinctDeps, mcsBuilder)); break; } case VIEW_EAGER: { final ColumnSource viewCs = sc.getDataView(); maybeCreateAlias.accept(viewCs); - analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); + final String name = sc.getName(); + analyzer.addLayer(new ViewColumnLayer(analyzer, name, sc, viewCs, distinctDeps, mcsBuilder)); break; } case SELECT_STATIC: { // We need to call newDestInstance because only newDestInstance has the knowledge to endow our // created array with the proper componentType (in the case of Vectors). - final WritableColumnSource scs = - flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity) - : sc.newDestInstance(targetDestinationCapacity); + final WritableColumnSource scs = analyzer.flatResult || analyzer.flattenedResult + ? sc.newFlatDestInstance(targetDestinationCapacity) + : sc.newDestInstance(targetDestinationCapacity); maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null, - distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult); - if (flattenedResult) { + final String name = sc.getName(); + analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, null, + distinctDeps, mcsBuilder, false, analyzer.flattenedResult)); + if (analyzer.flattenedResult) { numberOfInternallyFlattenedColumns++; } break; @@ -272,8 +283,9 @@ public static SelectAndViewAnalyzerWrapper create( final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, - underlyingSource, distinctDeps, mcsBuilder, true, false, false); + final String name = sc.getName(); + analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, + underlyingSource, distinctDeps, mcsBuilder, true, false)); break; } case SELECT_REDIRECTED_REFRESHING: @@ -289,8 +301,9 @@ public static SelectAndViewAnalyzerWrapper create( rowRedirection, underlyingSource, rowSet.intSize()); } maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, - underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false, false); + final String name = sc.getName(); + analyzer.addLayer(new SelectColumnLayer(updateGraph, rowSet, analyzer, name, sc, scs, + underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false)); break; } default: @@ -350,30 +363,6 @@ private static boolean shouldPreserve(final SelectColumn sc) { && !Vector.class.isAssignableFrom(sc.getReturnedType()); } - static final int BASE_LAYER_INDEX = 0; - static final int REDIRECTION_LAYER_INDEX = 1; - - /** - * The layerIndex is used to identify each layer uniquely within the bitsets for completion. - */ - private final int layerIndex; - - public SelectAndViewAnalyzer(int layerIndex) { - this.layerIndex = layerIndex; - } - - int getLayerIndex() { - return layerIndex; - } - - /** - * Set the bits in bitset that represent the base layer and optional redirection layer. No other jobs can be - * executed until all of these bits are set. - * - * @param bitset the bitset to manipulate. - */ - abstract void setBaseBits(BitSet bitset); - /** * Set the bits in bitset that represent all the new columns. This is used to identify when the select or update * operation is complete. @@ -381,69 +370,214 @@ int getLayerIndex() { * @param bitset the bitset to manipulate. */ public void setAllNewColumns(BitSet bitset) { - getInner().setAllNewColumns(bitset); - bitset.set(getLayerIndex()); + // TODO NATE NOCOMMIT: I believe this is actually simply every layer, but dbl check. + bitset.set(0, layers.size()); } - private static SelectAndViewAnalyzer createBaseLayer(Map> sources, - boolean publishTheseSources) { - return new BaseLayer(sources, publishTheseSources); - } + private final List layers = new ArrayList<>(); + private final Map columnToLayerIndex = new HashMap<>(); - private RedirectionLayer createRedirectionLayer(TrackingRowSet resultRowSet, - WritableRowRedirection rowRedirection) { - return new RedirectionLayer(this, resultRowSet, rowRedirection); - } + /** Whether the result is already flat. */ + private boolean flatResult = false; + /** Whether the result should be flat. */ + private boolean flattenedResult = false; - private StaticFlattenLayer createStaticFlattenLayer(TrackingRowSet parentRowSet) { - return new StaticFlattenLayer(this, parentRowSet); - } + SelectAndViewAnalyzer() { - private SelectAndViewAnalyzer createLayerForSelect( - UpdateGraph updateGraph, RowSet parentRowset, String name, SelectColumn sc, WritableColumnSource cs, - WritableColumnSource underlyingSource, String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, - boolean isRedirected, boolean flattenResult, boolean alreadyFlattened) { - return new SelectColumnLayer(updateGraph, parentRowset, this, name, sc, cs, underlyingSource, - parentColumnDependencies, - mcsBuilder, isRedirected, flattenResult, alreadyFlattened); } - private SelectAndViewAnalyzer createLayerForConstantView(String name, SelectColumn sc, WritableColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, boolean flattenResult, - boolean alreadyFlattened) { - return new ConstantColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder, flattenResult, - alreadyFlattened); - } + void addLayer(final Layer layer) { + layers.add(layer); - private SelectAndViewAnalyzer createLayerForView(String name, SelectColumn sc, ColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) { - return new ViewColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder); + for (final String columnName : layer.getLayerColumnNames()) { + columnToLayerIndex.put(columnName, layer.getLayerIndex()); + } } - private SelectAndViewAnalyzer createLayerForPreserve(String name, SelectColumn sc, ColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) { - return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder); + public int getNextLayerIndex() { + return layers.size(); } - abstract void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy); + public static abstract class Layer implements LogOutputAppendable { + + static final int BASE_LAYER_INDEX = 0; + static final int REDIRECTION_LAYER_INDEX = 1; + + enum GetMode { + All, New, Published + } + + /** + * The layerIndex is used to identify each layer uniquely within the bitsets for completion. + */ + private final int layerIndex; + + public Layer(int layerIndex) { + this.layerIndex = layerIndex; + } + + int getLayerIndex() { + return layerIndex; + } + + @Override + public String toString() { + return new LogOutputStringImpl().append(this).toString(); + } + + public void startTrackingPrev() { + // default is that there is nothing to do + } - enum GetMode { - All, New, Published + abstract Set getLayerColumnNames(); + + abstract void populateModifiedColumnSetInReverse( + ModifiedColumnSet mcsBuilder, + Set remainingDepsToSatisfy); + + abstract void populateColumnSources( + Map> result, + GetMode mode); + + abstract void calcDependsOn( + final Map> result, + boolean forcePublishAllSources); + + + abstract boolean allowCrossColumnParallelization(); + + /** + * Apply this update to this SelectAndViewAnalyzer. + * + * @param upstream the upstream update + * @param toClear rows that used to exist and no longer exist + * @param helper convenience class that memoizes reusable calculations for this update + * @param jobScheduler scheduler for parallel sub-tasks + * @param liveResultOwner {@link LivenessNode node} to be used to manage/unmanage results that happen to be + * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} + * @param onCompletion Called when an inner column is complete. The outer layer should pass the + * {@code onCompletion} + */ + public abstract CompletionHandler createUpdateHandler( + TableUpdate upstream, + RowSet toClear, + UpdateHelper helper, + JobScheduler jobScheduler, + @Nullable LivenessNode liveResultOwner, + CompletionHandler onCompletion); + + /** + * A class that handles the completion of one select column. The handlers are chained together; all downstream + * dependencies may execute when a column completes. + */ + public static abstract class CompletionHandler { + /** + * Note that the completed columns are shared among the entire chain of completion handlers. + */ + private final BitSet completedColumns; + private final CompletionHandler nextHandler; + private final BitSet requiredColumns; + private volatile boolean fired = false; + + /** + * Create a new completion handler that calls nextHandler after its own processing. The completedColumns + * BitSet is shared among all handlers. + * + * @param requiredColumns the columns required for this layer + * @param nextHandler the next handler to call + */ + CompletionHandler(BitSet requiredColumns, CompletionHandler nextHandler) { + this.requiredColumns = requiredColumns; + this.completedColumns = nextHandler.completedColumns; + this.nextHandler = nextHandler; + } + + /** + * Create the final completion handler, which has no next handler. + * + * @param requiredColumns the columns required for this handler to fire + * @param completedColumns the set of completed columns, shared with all the other handlers + */ + public CompletionHandler(BitSet requiredColumns, BitSet completedColumns) { + this.requiredColumns = requiredColumns; + this.completedColumns = completedColumns; + this.nextHandler = null; + } + + /** + * Called when a single column is completed. + *

+ * If we are ready, then we call {@link #onAllRequiredColumnsCompleted()}. + *

+ * We may not be ready, but other columns downstream of us may be ready, so they are also notified (the + * nextHandler). + * + * @param completedColumn the layerIndex of the completedColumn + */ + void onLayerCompleted(int completedColumn) { + if (!fired) { + boolean readyToFire = false; + synchronized (completedColumns) { + if (!fired) { + completedColumns.set(completedColumn); + if (requiredColumns.get(completedColumn) || requiredColumns.isEmpty()) { + readyToFire = requiredColumns.stream().allMatch(completedColumns::get); + if (readyToFire) { + fired = true; + } + } + } + } + if (readyToFire) { + onAllRequiredColumnsCompleted(); + } + } + if (nextHandler != null) { + // TODO: this is still susceptible to a stack overflow if the chain is too long + nextHandler.onLayerCompleted(completedColumn); + } + } + + protected void onError(Exception error) { + if (nextHandler != null) { + nextHandler.onError(error); + } + } + + /** + * Called when all required columns are completed. + */ + protected abstract void onAllRequiredColumnsCompleted(); + } + } + + public final void populateModifiedColumnSet( + final ModifiedColumnSet mcsBuilder, + final Set remainingDepsToSatisfy) { + for (int ii = layers.size() - 1; ii >= 0; --ii) { + layers.get(ii).populateModifiedColumnSetInReverse(mcsBuilder, remainingDepsToSatisfy); + } } public final Map> getAllColumnSources() { - return getColumnSourcesRecurse(GetMode.All); + return getColumnSources(Layer.GetMode.All); } public final Map> getNewColumnSources() { - return getColumnSourcesRecurse(GetMode.New); + return getColumnSources(Layer.GetMode.New); } public final Map> getPublishedColumnSources() { - return getColumnSourcesRecurse(GetMode.Published); + return getColumnSources(Layer.GetMode.Published); } - abstract Map> getColumnSourcesRecurse(GetMode mode); + private Map> getColumnSources(final Layer.GetMode mode) { + final Map> result = new LinkedHashMap<>(); + for (final Layer layer : layers) { + layer.populateColumnSources(result, mode); + } + return result; + } public static class UpdateHelper implements SafeCloseable { private RowSet existingRows; @@ -521,9 +655,35 @@ public void close() { * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} * @param onCompletion Called when an inner column is complete. The outer layer should pass the {@code onCompletion} */ - public abstract void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, - JobScheduler jobScheduler, @Nullable LivenessNode liveResultOwner, - SelectLayerCompletionHandler onCompletion); + public void applyUpdate( + final TableUpdate upstream, + final RowSet toClear, + final UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + Layer.CompletionHandler onCompletion) { + + // TODO: this leaks as it needs to exist until the last layer referencing it is complete + TableUpdateImpl postFlatten = null; + for (int ii = layers.size() - 1; ii >= 0; --ii) { + final Layer currentLayer = layers.get(ii); + onCompletion = currentLayer.createUpdateHandler( + postFlatten != null ? postFlatten : upstream, + toClear, helper, jobScheduler, liveResultOwner, onCompletion); + + if (currentLayer instanceof StaticFlattenLayer) { + postFlatten = new TableUpdateImpl(); + postFlatten.added = ((StaticFlattenLayer) currentLayer).getParentRowSetCopy(); + postFlatten.removed = RowSetFactory.empty(); + postFlatten.modified = RowSetFactory.empty(); + postFlatten.modifiedColumnSet = ModifiedColumnSet.EMPTY; + postFlatten.shifted = RowSetShiftData.EMPTY; + } + } + + // base layer is invoked manually + onCompletion.onAllRequiredColumnsCompleted(); + } /** * Our job here is to calculate the effects: a map from incoming column to a list of columns that it effects. We do @@ -531,7 +691,7 @@ public abstract void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHel * stage we reverse that map. */ public final Map calcEffects(boolean forcePublishAllResources) { - final Map> dependsOn = calcDependsOnRecurse(forcePublishAllResources); + final Map> dependsOn = calcDependsOn(forcePublishAllResources); // Now create effects, which is the inverse of dependsOn: // An entry W -> [X, Y, Z] in effects means that W affects X, Y, and Z @@ -551,11 +711,27 @@ public final Map calcEffects(boolean forcePublishAllResources) return result; } - abstract Map> calcDependsOnRecurse(boolean forcePublishAllResources); + final Map> calcDependsOn(boolean forcePublishAllResources) { + final Map> result = new HashMap<>(); + for (final Layer layer : layers) { + layer.calcDependsOn(result, forcePublishAllResources); + } + return result; + } - public abstract SelectAndViewAnalyzer getInner(); + public void startTrackingPrev() { + for (final Layer layer : layers) { + layer.startTrackingPrev(); + } + } - public abstract void startTrackingPrev(); + /** + * Have the column sources already been flattened? Only the STATIC_SELECT case flattens the result. A static flatten + * layer is only added if SelectColumn depends on an intermediate result. + */ + public boolean flatResult() { + return flatResult; + } /** * Was the result internally flattened? Only the STATIC_SELECT case flattens the result. If the result preserves any @@ -563,116 +739,25 @@ public final Map calcEffects(boolean forcePublishAllResources) * implementation returns false. */ public boolean flattenedResult() { - return false; - } - - /** - * Have the column sources already been flattened? Only the STATIC_SELECT case flattens the result. A static flatten - * layer is only added if SelectColumn depends on an intermediate result. - */ - public boolean alreadyFlattenedSources() { - return false; + return flattenedResult; } /** * Return the layerIndex for a given string column. * - *

- * This is executed recursively, because later columns in a select statement hide earlier columns. - *

- * * @param column the name of the column * * @return the layerIndex */ - abstract int getLayerIndexFor(String column); + int getLayerIndexFor(String column) { + return columnToLayerIndex.getOrDefault(column, -1); + } /** * Can all of our columns permit parallel updates? */ - abstract public boolean allowCrossColumnParallelization(); - - /** - * A class that handles the completion of one select column. The handlers are chained together; all downstream - * dependencies may execute when a column completes. - */ - public static abstract class SelectLayerCompletionHandler { - /** - * Note that the completed columns are shared among the entire chain of completion handlers. - */ - private final BitSet completedColumns; - private final SelectLayerCompletionHandler nextHandler; - private final BitSet requiredColumns; - private volatile boolean fired = false; - - /** - * Create a new completion handler that calls nextHandler after its own processing. The completedColumns BitSet - * is shared among all handlers. - * - * @param requiredColumns the columns required for this layer - * @param nextHandler the next handler to call - */ - SelectLayerCompletionHandler(BitSet requiredColumns, SelectLayerCompletionHandler nextHandler) { - this.requiredColumns = requiredColumns; - this.completedColumns = nextHandler.completedColumns; - this.nextHandler = nextHandler; - } - - /** - * Create the final completion handler, which has no next handler. - * - * @param requiredColumns the columns required for this handler to fire - * @param completedColumns the set of completed columns, shared with all the other handlers - */ - public SelectLayerCompletionHandler(BitSet requiredColumns, BitSet completedColumns) { - this.requiredColumns = requiredColumns; - this.completedColumns = completedColumns; - this.nextHandler = null; - } - - /** - * Called when a single column is completed. - *

- * If we are ready, then we call {@link #onAllRequiredColumnsCompleted()}. - *

- * We may not be ready, but other columns downstream of us may be ready, so they are also notified (the - * nextHandler). - * - * @param completedColumn the layerIndex of the completedColumn - */ - void onLayerCompleted(int completedColumn) { - if (!fired) { - boolean readyToFire = false; - synchronized (completedColumns) { - if (!fired) { - completedColumns.set(completedColumn); - if (requiredColumns.get(completedColumn) || requiredColumns.isEmpty()) { - readyToFire = requiredColumns.stream().allMatch(completedColumns::get); - if (readyToFire) { - fired = true; - } - } - } - } - if (readyToFire) { - onAllRequiredColumnsCompleted(); - } - } - if (nextHandler != null) { - nextHandler.onLayerCompleted(completedColumn); - } - } - - protected void onError(Exception error) { - if (nextHandler != null) { - nextHandler.onError(error); - } - } - - /** - * Called when all required columns are completed. - */ - protected abstract void onAllRequiredColumnsCompleted(); + public boolean allowCrossColumnParallelization() { + return layers.stream().allMatch(Layer::allowCrossColumnParallelization); } /** @@ -682,13 +767,13 @@ protected void onError(Exception error) { * * @return a completion handler that will signal the future */ - public SelectLayerCompletionHandler futureCompletionHandler(CompletableFuture waitForResult) { + public Layer.CompletionHandler futureCompletionHandler(CompletableFuture waitForResult) { final BitSet completedColumns = new BitSet(); final BitSet requiredColumns = new BitSet(); setAllNewColumns(requiredColumns); - return new SelectLayerCompletionHandler(requiredColumns, completedColumns) { + return new Layer.CompletionHandler(requiredColumns, completedColumns) { boolean errorOccurred = false; @Override @@ -710,6 +795,22 @@ protected void onError(Exception error) { }; } + @Override + public LogOutput append(LogOutput logOutput) { + logOutput = logOutput.append("SelectAndViewAnalyzer{"); + boolean first = true; + for (final Layer layer : layers) { + if (first) { + first = false; + } else { + logOutput = logOutput.append(", "); + } + logOutput = logOutput.append(layer); + + } + return logOutput.append("}"); + } + @Override public String toString() { return new LogOutputStringImpl().append(this).toString(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index b7177e9fe39..c18b215cda5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -57,7 +57,6 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { private final RowSet parentRowSet; private final boolean isRedirected; private final boolean flattenedResult; - private final boolean alreadyFlattenedSources; private final BitSet dependencyBitSet; private final boolean canParallelizeThisColumn; private final boolean isSystemic; @@ -75,7 +74,7 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { SelectColumnLayer( UpdateGraph updateGraph, RowSet parentRowSet, SelectAndViewAnalyzer inner, String name, SelectColumn sc, WritableColumnSource ws, WritableColumnSource underlying, String[] deps, ModifiedColumnSet mcsBuilder, - boolean isRedirected, boolean flattenedResult, boolean alreadyFlattenedSources) { + boolean isRedirected, boolean flattenedResult) { super(inner, name, sc, ws, underlying, deps, mcsBuilder); this.updateGraph = updateGraph; this.parentRowSet = parentRowSet; @@ -94,7 +93,6 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); this.flattenedResult = flattenedResult; - this.alreadyFlattenedSources = alreadyFlattenedSources; // We can only parallelize this column if we are not redirected, our destination provides ensure previous, and // the select column is stateless @@ -136,9 +134,13 @@ private ChunkSource getChunkSource() { } @Override - public void applyUpdate(final TableUpdate upstream, final RowSet toClear, - final UpdateHelper helper, final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion) { + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { if (upstream.removed().isNonempty()) { if (isRedirected) { clearObjectsAtThisLevel(upstream.removed()); @@ -149,79 +151,84 @@ public void applyUpdate(final TableUpdate upstream, final RowSet toClear, } // recurse so that dependent intermediate columns are already updated - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // We don't want to bother with threads if we are going to process a small update - final long totalSize = upstream.added().size() + upstream.modified().size(); - - // If we have shifts, that makes everything nasty; so we do not want to deal with it - final boolean hasShifts = upstream.shifted().nonempty(); - - final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() - || updateGraph.sharedLock().isHeldByCurrentThread() - || updateGraph.exclusiveLock().isHeldByCurrentThread(); - - if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && - ((resultTypeIsTableOrRowSet && totalSize > 0) - || totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { - final long divisionSize = resultTypeIsTableOrRowSet ? 1 - : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, - (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); - final List updates = new ArrayList<>(); - // divide up the additions and modifications - try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); - final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { - while (rsAddIt.hasMore() || rsModIt.hasMore()) { - final TableUpdateImpl update = new TableUpdateImpl(); - update.modifiedColumnSet = upstream.modifiedColumnSet(); - update.shifted = RowSetShiftData.EMPTY; - update.removed = RowSetFactory.empty(); - - if (rsAddIt.hasMore()) { - update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); - } else { - update.added = RowSetFactory.empty(); - } - - if (update.added.size() < divisionSize && rsModIt.hasMore()) { - update.modified = rsModIt - .getNextRowSequenceWithLength(divisionSize - update.added().size()) - .asRowSet(); - } else { - update.modified = RowSetFactory.empty(); - } - - updates.add(update); - } + return new CompletionHandler(dependencyBitSet, onCompletion) { + @Override + public void onAllRequiredColumnsCompleted() { + // We don't want to bother with threads if we are going to process a small update + final long totalSize = upstream.added().size() + upstream.modified().size(); + + // If we have shifts, that makes everything nasty; so we do not want to deal with it + final boolean hasShifts = upstream.shifted().nonempty(); + + final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() + || updateGraph.sharedLock().isHeldByCurrentThread() + || updateGraph.exclusiveLock().isHeldByCurrentThread(); + + if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && + ((resultTypeIsTableOrRowSet && totalSize > 0) + || totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { + final long divisionSize = resultTypeIsTableOrRowSet ? 1 + : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, + (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); + final List updates = new ArrayList<>(); + // divide up the additions and modifications + try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); + final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { + while (rsAddIt.hasMore() || rsModIt.hasMore()) { + final TableUpdateImpl update = new TableUpdateImpl(); + update.modifiedColumnSet = upstream.modifiedColumnSet(); + update.shifted = RowSetShiftData.EMPTY; + update.removed = RowSetFactory.empty(); + + if (rsAddIt.hasMore()) { + update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); + } else { + update.added = RowSetFactory.empty(); } - if (updates.isEmpty()) { - throw new IllegalStateException(); + if (update.added.size() < divisionSize && rsModIt.hasMore()) { + update.modified = rsModIt + .getNextRowSequenceWithLength(divisionSize - update.added().size()) + .asRowSet(); + } else { + update.modified = RowSetFactory.empty(); } - jobScheduler.submit( - executionContext, - () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, - liveResultOwner, onCompletion, this::onError, updates, - serialTableOperationsSafe), - SelectColumnLayer.this, this::onError); - } else { - jobScheduler.submit( - executionContext, - () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion, - serialTableOperationsSafe), - SelectColumnLayer.this, this::onError); + updates.add(update); } } - }); + + if (updates.isEmpty()) { + throw new IllegalStateException(); + } + + jobScheduler.submit( + executionContext, + () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, + liveResultOwner, onCompletion, this::onError, updates, + serialTableOperationsSafe), + SelectColumnLayer.this, this::onError); + } else { + jobScheduler.submit( + executionContext, + () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion, + serialTableOperationsSafe), + SelectColumnLayer.this, this::onError); + } + } + }; } - private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableUpdate upstream, - final RowSet toClear, final UpdateHelper helper, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion, final Consumer onError, - final List splitUpdates, final boolean serialTableOperationsSafe) { + private void prepareParallelUpdate( + final JobScheduler jobScheduler, + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion, + final Consumer onError, + final List splitUpdates, + final boolean serialTableOperationsSafe) { // we have to do removal and previous initialization before we can do any of the actual filling in multiple // threads to avoid concurrency problems with our destination column sources doEnsureCapacity(); @@ -255,8 +262,12 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU onError); } - private void doSerialApplyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper, - @Nullable final LivenessNode liveResultOwner, final SelectLayerCompletionHandler onCompletion, + private void doSerialApplyUpdate( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion, final boolean serialTableOperationsSafe) { doEnsureCapacity(); final boolean oldSafe = updateGraph.setSerialTableOperationsSafe(serialTableOperationsSafe); @@ -272,8 +283,11 @@ private void doSerialApplyUpdate(final TableUpdate upstream, final RowSet toClea onCompletion.onLayerCompleted(getLayerIndex()); } - private void doParallelApplyUpdate(final TableUpdate upstream, final UpdateHelper helper, - @Nullable final LivenessNode liveResultOwner, final boolean serialTableOperationsSafe, + private void doParallelApplyUpdate( + final TableUpdate upstream, + final SelectAndViewAnalyzer.UpdateHelper helper, + @Nullable final LivenessNode liveResultOwner, + final boolean serialTableOperationsSafe, final long startOffset) { final boolean oldSafe = updateGraph.setSerialTableOperationsSafe(serialTableOperationsSafe); try { @@ -285,8 +299,11 @@ private void doParallelApplyUpdate(final TableUpdate upstream, final UpdateHelpe upstream.release(); } - private Boolean doApplyUpdate(final TableUpdate upstream, final UpdateHelper helper, - @Nullable final LivenessNode liveResultOwner, final long startOffset) { + private Boolean doApplyUpdate( + final TableUpdate upstream, + final SelectAndViewAnalyzer.UpdateHelper helper, + @Nullable final LivenessNode liveResultOwner, + final long startOffset) { final int PAGE_SIZE = 4096; final LongToIntFunction contextSize = (long size) -> size > PAGE_SIZE ? PAGE_SIZE : (int) size; @@ -594,16 +611,6 @@ private void clearObjectsAtThisLevel(RowSet keys) { } } - @Override - public boolean flattenedResult() { - return flattenedResult; - } - - @Override - public boolean alreadyFlattenedSources() { - return alreadyFlattenedSources; - } - @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("{SelectColumnLayer: ").append(selectColumn.toString()).append(", layerIndex=") @@ -612,6 +619,6 @@ public LogOutput append(LogOutput logOutput) { @Override public boolean allowCrossColumnParallelization() { - return selectColumn.isStateless() && inner.allowCrossColumnParallelization(); + return selectColumn.isStateless(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java index 5fbef5b9d74..5a052dc006a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java @@ -12,18 +12,23 @@ public abstract class SelectOrViewColumnLayer extends DependencyLayerBase { private final ColumnSource optionalUnderlying; - SelectOrViewColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, - ColumnSource ws, ColumnSource optionalUnderlying, - String[] deps, ModifiedColumnSet mcsBuilder) { - super(inner, name, sc, ws, deps, mcsBuilder); + SelectOrViewColumnLayer( + final SelectAndViewAnalyzer analyzer, + final String name, + final SelectColumn sc, + final ColumnSource ws, + final ColumnSource optionalUnderlying, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(analyzer, name, sc, ws, deps, mcsBuilder); this.optionalUnderlying = optionalUnderlying; } @Override - final Map> getColumnSourcesRecurse(GetMode mode) { - final Map> result = inner.getColumnSourcesRecurse(mode); + void populateColumnSources( + final Map> result, + final GetMode mode) { result.put(name, columnSource); - return result; } @Override @@ -32,6 +37,5 @@ public void startTrackingPrev() { if (optionalUnderlying != null) { optionalUnderlying.startTrackingPrevValues(); } - inner.startTrackingPrev(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java index 25827b2ca19..194cd5e4e6d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java @@ -7,14 +7,10 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.rowset.TrackingRowSet; -import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; import io.deephaven.engine.table.impl.util.RowRedirection; import io.deephaven.engine.table.impl.util.WrappedRowSetRowRedirection; @@ -24,27 +20,24 @@ import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; -final public class StaticFlattenLayer extends SelectAndViewAnalyzer { - private final SelectAndViewAnalyzer inner; +final public class StaticFlattenLayer extends SelectAndViewAnalyzer.Layer { private final TrackingRowSet parentRowSet; private final Map> overriddenColumns; - StaticFlattenLayer(SelectAndViewAnalyzer inner, TrackingRowSet parentRowSet) { - super(inner.getLayerIndex() + 1); - this.inner = inner; + StaticFlattenLayer(SelectAndViewAnalyzer analyzer, TrackingRowSet parentRowSet) { + super(analyzer.getNextLayerIndex()); this.parentRowSet = parentRowSet; final HashSet alreadyFlattenedColumns = new HashSet<>(); - inner.getNewColumnSources().forEach((name, cs) -> { + analyzer.getNewColumnSources().forEach((name, cs) -> { alreadyFlattenedColumns.add(name); }); final RowRedirection rowRedirection = new WrappedRowSetRowRedirection(parentRowSet); overriddenColumns = new HashMap<>(); - inner.getAllColumnSources().forEach((name, cs) -> { + analyzer.getAllColumnSources().forEach((name, cs) -> { if (alreadyFlattenedColumns.contains(name)) { return; } @@ -54,31 +47,48 @@ final public class StaticFlattenLayer extends SelectAndViewAnalyzer { } @Override - void setBaseBits(BitSet bitset) { - inner.setBaseBits(bitset); + Set getLayerColumnNames() { + return Set.of(); } @Override - void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); + void populateModifiedColumnSetInReverse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { + // we don't have any dependencies, so we don't need to do anything here } @Override - Map> getColumnSourcesRecurse(GetMode mode) { - final Map> innerColumns = inner.getColumnSourcesRecurse(mode); + boolean allowCrossColumnParallelization() { + return true; + } - if (overriddenColumns.keySet().stream().noneMatch(innerColumns::containsKey)) { - return innerColumns; + @Override + void populateColumnSources( + final Map> result, + final GetMode mode) { + // for each overridden column replace it in the result map + for (Map.Entry> entry : overriddenColumns.entrySet()) { + final String columnName = entry.getKey(); + if (result.containsKey(columnName)) { + result.put(columnName, entry.getValue()); + } } + } - final Map> columns = new LinkedHashMap<>(); - innerColumns.forEach((name, cs) -> columns.put(name, overriddenColumns.getOrDefault(name, cs))); - return columns; + @Override + void calcDependsOn( + final Map> result, + final boolean forcePublishAllSources) { + // we don't have any dependencies, so we don't need to do anything here } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler onCompletion) { // this must be the fake update used to initialize the result table Assert.eqTrue(upstream.added().isFlat(), "upstream.added.isFlat()"); Assert.eq(upstream.added().size(), "upstream.added.size()", parentRowSet.size(), "parentRowSet.size()"); @@ -86,61 +96,26 @@ public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helpe Assert.eqTrue(upstream.modified().isEmpty(), "upstream.modified.isEmpty()"); final BitSet baseLayerBitSet = new BitSet(); - inner.setBaseBits(baseLayerBitSet); - final TableUpdate innerUpdate = new TableUpdateImpl( - parentRowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(), - RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY); - inner.applyUpdate(innerUpdate, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(baseLayerBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - onCompletion.onLayerCompleted(getLayerIndex()); - } - }); - } - - @Override - Map> calcDependsOnRecurse(boolean forcePublishAllResources) { - return inner.calcDependsOnRecurse(forcePublishAllResources); - } - - @Override - public SelectAndViewAnalyzer getInner() { - return inner; + baseLayerBitSet.set(BASE_LAYER_INDEX); + return new CompletionHandler(baseLayerBitSet, onCompletion) { + @Override + public void onAllRequiredColumnsCompleted() { + onCompletion.onLayerCompleted(getLayerIndex()); + } + }; } - @Override - int getLayerIndexFor(String column) { - if (overriddenColumns.containsKey(column)) { - return getLayerIndex(); - } - return inner.getLayerIndexFor(column); + public RowSet getParentRowSetCopy() { + return parentRowSet.copy(); } @Override public void startTrackingPrev() { - throw new UnsupportedOperationException("StaticFlattenLayer is used in only non-refreshing scenarios"); + throw new UnsupportedOperationException("StaticFlattenLayer supports only non-refreshing scenarios"); } @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("{StaticFlattenLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); - } - - @Override - public boolean flattenedResult() { - // this layer performs a flatten, so the result is flattened - return true; - } - - @Override - public boolean alreadyFlattenedSources() { - // this layer performs a flatten, so the sources are now flattened - return true; - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java index 84bdda755a5..28d7c9ac7aa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java @@ -28,11 +28,17 @@ final public class ViewColumnLayer extends SelectOrViewColumnLayer { } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler completionHandler) { - // To be parallel with SelectColumnLayer, we would recurse here, but since this is ViewColumnLayer - // (and all my inner layers are ViewColumnLayer), there's nothing to do. + public CompletionHandler createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final CompletionHandler completionHandler) { + // There should be nothing to do here. Assert.eqNull(completionHandler, "completionHandler"); + + return null; } @Override @@ -41,12 +47,6 @@ public LogOutput append(LogOutput logOutput) { .append(getLayerIndex()).append("}"); } - @Override - public boolean allowCrossColumnParallelization() { - // this should not actually matter; but false seems like the safe answer for any formula - return false; - } - private static ColumnSource checkResultType(@NotNull final ColumnSource cs) { final Class resultType = cs.getType(); if (!ALLOW_LIVENESS_REFERENT_RESULTS && LivenessReferent.class.isAssignableFrom(resultType)) { @@ -56,4 +56,9 @@ private static ColumnSource checkResultType(@NotNull final ColumnSource cs) { } return cs; } + + @Override + public boolean allowCrossColumnParallelization() { + return selectColumn.isStateless(); + } }