Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Add multi-column support to AggFormula #6206

Merged
merged 24 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6e248ee
Trying to get python to work.
lbooker42 Oct 14, 2024
33c39ee
Refactored formula aggregation and specifications.
lbooker42 Oct 16, 2024
07d8cf6
Added add'l agg_by test.
lbooker42 Oct 16, 2024
4e7011b
WIP, more work on GRPC.
lbooker42 Oct 16, 2024
dd0e936
Fixing some tests.
lbooker42 Oct 16, 2024
043654d
Fixing some tests.
lbooker42 Oct 17, 2024
03d4bf6
Fixing some tests.
lbooker42 Oct 18, 2024
fa6a456
Merge branch 'main' into lab-agg-multi-formula
lbooker42 Oct 18, 2024
a13ac00
Refactored to use SelectColumn.
lbooker42 Oct 21, 2024
a625c33
Minor cleanup.
lbooker42 Oct 21, 2024
e566029
Self-review, GRPC client tests now passing.
lbooker42 Oct 31, 2024
e0caf7f
Addressed some PR comments.
lbooker42 Nov 7, 2024
a025323
Added tests for correct propagation of MCS for AggFormula
lbooker42 Nov 7, 2024
a4e320a
Merged with main.
lbooker42 Nov 7, 2024
da008ce
Fix agg.py imports.
lbooker42 Nov 7, 2024
81923de
PR comments addressed.
lbooker42 Nov 11, 2024
bf997d5
More comments addressed and TODO added.
lbooker42 Nov 11, 2024
68719b5
Reworked table.proto to add Selectable and adjust Formula to use it.
lbooker42 Nov 11, 2024
103db51
Better Selectable handling for GRPC aggregations.
lbooker42 Nov 12, 2024
2d5ba12
Update the python argument for formula()
lbooker42 Nov 12, 2024
cc8f58c
Provide key column values as scalars to AggFormula.
lbooker42 Nov 12, 2024
22b2364
Addressed PR comments.
lbooker42 Nov 15, 2024
aec23c8
Broken build fix.
lbooker42 Nov 18, 2024
f040167
Correct agg test to remove wavg()
lbooker42 Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -159,7 +158,7 @@ private enum Type {
* types converted to {@link io.deephaven.vector.Vector vectors}. This can be computed once and re-used across all
* formula aggregations.
*/
private Map<String, ColumnDefinition<?>> vectorColumnNameMap;
private Map<String, ColumnDefinition<?>> vectorColumnDefinitions;

/**
* Convert a collection of {@link Aggregation aggregations} to an {@link AggregationContextFactory}.
Expand Down Expand Up @@ -717,28 +716,32 @@ public void visit(@NotNull final Partition partition) {

@Override
public void visit(@NotNull final Formula formula) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final SelectColumn selectColumn =
SelectColumn.of(Selectable.of(formula.column(), RawString.of(formula.formula())));
final SelectColumn selectColumn = SelectColumn.of(formula.selectable());
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

// Get or create a column definition map composed of vectors of the original column types.
if (vectorColumnNameMap == null) {
vectorColumnNameMap = new HashMap<>();
if (vectorColumnDefinitions == null) {
vectorColumnDefinitions = table.getDefinition().getColumnStream().collect(Collectors.toMap(
ColumnDefinition::getName,
(final ColumnDefinition<?> cd) -> ColumnDefinition.fromGenericType(
cd.getName(),
VectorFactory.forElementType(cd.getDataType()).vectorType(),
cd.getDataType())));
table.getColumnSourceMap().forEach((key, value) -> {
final ColumnDefinition<?> columnDef = ColumnDefinition.fromGenericType(
key, VectorFactory.forElementType(value.getType()).vectorType());
vectorColumnNameMap.put(key, columnDef);
vectorColumnDefinitions.put(key, columnDef);
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
});
}

// Get the input column names from the formula and provide them to the groupBy operator
final String[] inputColumns =
selectColumn.initDef(vectorColumnNameMap, compilationProcessor).toArray(String[]::new);
selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);

final GroupByChunkedOperator groupByChunkedOperator = new GroupByChunkedOperator(table, false, null,
Arrays.stream(inputColumns).map(col -> MatchPair.of(Pair.parse(col)))
.toArray(MatchPair[]::new));

final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(
final FormulaMultiColumnChunkedOperator op = new FormulaMultiColumnChunkedOperator(table,
groupByChunkedOperator, true, selectColumn);
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
addNoInputOperator(op);
}
Expand Down Expand Up @@ -783,7 +786,6 @@ public void visit(@NotNull final AggSpecFormula formula) {
unsupportedForBlinkTables("Formula");
final GroupByChunkedOperator groupByChunkedOperator = new GroupByChunkedOperator(table, false, null,
resultPairs.stream().map(pair -> MatchPair.of((Pair) pair.input())).toArray(MatchPair[]::new));

final FormulaChunkedOperator formulaChunkedOperator = new FormulaChunkedOperator(groupByChunkedOperator,
true, formula.formula(), formula.paramToken(), compilationProcessor,
MatchPair.fromPairs(resultPairs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,8 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull final Quer
resultColumnModifiedColumnSets[ci] = resultTable.newModifiedColumnSet(resultColumnNames[ci]);
}
if (delegateToBy) {
// We cannot use the groupBy's result MCS factory, because the result column names are not guaranteed to be
// the
// same.
// We cannot use the groupBy's result MCS factory, because the result column names are not guaranteed
// to be the same.
groupBy.initializeRefreshing(resultTable, aggregationUpdateListener);
}
// Note that we also use the factory in propagateUpdates to identify the set of modified columns to handle.
Expand Down Expand Up @@ -377,7 +376,7 @@ public SingletonContext makeSingletonContext(final int size) {
private class DataFillerContext implements SafeCloseable {

private final boolean[] columnsToFillMask;
protected final FillFromContext[] fillFromContexts;
final FillFromContext[] fillFromContexts;

private DataFillerContext(final boolean @NotNull [] columnsToFillMask) {
this.columnsToFillMask = columnsToFillMask;
Expand Down
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
*/
class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOperator {

private final QueryTable inputTable;

private final GroupByChunkedOperator groupBy;
// TODO: `delegateToBy` is always true, for this class and FormulaChunkedOperator. Can we remove this and all
// the checks?
private final boolean delegateToBy;
private final SelectColumn selectColumn;
private final WritableColumnSource<?> resultColumn;
Expand All @@ -50,18 +50,20 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp
private ModifiedColumnSet updateUpstreamModifiedColumnSet;

/**
* Construct an operator for applying a formula to a grouped table.
* Construct an operator for applying a formula to slot-vectors over an aggregated table..
*
* @param groupBy The {@link GroupByChunkedOperator} to use for tracking indices
* @param delegateToBy Whether this operator is responsible for passing methods through to {@code groupBy}. Should
* be false if {@code groupBy} is updated by the helper (and {@code groupBy} must come before this operator
* if so), or if this is not the first operator sharing {@code groupBy}.
* be false if {@code groupBy} is updated by the helper, or if this is not the first operator sharing
* {@code groupBy}.
* @param selectColumn The formula column that will produce the results
*/
FormulaMultiColumnChunkedOperator(
@NotNull final QueryTable inputTable,
@NotNull final GroupByChunkedOperator groupBy,
final boolean delegateToBy,
@NotNull final SelectColumn selectColumn) {
this.inputTable = inputTable;
this.groupBy = groupBy;
this.delegateToBy = delegateToBy;
this.selectColumn = selectColumn;
Expand Down Expand Up @@ -238,11 +240,30 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull final Quer
// guaranteed to be the same.
groupBy.initializeRefreshing(resultTable, aggregationUpdateListener);
}

// Note that we also use the factory in propagateUpdates to identify the set of modified columns to handle.
final ModifiedColumnSet resultModifiedColumnSet = resultTable
.newModifiedColumnSet(getResultColumns().keySet().toArray(String[]::new));
if (selectColumn.getColumns().isEmpty()) {
return inputToResultModifiedColumnSetFactory = input -> ModifiedColumnSet.EMPTY;
}

return inputToResultModifiedColumnSetFactory = upstreamModifiedColumnSet -> resultModifiedColumnSet;
final ModifiedColumnSet updateMCS = new ModifiedColumnSet(resultTable.getModifiedColumnSetForUpdates());
final ModifiedColumnSet resultMCS = resultTable.newModifiedColumnSet(selectColumn.getName());
final String[] inputColumnNames = selectColumn.getColumns().toArray(String[]::new);
final ModifiedColumnSet[] modifiedColumnSets = selectColumn.getColumns().stream()
.map(ignored -> resultMCS).toArray(ModifiedColumnSet[]::new);
final ModifiedColumnSet.Transformer transformer = inputTable.newModifiedColumnSetTransformer(
inputColumnNames,
modifiedColumnSets);

return inputToResultModifiedColumnSetFactory = input -> {
if (groupBy.getSomeKeyHasAddsOrRemoves()) {
return resultMCS;
} else if (groupBy.getSomeKeyHasModifies()) {
transformer.clearAndTransform(input, updateMCS);
return updateMCS;
}
return ModifiedColumnSet.EMPTY;
};
}

@Override
Expand Down Expand Up @@ -288,14 +309,10 @@ public void propagateUpdates(@NotNull final TableUpdate downstream,
if (removesToProcess && !resultColumn.getType().isPrimitive()) {
dataCopyContext.clearObjectColumnData(downstream.removed());
}
if (modifiesToProcess && addsToProcess) {
// Union the rowsets and do a single pass.
try (final RowSequence combinedRowSequence = downstream.modified().union(downstream.added())) {
dataCopyContext.copyData(combinedRowSequence);
}
} else if (modifiesToProcess) {
if (modifiesToProcess) {
dataCopyContext.copyData(downstream.modified());
} else {
}
if (addsToProcess) {
dataCopyContext.copyData(downstream.added());
}
}
Expand Down Expand Up @@ -324,7 +341,7 @@ public SingletonContext makeSingletonContext(final int size) {
*/
private class DataFillerContext implements SafeCloseable {

protected final FillFromContext fillFromContext;
final FillFromContext fillFromContext;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

private DataFillerContext() {
fillFromContext = resultColumn.makeFillFromContext(BLOCK_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,4 +614,12 @@ public boolean requiresRowKeys() {
public boolean unchunkedRowSet() {
return true;
}

boolean getSomeKeyHasAddsOrRemoves() {
return someKeyHasAddsOrRemoves;
}

boolean getSomeKeyHasModifies() {
return someKeyHasModifies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public static void validate(
}
}

private boolean lastSupported;
private boolean hasUnsupportedArgs;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

private boolean isSupported(@NotNull final Aggregation aggregation) {
aggregation.walk(this);
return lastSupported;
return !hasUnsupportedArgs;
}

@Override
Expand All @@ -54,36 +54,36 @@ public void visit(@NotNull final Aggregations aggregations) {

@Override
public void visit(@NotNull final ColumnAggregation columnAgg) {
lastSupported = columnAgg.spec() instanceof AggSpecGroup;
hasUnsupportedArgs |= !(columnAgg.spec() instanceof AggSpecGroup);
}

@Override
public void visit(@NotNull final ColumnAggregations columnAggs) {
lastSupported = columnAggs.spec() instanceof AggSpecGroup;
hasUnsupportedArgs |= !(columnAggs.spec() instanceof AggSpecGroup);
}

@Override
public void visit(@NotNull final Count count) {
lastSupported = false;
hasUnsupportedArgs = true;
}

@Override
public void visit(@NotNull final FirstRowKey firstRowKey) {
lastSupported = false;
hasUnsupportedArgs = true;
}

@Override
public void visit(@NotNull final LastRowKey lastRowKey) {
lastSupported = false;
hasUnsupportedArgs = true;
}

@Override
public void visit(@NotNull final Partition partition) {
lastSupported = false;
hasUnsupportedArgs = true;
}

@Override
public void visit(@NotNull final Formula formula) {
lastSupported = false;
hasUnsupportedArgs = true;
}
}
Loading