From 3b9a3ea093e64e6b4a3b208dc7f805292ecf117b Mon Sep 17 00:00:00 2001 From: Joey Date: Fri, 29 Mar 2024 17:18:18 +0800 Subject: [PATCH] [VL] Support kurtosis aggregate function (#5151) --- .../glutenproject/utils/CHExpressionUtil.scala | 3 ++- .../utils/VeloxIntermediateData.scala | 17 +++++++++-------- .../VeloxAggregateFunctionsSuite.scala | 18 ++++++++++++++++++ .../substrait/SubstraitToVeloxPlanValidator.cc | 3 ++- docs/velox-backend-support-progress.md | 6 +++--- .../expression/ExpressionMappings.scala | 1 + .../sql-tests/results/group-by.sql.out | 2 +- .../velox/VeloxSQLQueryTestSettings.scala | 4 +++- .../sql-tests/results/group-by.sql.out | 2 +- .../sql-tests/results/udf/udf-group-by.sql.out | 2 +- .../velox/VeloxSQLQueryTestSettings.scala | 4 +++- .../sql-tests/results/group-by.sql.out | 2 +- .../sql-tests/results/udf/udf-group-by.sql.out | 2 +- .../velox/VeloxSQLQueryTestSettings.scala | 4 +++- .../expression/ExpressionNames.scala | 3 ++- 15 files changed, 51 insertions(+), 22 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala index 6dfed9dd6e95..d431d0c87387 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/utils/CHExpressionUtil.scala @@ -180,6 +180,7 @@ object CHExpressionUtil { URL_DECODE -> DefaultValidator(), SKEWNESS -> DefaultValidator(), BIT_LENGTH -> DefaultValidator(), - MAKE_YM_INTERVAL -> DefaultValidator() + MAKE_YM_INTERVAL -> DefaultValidator(), + KURTOSIS -> DefaultValidator() ) } diff --git a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala index 773fedfe9cce..0c3508b5d580 100644 --- a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala +++ b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala @@ -31,8 +31,9 @@ object VeloxIntermediateData { Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg") // CovPopulation, CovSample private val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg") - // Skewness - private val veloxSkewnessIntermediateDataOrder: Seq[String] = Seq("n", "avg", "m2", "m3", "m4") + // Skewness, Kurtosis + private val veloxCentralMomentAggIntermediateDataOrder: Seq[String] = + Seq("n", "avg", "m2", "m3", "m4") // Agg functions with inconsistent types of intermediate data between Velox and Spark. // StddevSamp, StddevPop, VarianceSamp, VariancePop @@ -43,8 +44,8 @@ object VeloxIntermediateData { // Corr private val veloxCorrIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType) - // Skewness - private val veloxSkewnessIntermediateTypes: Seq[DataType] = + // Skewness, Kurtosis + private val veloxCentralMomentAggIntermediateTypes: Seq[DataType] = Seq(LongType, DoubleType, DoubleType, DoubleType, DoubleType) /** @@ -62,8 +63,8 @@ object VeloxIntermediateData { veloxCorrIntermediateDataOrder case _: CovPopulation | _: CovSample => veloxCovarIntermediateDataOrder - case _: Skewness => - veloxSkewnessIntermediateDataOrder + case _: Skewness | _: Kurtosis => + veloxCentralMomentAggIntermediateDataOrder case _ => aggFunc.aggBufferAttributes.map(_.name) } @@ -143,8 +144,8 @@ object VeloxIntermediateData { Some(veloxCovarIntermediateTypes) case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop => Some(veloxVarianceIntermediateTypes) - case _: Skewness => - Some(veloxSkewnessIntermediateTypes) + case _: Skewness | _: Kurtosis => + Some(veloxCentralMomentAggIntermediateTypes) case _ if aggFunc.aggBufferAttributes.size > 1 => Some(aggFunc.aggBufferAttributes.map(_.dataType)) case _ => None diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala index c306f4585736..abeadaadf0ab 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxAggregateFunctionsSuite.scala @@ -901,6 +901,24 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } } + + test("kurtosis") { + runQueryAndCompare(""" + |select kurtosis(l_partkey) from lineitem; + |""".stripMargin) { + checkOperatorMatch[HashAggregateExecTransformer] + } + runQueryAndCompare("select kurtosis(l_partkey), count(distinct l_orderkey) from lineitem") { + df => + { + assert( + getExecutedPlan(df).count( + plan => { + plan.isInstanceOf[HashAggregateExecTransformer] + }) == 4) + } + } + } } class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index f04dcda7ce05..264379716328 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -1144,7 +1144,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag "covar_pop", "covar_samp", "approx_distinct", - "skewness"}; + "skewness", + "kurtosis"}; for (const auto& funcSpec : funcSpecs) { auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec); diff --git a/docs/velox-backend-support-progress.md b/docs/velox-backend-support-progress.md index 56cb95c1bb11..c08a2e6aa18f 100644 --- a/docs/velox-backend-support-progress.md +++ b/docs/velox-backend-support-progress.md @@ -364,7 +364,7 @@ Gluten supports 199 functions. (Draw to right to see all data types) | first_value | | first_value | S | | | | | | | | | | | | | | | | | | | | | grouping | | | | | | | | | | | | | | | | | | | | | | | | grouping_id | | | | | | | | | | | | | | | | | | | | | | | -| kurtosis | | | | | | | | | | | | | | | | | | | | | | | +| kurtosis | kurtosis | kurtosis | S | | | | S | S | S | S | S | | | | | | | | | | | | | last | | last | S | | | | | | | | | | | | | | | | | | | | | last_value | | last_value | S | | | | | | | | | | | | | | | | | | | | | max | max | | S | | | | S | S | S | S | S | | | | | | | | | | | | @@ -416,7 +416,7 @@ Gluten supports 199 functions. (Draw to right to see all data types) | java_method | | | | | | | | | | | | | | | | | | | | | | | | least | least | least | S | | | | | | S | S | S | S | S | | | | | | | | | | | md5 | md5 | | S | | | S | | | | | | | | | | | | | | | | | -| monotonically_increasing_id | | | S | | | | | | | | | | | | | | | | | | | | +| monotonically_increasing_id | | | S | | | | | | | | | | | | | | | | | | | | | nanvl | | | S | | | | | | | | | | | | | | | | | | | | | nvl | | | | | | | | | | | | | | | | | | | | | | | | nvl2 | | | | | | | | | | | | | | | | | | | | | | | @@ -428,4 +428,4 @@ Gluten supports 199 functions. (Draw to right to see all data types) | spark_partition_id | | | S | | | | | | | | | | | | | | | | | | | | | stack | | | | | | | | | | | | | | | | | | | | | | | | xxhash64 | xxhash64 | xxhash64 | | | | | | | | | | | | | | | | | | | | | -| uuid | uuid | uuid | S | | | | | | | | | | | | | | | | | | | | +| uuid | uuid | uuid | S | | | | | | | | | | | | | | | | | | | | diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala index e9133b6f228f..b3b1ab12b656 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala @@ -277,6 +277,7 @@ object ExpressionMappings { Sig[Last](LAST), Sig[First](FIRST), Sig[Skewness](SKEWNESS), + Sig[Kurtosis](KURTOSIS), Sig[ApproximatePercentile](APPROX_PERCENTILE) ) ++ SparkShimLoader.getSparkShims.aggregateExpressionMappings diff --git a/gluten-ut/spark32/src/test/resources/sql-tests/results/group-by.sql.out b/gluten-ut/spark32/src/test/resources/sql-tests/results/group-by.sql.out index 79e6f72df77a..a12e830c1117 100644 --- a/gluten-ut/spark32/src/test/resources/sql-tests/results/group-by.sql.out +++ b/gluten-ut/spark32/src/test/resources/sql-tests/results/group-by.sql.out @@ -130,7 +130,7 @@ FROM testData -- !query schema struct -- !query output --0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 +-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7 -- !query diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala index 9ec55f01533c..dd63b1e4cb97 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala @@ -229,9 +229,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { ) val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set( - // Velox corr has better computation logic but it fails Spark's precision check. + // The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ + // slightly from those in Spark, resulting in some differences in the final results. // Overwrite below test cases. // -- SPARK-24369 multiple distinct aggregations having the same argument set + // -- Aggregate with nulls. "group-by.sql", "udf/udf-group-by.sql" ) diff --git a/gluten-ut/spark33/src/test/resources/sql-tests/results/group-by.sql.out b/gluten-ut/spark33/src/test/resources/sql-tests/results/group-by.sql.out index ffaa2d5111d1..982278fa5100 100644 --- a/gluten-ut/spark33/src/test/resources/sql-tests/results/group-by.sql.out +++ b/gluten-ut/spark33/src/test/resources/sql-tests/results/group-by.sql.out @@ -150,7 +150,7 @@ FROM testData -- !query schema struct -- !query output --0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 +-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7 -- !query diff --git a/gluten-ut/spark33/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out b/gluten-ut/spark33/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out index 90272001fc2f..ea088f8e8f40 100644 --- a/gluten-ut/spark33/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out +++ b/gluten-ut/spark33/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out @@ -130,7 +130,7 @@ FROM testData -- !query schema struct -- !query output --0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 +-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7 -- !query diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala index f773e78e870e..1d00c739c48d 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala @@ -232,9 +232,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { ) private val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set( - // Velox corr has better computation logic but it fails Spark's precision check. + // The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ + // slightly from those in Spark, resulting in some differences in the final results. // Overwrite below test cases. // -- SPARK-24369 multiple distinct aggregations having the same argument set + // -- Aggregate with nulls. "group-by.sql", "udf/udf-group-by.sql" ) diff --git a/gluten-ut/spark34/src/test/resources/sql-tests/results/group-by.sql.out b/gluten-ut/spark34/src/test/resources/sql-tests/results/group-by.sql.out index a92a58efb9e3..f56420926050 100644 --- a/gluten-ut/spark34/src/test/resources/sql-tests/results/group-by.sql.out +++ b/gluten-ut/spark34/src/test/resources/sql-tests/results/group-by.sql.out @@ -162,7 +162,7 @@ FROM testData -- !query schema struct -- !query output --0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 +-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7 -- !query diff --git a/gluten-ut/spark34/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out b/gluten-ut/spark34/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out index 35f91a7c40de..d3735acf0f08 100644 --- a/gluten-ut/spark34/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out +++ b/gluten-ut/spark34/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out @@ -151,7 +151,7 @@ FROM testData -- !query schema struct -- !query output --0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7 +-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7 -- !query diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala index a7f190c0dad5..1421a126066d 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxSQLQueryTestSettings.scala @@ -233,9 +233,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { ) val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set( - // Velox corr has better computation logic but it fails Spark's precision check. + // The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ + // slightly from those in Spark, resulting in some differences in the final results. // Overwrite below test cases. // -- SPARK-24369 multiple distinct aggregations having the same argument set + // -- Aggregate with nulls. "group-by.sql", "udf/udf-group-by.sql", // Exception string doesn't match for diff --git a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala index f61aa3161662..8717be50d8bb 100644 --- a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala @@ -45,8 +45,9 @@ object ExpressionNames { final val FIRST = "first" final val FIRST_IGNORE_NULL = "first_ignore_null" final val APPROX_DISTINCT = "approx_distinct" - final val SKEWNESS = "skewness" final val APPROX_PERCENTILE = "approx_percentile" + final val SKEWNESS = "skewness" + final val KURTOSIS = "kurtosis" // Function names used by Substrait plan. final val ADD = "add"