Skip to content

Commit

Permalink
support max_by and min_by
Browse files Browse the repository at this point in the history
use tmp branch

stage

Revert "use tmp branch"

This reverts commit 4ff033429158f0791922dfd6f16a26a85de63de0.

stage
  • Loading branch information
Yohahaha committed Oct 27, 2023
1 parent 478e0c1 commit bfea23c
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 587 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ case class HashAggregateExecTransformer(
val aggregateFunction = expr.aggregateFunction
aggregateFunction match {
case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp |
_: VariancePop | _: Corr | _: CovPopulation | _: CovSample =>
_: VariancePop | _: Corr | _: CovPopulation | _: CovSample | _: MaxMinBy =>
expr.mode match {
case Partial | PartialMerge =>
return true
Expand Down Expand Up @@ -150,7 +150,7 @@ case class HashAggregateExecTransformer(
throw new UnsupportedOperationException(s"${expr.mode} not supported.")
}
expr.aggregateFunction match {
case _: Average | _: First | _: Last =>
case _: Average | _: First | _: Last | _: MaxMinBy =>
// Select first and second aggregate buffer from Velox Struct.
expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0))
expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 1))
Expand Down Expand Up @@ -245,6 +245,11 @@ case class HashAggregateExecTransformer(
case last: Last =>
structTypeNodes.add(ConverterUtils.getTypeNode(last.dataType, nullable = true))
structTypeNodes.add(ConverterUtils.getTypeNode(BooleanType, nullable = true))
case maxMinBy: MaxMinBy =>
structTypeNodes
.add(ConverterUtils.getTypeNode(maxMinBy.valueExpr.dataType, nullable = true))
structTypeNodes
.add(ConverterUtils.getTypeNode(maxMinBy.orderingExpr.dataType, nullable = true))
case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
// Use struct type to represent Velox Row(BIGINT, DOUBLE, DOUBLE).
structTypeNodes.add(
Expand Down Expand Up @@ -372,7 +377,7 @@ case class HashAggregateExecTransformer(
case sum: Sum if sum.dataType.isInstanceOf[DecimalType] =>
generateMergeCompanionNode()
case _: Average | _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop | _: Corr |
_: CovPopulation | _: CovSample | _: First | _: Last =>
_: CovPopulation | _: CovSample | _: First | _: Last | _: MaxMinBy =>
generateMergeCompanionNode()
case _ =>
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
Expand Down Expand Up @@ -404,7 +409,7 @@ case class HashAggregateExecTransformer(
val aggregateFunction = expression.aggregateFunction
aggregateFunction match {
case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp |
_: VariancePop | _: Corr | _: CovPopulation | _: CovSample =>
_: VariancePop | _: Corr | _: CovPopulation | _: CovSample | _: MaxMinBy =>
expression.mode match {
case Partial | PartialMerge =>
typeNodeList.add(getIntermediateTypeNode(aggregateFunction))
Expand Down Expand Up @@ -533,12 +538,13 @@ case class HashAggregateExecTransformer(
case other =>
throw new UnsupportedOperationException(s"$other is not supported.")
}
case _: First | _: Last =>
case _: First | _: Last | _: MaxMinBy =>
aggregateExpression.mode match {
case PartialMerge | Final =>
assert(
functionInputAttributes.size == 2,
s"${aggregateExpression.mode.toString} of First/Last expects two input attributes.")
s"${aggregateExpression.mode.toString} of " +
s"${aggregateFunction.getClass.toString} expects two input attributes.")
// Use a Velox function to combine the intermediate columns into struct.
val childNodes = new util.ArrayList[ExpressionNode](
functionInputAttributes.toList
Expand Down Expand Up @@ -760,8 +766,8 @@ case class HashAggregateExecTransformer(
val aggregateFunc = aggExpr.aggregateFunction
val childrenNodes = new util.ArrayList[ExpressionNode]()
aggregateFunc match {
case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop |
_: VarianceSamp | _: VariancePop | _: Corr | _: CovPopulation | _: CovSample
case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp |
_: VariancePop | _: Corr | _: CovPopulation | _: CovSample | _: MaxMinBy
if aggExpr.mode == PartialMerge | aggExpr.mode == Final =>
// Only occupies one column due to intermediate results are combined
// by previous projection.
Expand Down
Loading

0 comments on commit bfea23c

Please sign in to comment.