From 9b9f8bea48f3959769b3045c08f41fb1b1ed8f2f Mon Sep 17 00:00:00 2001 From: Joey Date: Thu, 14 Mar 2024 11:59:32 +0800 Subject: [PATCH] [VL] Support skewness aggregate function (#4939) --- .../utils/CHExpressionUtil.scala | 3 +- .../HashAggregateExecTransformer.scala | 45 ++++++++++++------- .../utils/VeloxIntermediateData.scala | 21 ++++++--- .../VeloxAggregateFunctionsSuite.scala | 18 ++++++++ .../SubstraitToVeloxPlanValidator.cc | 3 +- docs/velox-backend-support-progress.md | 8 ++-- .../expression/ExpressionMappings.scala | 3 +- .../expression/ExpressionNames.scala | 1 + 8 files changed, 75 insertions(+), 27 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala index 41ebad1379f0..31f2cd533ec1 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala @@ -174,6 +174,7 @@ object CHExpressionUtil { ENCODE -> EncodeDecodeValidator(), ARRAY_EXCEPT -> DefaultValidator(), ARRAY_REPEAT -> DefaultValidator(), - DATE_FROM_UNIX_DATE -> DefaultValidator() + DATE_FROM_UNIX_DATE -> DefaultValidator(), + SKEWNESS -> DefaultValidator() ) } diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala index cc7a1a852ad3..565edd9f67aa 100644 --- a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala +++ b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala @@ -386,23 +386,38 @@ abstract class HashAggregateExecTransformer( val adjustedOrders = veloxOrders.map(sparkOrders.indexOf(_)) veloxTypes.zipWithIndex.foreach { case (veloxType, idx) => - val sparkType = sparkTypes(adjustedOrders(idx)) - val attr = rewrittenInputAttributes(adjustedOrders(idx)) - val aggFuncInputAttrNode = ExpressionConverter - .replaceWithExpressionTransformer(attr, originalInputAttributes) - .doTransform(args) - val expressionNode = if (sparkType != veloxType) { - newInputAttributes += - attr.copy(dataType = veloxType)(attr.exprId, attr.qualifier) - ExpressionBuilder.makeCast( - ConverterUtils.getTypeNode(veloxType, attr.nullable), - aggFuncInputAttrNode, - SQLConf.get.ansiEnabled) + val adjustedIdx = adjustedOrders(idx) + if (adjustedIdx == -1) { + // The Velox aggregate intermediate buffer column not found in Spark. + // For example, skewness and kurtosis share the same aggregate buffer in Velox, + // and Kurtosis additionally requires the buffer column of m4, which is + // always 0 for skewness. In Spark, the aggregate buffer of skewness does not + // have the column of m4, thus a placeholder m4 with a value of 0 must be passed + // to Velox, and this value cannot be omitted. Velox will always read m4 column + // when accessing the intermediate data. + val extraAttr = AttributeReference(veloxOrders(idx), veloxType)() + newInputAttributes += extraAttr + val lt = Literal.default(veloxType) + childNodes.add(ExpressionBuilder.makeLiteral(lt.value, lt.dataType, false)) } else { - newInputAttributes += attr - aggFuncInputAttrNode + val sparkType = sparkTypes(adjustedIdx) + val attr = rewrittenInputAttributes(adjustedIdx) + val aggFuncInputAttrNode = ExpressionConverter + .replaceWithExpressionTransformer(attr, originalInputAttributes) + .doTransform(args) + val expressionNode = if (sparkType != veloxType) { + newInputAttributes += + attr.copy(dataType = veloxType)(attr.exprId, attr.qualifier) + ExpressionBuilder.makeCast( + ConverterUtils.getTypeNode(veloxType, attr.nullable), + aggFuncInputAttrNode, + SQLConf.get.ansiEnabled) + } else { + newInputAttributes += attr + aggFuncInputAttrNode + } + childNodes.add(expressionNode) } - childNodes.add(expressionNode) } exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc)) case other => diff --git a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala index faead2ad1fae..773fedfe9cce 100644 --- a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala +++ b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala @@ -27,18 +27,25 @@ import scala.collection.JavaConverters._ object VeloxIntermediateData { // Agg functions with inconsistent ordering of intermediate data between Velox and Spark. // Corr - val veloxCorrIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg") + private val veloxCorrIntermediateDataOrder: Seq[String] = + Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg") // CovPopulation, CovSample - val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg") + private val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg") + // Skewness + private val veloxSkewnessIntermediateDataOrder: Seq[String] = Seq("n", "avg", "m2", "m3", "m4") // Agg functions with inconsistent types of intermediate data between Velox and Spark. // StddevSamp, StddevPop, VarianceSamp, VariancePop - val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType, DoubleType, DoubleType) + private val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType, DoubleType, DoubleType) // CovPopulation, CovSample - val veloxCovarIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType, DoubleType, DoubleType) + private val veloxCovarIntermediateTypes: Seq[DataType] = + Seq(DoubleType, LongType, DoubleType, DoubleType) // Corr - val veloxCorrIntermediateTypes: Seq[DataType] = + private val veloxCorrIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType) + // Skewness + private val veloxSkewnessIntermediateTypes: Seq[DataType] = + Seq(LongType, DoubleType, DoubleType, DoubleType, DoubleType) /** * Return the intermediate columns order of Velox aggregation functions, with special matching @@ -55,6 +62,8 @@ object VeloxIntermediateData { veloxCorrIntermediateDataOrder case _: CovPopulation | _: CovSample => veloxCovarIntermediateDataOrder + case _: Skewness => + veloxSkewnessIntermediateDataOrder case _ => aggFunc.aggBufferAttributes.map(_.name) } @@ -134,6 +143,8 @@ object VeloxIntermediateData { Some(veloxCovarIntermediateTypes) case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop => Some(veloxVarianceIntermediateTypes) + case _: Skewness => + Some(veloxSkewnessIntermediateTypes) case _ if aggFunc.aggBufferAttributes.size > 1 => Some(aggFunc.aggBufferAttributes.map(_.dataType)) case _ => None diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala index 48b0a36c0138..26bea5b1c9ba 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala @@ -857,6 +857,24 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu |""".stripMargin)( df => assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecTransformer]) == 2)) } + + test("skewness") { + runQueryAndCompare(""" + |select skewness(l_partkey) from lineitem; + |""".stripMargin) { + checkOperatorMatch[HashAggregateExecTransformer] + } + runQueryAndCompare("select skewness(l_partkey), count(distinct l_orderkey) from lineitem") { + df => + { + assert( + getExecutedPlan(df).count( + plan => { + plan.isInstanceOf[HashAggregateExecTransformer] + }) == 4) + } + } + } } class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 0e81ba91a34f..3826465b3c29 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -1138,7 +1138,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag "corr", "covar_pop", "covar_samp", - "approx_distinct"}; + "approx_distinct", + "skewness"}; for (const auto& funcSpec : funcSpecs) { auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec); diff --git a/docs/velox-backend-support-progress.md b/docs/velox-backend-support-progress.md index 4dcd27a340e7..639211e11ced 100644 --- a/docs/velox-backend-support-progress.md +++ b/docs/velox-backend-support-progress.md @@ -87,7 +87,7 @@ Gluten supports 28 operators (Draw to right to see all data types) Gluten supports 199 functions. (Draw to right to see all data types) | Spark Functions | Velox/Presto Functions | Velox/Spark functions | Gluten | Restrictions | BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | DATE | TIMESTAMP | STRING | DECIMAL | NULL | BINARS | CALENDAR | ARRAY | MAP | STRUCT | UDT | -|-------------------------------|------------------------|-----------------------|--------|------------------------|---------|------|-------|-----|------|-------|--------|------|-----------|--------|---------|------|--------| -------- |-------| ---- | ------ | ---- | +|-------------------------------|------------------------|-----------------------|--------|------------------------|---------|------|-------|-----|------|-------|--------|------|-----------|--------|---------|------|--------| -------- |-------| ---- |--------| ---- | | ! | | not | S | | S | S | S | S | S | S | S | | | S | | | | | | | | | | != | neq | | S | | S | S | S | S | S | S | S | | | S | | | | | | | | | | % | mod | remainder | S | Ansi Off | | S | S | S | S | S | | | | | | | | | | | | | @@ -372,7 +372,7 @@ Gluten supports 199 functions. (Draw to right to see all data types) | mean | avg | | S | Ansi Off | | | | | | | | | | | | | | | | | | | | min | min | | S | | | | S | S | S | S | S | | | | | | | | | | | | | min_by | | | S | | | | | | | | | | | | | | | | | | | | -| skewness | | | | | | | | | | | | | | | | | | | | | | | +| skewness | skewness | skewness | S | | | | S | S | S | S | S | | | | | | | | | | | | | some | | | | | | | | | | | | | | | | | | | | | | | | std,stddev | stddev | | S | | | | S | S | S | S | S | | | | | | | | | | | | | stddev,std | stddev | | S | | | | S | S | S | S | S | | | | | | | | | | | | @@ -387,7 +387,7 @@ Gluten supports 199 functions. (Draw to right to see all data types) | lag | | | | | | | | | | | | | | | | | | | | | | | | lead | | | | | | | | | | | | | | | | | | | | | | | | nth_value | nth_value | nth_value | PS | | | | | | | | | | | | | | | | | | | | -| ntile | ntile | ntile | S | | | | | | | | | | | | | | | | | | | | +| ntile | ntile | ntile | S | | | | | | | | | | | | | | | | | | | | | percent_rank | percent_rank | | S | | | | | | | | | | | | | | | | | | | | | rank | rank | | S | | | | | | | | | | | | | | | | | | | | | row_number | row_number | | S | | | | S | S | S | | | | | | | | | | | | | | @@ -404,7 +404,7 @@ Gluten supports 199 functions. (Draw to right to see all data types) | coalesce | | | PS | | | | | | | | | | | | | | | | | | | | | crc32 | crc32 | | S | | | | | | | | | | | S | | | | | | | | | | current_user | | | S* | | | | | | | | | | | S | | | | | | | | | -| current_catalog | | | S | | | | | | | | | | | | | | | | | | | | +| current_catalog | | | S | | | | | | | | | | | | | | | | | | | | | current_database | | | S | | | | | | | | | | | | | | | | | | | | | greatest | greatest | greatest | S | | | | | | S | S | S | S | S | | | | | | | | | | | hash | hash | hash | S | | S | S | S | S | S | S | S | | | | | | | | | | | | diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala index a063c244b4a5..6fb9d80b003d 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala @@ -269,7 +269,8 @@ object ExpressionMappings { Sig[CovPopulation](COVAR_POP), Sig[CovSample](COVAR_SAMP), Sig[Last](LAST), - Sig[First](FIRST) + Sig[First](FIRST), + Sig[Skewness](SKEWNESS) ) /** Mapping Spark window expression to Substrait function name */ diff --git a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala index d2fc4f9ec354..c5a00b51f4c4 100644 --- a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala @@ -45,6 +45,7 @@ object ExpressionNames { final val FIRST = "first" final val FIRST_IGNORE_NULL = "first_ignore_null" final val APPROX_DISTINCT = "approx_distinct" + final val SKEWNESS = "skewness" // Function names used by Substrait plan. final val ADD = "add"