Skip to content

Commit

Permalink
Select Refactoring to Remove Recursion During Update Processing
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Aug 8, 2024
1 parent 9156dd8 commit 7245d85
Show file tree
Hide file tree
Showing 12 changed files with 616 additions and 572 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc

// Init all the rows by cooking up a fake Update
final TableUpdate fakeUpdate = new TableUpdateImpl(
analyzer.alreadyFlattenedSources() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(),
analyzer.flatResult() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(),
RowSetFactory.empty(), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.ALL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void onUpdate(final TableUpdate upstream) {
}

analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this,
new SelectAndViewAnalyzer.SelectLayerCompletionHandler(allNewColumns, completedColumns) {
new SelectAndViewAnalyzer.Layer.CompletionHandler(allNewColumns, completedColumns) {
@Override
public void onAllRequiredColumnsCompleted() {
completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import java.util.*;

public class BaseLayer extends SelectAndViewAnalyzer {
public class BaseLayer extends SelectAndViewAnalyzer.Layer {
private final Map<String, ColumnSource<?>> sources;
private final boolean publishTheseSources;

Expand All @@ -25,73 +25,62 @@ public class BaseLayer extends SelectAndViewAnalyzer {
}

@Override
int getLayerIndexFor(String column) {
if (sources.containsKey(column)) {
return BASE_LAYER_INDEX;
}
throw new IllegalArgumentException("Unknown column: " + column);
}

@Override
void setBaseBits(BitSet bitset) {
bitset.set(BASE_LAYER_INDEX);
Set<String> getLayerColumnNames() {
return sources.keySet();
}

@Override
public void setAllNewColumns(BitSet bitset) {
bitset.set(BASE_LAYER_INDEX);
}

@Override
void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy) {
void populateModifiedColumnSetInReverse(
final ModifiedColumnSet mcsBuilder,
final Set<String> remainingDepsToSatisfy) {
mcsBuilder.setAll(remainingDepsToSatisfy.toArray(String[]::new));
}

@Override
final Map<String, ColumnSource<?>> getColumnSourcesRecurse(GetMode mode) {
void populateColumnSources(
final Map<String, ColumnSource<?>> result,
final GetMode mode) {
// We specifically return a LinkedHashMap so the columns get populated in order
final Map<String, ColumnSource<?>> result = new LinkedHashMap<>();
if (mode == GetMode.All || (mode == GetMode.Published && publishTheseSources)) {
result.putAll(sources);
}
return result;
}

@Override
public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler,
@Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) {
// nothing to do at the base layer
onCompletion.onLayerCompleted(BASE_LAYER_INDEX);
public CompletionHandler createUpdateHandler(
final TableUpdate upstream,
final RowSet toClear,
final SelectAndViewAnalyzer.UpdateHelper helper,
final JobScheduler jobScheduler,
@Nullable final LivenessNode liveResultOwner,
final CompletionHandler onCompletion) {
return new CompletionHandler(new BitSet(), onCompletion) {
@Override
protected void onAllRequiredColumnsCompleted() {
// nothing to do at the base layer
onCompletion.onLayerCompleted(getLayerIndex());
}
};
}

@Override
final Map<String, Set<String>> calcDependsOnRecurse(boolean forcePublishAllSources) {
final Map<String, Set<String>> result = new HashMap<>();
final void calcDependsOn(
final Map<String, Set<String>> result,
boolean forcePublishAllSources) {
if (publishTheseSources || forcePublishAllSources) {
for (final String col : sources.keySet()) {
result.computeIfAbsent(col, dummy -> new HashSet<>()).add(col);
}
}
return result;
}

@Override
public SelectAndViewAnalyzer getInner() {
return null;
}

@Override
public void startTrackingPrev() {
// nothing to do
boolean allowCrossColumnParallelization() {
return true;
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{BaseLayer").append(", layerIndex=").append(getLayerIndex()).append("}");
}

@Override
public boolean allowCrossColumnParallelization() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,17 @@

public class ConstantColumnLayer extends SelectOrViewColumnLayer {
private final BitSet dependencyBitSet;
private final boolean flattenedResult;
private final boolean alreadyFlattenedSources;

ConstantColumnLayer(
SelectAndViewAnalyzer inner,
SelectAndViewAnalyzer analyzer,
String name,
SelectColumn sc,
WritableColumnSource<?> ws,
String[] deps,
ModifiedColumnSet mcsBuilder,
boolean flattenedResult,
boolean alreadyFlattenedSources) {
super(inner, name, sc, ws, null, deps, mcsBuilder);
ModifiedColumnSet mcsBuilder) {
super(analyzer, name, sc, ws, null, deps, mcsBuilder);
this.dependencyBitSet = new BitSet();
this.flattenedResult = flattenedResult;
this.alreadyFlattenedSources = alreadyFlattenedSources;
Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set);
Arrays.stream(deps).mapToInt(analyzer::getLayerIndexFor).forEach(dependencyBitSet::set);
initialize(ws);
}

Expand All @@ -60,38 +54,31 @@ private void initialize(final WritableColumnSource<?> writableSource) {
}

@Override
public void applyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper,
final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner,
final SelectLayerCompletionHandler onCompletion) {
public CompletionHandler createUpdateHandler(
final TableUpdate upstream,
final RowSet toClear,
final SelectAndViewAnalyzer.UpdateHelper helper,
final JobScheduler jobScheduler,
@Nullable final LivenessNode liveResultOwner,
final CompletionHandler onCompletion) {
// Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g.
// because they are SelectColumnLayers)
inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner,
new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) {
@Override
public void onAllRequiredColumnsCompleted() {
// we don't need to do anything specific here; our result value is constant
onCompletion.onLayerCompleted(getLayerIndex());
}
});
return new CompletionHandler(dependencyBitSet, onCompletion) {
@Override
public void onAllRequiredColumnsCompleted() {
// we don't need to do anything specific here; our result value is constant
onCompletion.onLayerCompleted(getLayerIndex());
}
};
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}");
}

@Override
public boolean flattenedResult() {
return flattenedResult;
boolean allowCrossColumnParallelization() {
return true;
}

@Override
public boolean alreadyFlattenedSources() {
return alreadyFlattenedSources;
}

@Override
public boolean allowCrossColumnParallelization() {
return inner.allowCrossColumnParallelization();
public LogOutput append(LogOutput logOutput) {
return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
//
package io.deephaven.engine.table.impl.select.analyzers;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.vector.Vector;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.ColumnSource;

import java.util.*;

public abstract class DependencyLayerBase extends SelectAndViewAnalyzer {
final SelectAndViewAnalyzer inner;
public abstract class DependencyLayerBase extends SelectAndViewAnalyzer.Layer {
final String name;
final SelectColumn selectColumn;
final boolean selectColumnHoldsVector;
Expand All @@ -21,35 +19,46 @@ public abstract class DependencyLayerBase extends SelectAndViewAnalyzer {
private final String[] dependencies;
final ModifiedColumnSet myModifiedColumnSet;

DependencyLayerBase(SelectAndViewAnalyzer inner, String name, SelectColumn selectColumn,
ColumnSource<?> columnSource,
String[] dependencies, ModifiedColumnSet mcsBuilder) {
super(inner.getLayerIndex() + 1);
this.inner = inner;
DependencyLayerBase(
final SelectAndViewAnalyzer analyzer,
final String name,
final SelectColumn selectColumn,
final ColumnSource<?> columnSource,
final String[] dependencies,
final ModifiedColumnSet mcsBuilder) {
super(analyzer.getNextLayerIndex());
this.name = name;
this.selectColumn = selectColumn;
selectColumnHoldsVector = Vector.class.isAssignableFrom(selectColumn.getReturnedType());
this.columnSource = columnSource;
this.dependencies = dependencies;
final Set<String> remainingDepsToSatisfy = new HashSet<>(Arrays.asList(dependencies));
inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy);
analyzer.populateModifiedColumnSet(mcsBuilder, remainingDepsToSatisfy);
this.myModifiedColumnSet = mcsBuilder;
}

@Override
void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy) {
Set<String> getLayerColumnNames() {
return Set.of(name);
}

@Override
void populateModifiedColumnSetInReverse(
final ModifiedColumnSet mcsBuilder,
final Set<String> remainingDepsToSatisfy) {
// Later-defined columns override earlier-defined columns. So we satisfy column dependencies "on the way
// down" the recursion.
if (remainingDepsToSatisfy.remove(name)) {
// Caller had a dependency on us, so caller gets our dependencies
mcsBuilder.setAll(myModifiedColumnSet);
}
inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy);
}

@Override
final Map<String, Set<String>> calcDependsOnRecurse(boolean forcePublishAllResources) {
final Map<String, Set<String>> result = inner.calcDependsOnRecurse(forcePublishAllResources);
void calcDependsOn(
final Map<String, Set<String>> result,
final boolean forcePublishAllSources) {

final Set<String> thisResult = new HashSet<>();
for (final String dep : dependencies) {
final Set<String> innerDependencies = result.get(dep);
Expand All @@ -61,25 +70,7 @@ final Map<String, Set<String>> calcDependsOnRecurse(boolean forcePublishAllResou
thisResult.addAll(innerDependencies);
}
}
result.put(name, thisResult);
return result;
}

@Override
public SelectAndViewAnalyzer getInner() {
return inner;
}

@Override
int getLayerIndexFor(String column) {
if (name.equals(column)) {
return getLayerIndex();
}
return inner.getLayerIndexFor(column);
}

@Override
void setBaseBits(BitSet bitset) {
inner.setBaseBits(bitset);
result.put(name, thisResult);
}
}
Loading

0 comments on commit 7245d85

Please sign in to comment.