Skip to content

Commit

Permalink
Refactoring Based on Review and Discussion
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Aug 14, 2024
1 parent ae8ed6c commit 893d644
Show file tree
Hide file tree
Showing 14 changed files with 796 additions and 1,040 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.table.impl.select.*;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
import io.deephaven.engine.table.impl.snapshot.SnapshotIncrementalListener;
import io.deephaven.engine.table.impl.snapshot.SnapshotInternalListener;
import io.deephaven.engine.table.impl.snapshot.SnapshotUtils;
Expand Down Expand Up @@ -72,6 +71,7 @@
import io.deephaven.util.type.ArrayTypeUtils;
import io.deephaven.vector.Vector;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -1498,10 +1498,10 @@ public Table update(final Collection<? extends Selectable> newColumns) {
*/
public SelectValidationResult validateSelect(final SelectColumn... selectColumns) {
final SelectColumn[] clones = SelectColumn.copyFrom(selectColumns);
SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, columns, rowSet, getModifiedColumnSetForUpdates(), true,
SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, getModifiedColumnSetForUpdates(), true,
false, clones);
return new SelectValidationResult(analyzerWrapper.getAnalyzer(), clones);
return new SelectValidationResult(analyzerContext.createAnalyzer(), clones);
}

private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) {
Expand All @@ -1526,18 +1526,17 @@ private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns)
}
}
final boolean publishTheseSources = flavor == Flavor.Update;
final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create(
this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSources, true,
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
this, mode, getModifiedColumnSetForUpdates(), publishTheseSources, true,
selectColumns);

final SelectAndViewAnalyzer analyzer = analyzerWrapper.getAnalyzer();
final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns()
final SelectAndViewAnalyzer analyzer = analyzerContext.createAnalyzer();
final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns()
.toArray(SelectColumn[]::new);

// Init all the rows by cooking up a fake Update
final TableUpdate fakeUpdate = new TableUpdateImpl(
analyzer.flatResult() ? RowSetFactory.flat(rowSet.size()) : rowSet.copy(),
RowSetFactory.empty(), RowSetFactory.empty(),
rowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(),
RowSetShiftData.EMPTY, ModifiedColumnSet.ALL);

final CompletableFuture<Void> waitForResult = new CompletableFuture<>();
Expand All @@ -1558,8 +1557,23 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
new SelectAndViewAnalyzer.UpdateHelper(emptyRowSet, fakeUpdate)) {

try {
analyzer.applyUpdate(fakeUpdate, emptyRowSet, updateHelper, jobScheduler,
liveResultCapture, analyzer.futureCompletionHandler(waitForResult));
final MutableBoolean errorOccurred = new MutableBoolean();

analyzer.applyUpdate(
fakeUpdate, emptyRowSet, updateHelper, jobScheduler, liveResultCapture,
() -> {
if (errorOccurred.booleanValue()) {
return;
}
waitForResult.complete(null);
},
err -> {
if (errorOccurred.booleanValue()) {
return;
}
errorOccurred.setTrue();
waitForResult.completeExceptionally(err);
});
} catch (Exception e) {
waitForResult.completeExceptionally(e);
}
Expand All @@ -1580,12 +1594,13 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
}
}

final TrackingRowSet resultRowSet =
analyzer.flattenedResult() ? RowSetFactory.flat(rowSet.size()).toTracking() : rowSet;
resultTable = new QueryTable(resultRowSet, analyzerWrapper.getPublishedColumnResources());
final TrackingRowSet resultRowSet = analyzer.flatResult() && !rowSet.isFlat()
? RowSetFactory.flat(rowSet.size()).toTracking()
: rowSet;
resultTable = new QueryTable(resultRowSet, analyzerContext.getPublishedColumnSources());
if (liveResultCapture != null) {
analyzer.startTrackingPrev();
final Map<String, String[]> effects = analyzerWrapper.calcEffects();
final Map<String, String[]> effects = analyzerContext.calcEffects();
final SelectOrUpdateListener soul = new SelectOrUpdateListener(updateDescription, this,
resultTable, effects, analyzer);
liveResultCapture.transferTo(soul);
Expand All @@ -1596,7 +1611,8 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
resultTable.setFlat();
}
propagateDataIndexes(processedColumns, resultTable);
for (final ColumnSource<?> columnSource : analyzer.getNewColumnSources().values()) {
for (final ColumnSource<?> columnSource : analyzerContext.getSelectedColumnSources()
.values()) {
if (columnSource instanceof PossiblyImmutableColumnSource) {
((PossiblyImmutableColumnSource) columnSource).setImmutable();
}
Expand All @@ -1610,10 +1626,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
} else {
maybeCopyColumnDescriptions(resultTable);
}
SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = flavor == Flavor.Update
? SelectAndViewAnalyzerWrapper.UpdateFlavor.Update
: SelectAndViewAnalyzerWrapper.UpdateFlavor.Select;
return analyzerWrapper.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor);
SelectAndViewAnalyzer.UpdateFlavor updateFlavor = flavor == Flavor.Update
? SelectAndViewAnalyzer.UpdateFlavor.Update
: SelectAndViewAnalyzer.UpdateFlavor.Select;
return analyzerContext.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor);
}));
}

Expand Down Expand Up @@ -1761,15 +1777,17 @@ updateDescription, sizeForInstrumentation(), () -> {
createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
initializeWithSnapshot(humanReadablePrefix, sc, (usePrev, beforeClockValue) -> {
final boolean publishTheseSources = flavor == Flavor.UpdateView;
final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, columns, rowSet,
getModifiedColumnSetForUpdates(), publishTheseSources, true, viewColumns);
final SelectColumn[] processedViewColumns = analyzerWrapper.getProcessedColumns()
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext =
SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.VIEW_EAGER,
getModifiedColumnSetForUpdates(), publishTheseSources, true,
viewColumns);
final SelectColumn[] processedViewColumns = analyzerContext.getProcessedColumns()
.toArray(SelectColumn[]::new);
QueryTable queryTable = new QueryTable(
rowSet, analyzerWrapper.getPublishedColumnResources());
rowSet, analyzerContext.getPublishedColumnSources());
if (sc != null) {
final Map<String, String[]> effects = analyzerWrapper.calcEffects();
final Map<String, String[]> effects = analyzerContext.calcEffects();
final TableUpdateListener listener =
new ViewOrUpdateViewListener(updateDescription, this, queryTable, effects);
sc.setListenerAndResult(listener, queryTable);
Expand All @@ -1786,11 +1804,11 @@ updateDescription, sizeForInstrumentation(), () -> {
} else {
maybeCopyColumnDescriptions(queryTable);
}
final SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor =
final SelectAndViewAnalyzer.UpdateFlavor updateFlavor =
flavor == Flavor.UpdateView
? SelectAndViewAnalyzerWrapper.UpdateFlavor.UpdateView
: SelectAndViewAnalyzerWrapper.UpdateFlavor.View;
queryTable = analyzerWrapper.applyShiftsAndRemainingColumns(
? SelectAndViewAnalyzer.UpdateFlavor.UpdateView
: SelectAndViewAnalyzer.UpdateFlavor.View;
queryTable = analyzerContext.applyShiftsAndRemainingColumns(
this, queryTable, updateFlavor);

result.setValue(queryTable);
Expand Down Expand Up @@ -1851,14 +1869,13 @@ public Table lazyUpdate(final Collection<? extends Selectable> newColumns) {
sizeForInstrumentation(), () -> {
checkInitiateOperation();

final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, columns, rowSet,
getModifiedColumnSetForUpdates(),
true, true, selectColumns);
final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns()
final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.create(
this, SelectAndViewAnalyzer.Mode.VIEW_LAZY,
getModifiedColumnSetForUpdates(), true, true, selectColumns);
final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns()
.toArray(SelectColumn[]::new);
final QueryTable result = new QueryTable(
rowSet, analyzerWrapper.getPublishedColumnResources());
rowSet, analyzerContext.getPublishedColumnSources());
if (isRefreshing()) {
addUpdateListener(new ListenerImpl(
"lazyUpdate(" + Arrays.deepToString(processedColumns) + ')', this, result));
Expand All @@ -1868,8 +1885,8 @@ public Table lazyUpdate(final Collection<? extends Selectable> newColumns) {
copySortableColumns(result, processedColumns);
maybeCopyColumnDescriptions(result, processedColumns);

return analyzerWrapper.applyShiftsAndRemainingColumns(
this, result, SelectAndViewAnalyzerWrapper.UpdateFlavor.LazyUpdate);
return analyzerContext.applyShiftsAndRemainingColumns(
this, result, SelectAndViewAnalyzer.UpdateFlavor.LazyUpdate);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler;

import java.util.BitSet;
import java.util.Map;

/**
Expand All @@ -29,8 +28,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl {
private final SelectAndViewAnalyzer analyzer;

private volatile boolean updateInProgress = false;
private final BitSet completedColumns = new BitSet();
private final BitSet allNewColumns = new BitSet();
private final boolean enableParallelUpdate;

/**
Expand Down Expand Up @@ -61,7 +58,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl {
(QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE
&& getUpdateGraph().parallelismFactor() > 1))
&& analyzer.allowCrossColumnParallelization();
analyzer.setAllNewColumns(allNewColumns);
}

@Override
Expand All @@ -76,7 +72,6 @@ public void onUpdate(final TableUpdate upstream) {
// - create parallel arrays of pre-shift-keys and post-shift-keys so we can move them in chunks

updateInProgress = true;
completedColumns.clear();
final TableUpdate acquiredUpdate = upstream.acquire();

final WritableRowSet toClear = resultRowSet.copyPrev();
Expand All @@ -92,17 +87,8 @@ public void onUpdate(final TableUpdate upstream) {
}

analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this,
new SelectAndViewAnalyzer.Layer.CompletionHandler(allNewColumns, completedColumns) {
@Override
public void onAllRequiredColumnsCompleted() {
completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper);
}

@Override
protected void onError(Exception error) {
handleException(error);
}
});
() -> completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper),
this::handleException);
}

private void handleException(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
import io.deephaven.engine.table.impl.select.FormulaColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.select.WhereFilterFactory;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.util.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -260,7 +260,7 @@ private static Pair<Table, Filter[]> getShiftedTableFilterPair(
public static Table getShiftedColumnsTable(
@NotNull final Table source,
@NotNull FormulaColumn formulaColumn,
@NotNull SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor) {
@NotNull SelectAndViewAnalyzer.UpdateFlavor updateFlavor) {
String nuggetName = "getShiftedColumnsTable( " + formulaColumn + ", " + updateFlavor + ") ";
return QueryPerformanceRecorder.withNugget(nuggetName, source.sizeForInstrumentation(), () -> {
Table tableSoFar = source;
Expand Down

This file was deleted.

Loading

0 comments on commit 893d644

Please sign in to comment.