Skip to content

Commit

Permalink
[VL] Support kurtosis aggregate function (#5151)
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 authored Mar 29, 2024
1 parent 7d16e16 commit 3b9a3ea
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ object CHExpressionUtil {
URL_DECODE -> DefaultValidator(),
SKEWNESS -> DefaultValidator(),
BIT_LENGTH -> DefaultValidator(),
MAKE_YM_INTERVAL -> DefaultValidator()
MAKE_YM_INTERVAL -> DefaultValidator(),
KURTOSIS -> DefaultValidator()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ object VeloxIntermediateData {
Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg")
// CovPopulation, CovSample
private val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg")
// Skewness
private val veloxSkewnessIntermediateDataOrder: Seq[String] = Seq("n", "avg", "m2", "m3", "m4")
// Skewness, Kurtosis
private val veloxCentralMomentAggIntermediateDataOrder: Seq[String] =
Seq("n", "avg", "m2", "m3", "m4")

// Agg functions with inconsistent types of intermediate data between Velox and Spark.
// StddevSamp, StddevPop, VarianceSamp, VariancePop
Expand All @@ -43,8 +44,8 @@ object VeloxIntermediateData {
// Corr
private val veloxCorrIntermediateTypes: Seq[DataType] =
Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType)
// Skewness
private val veloxSkewnessIntermediateTypes: Seq[DataType] =
// Skewness, Kurtosis
private val veloxCentralMomentAggIntermediateTypes: Seq[DataType] =
Seq(LongType, DoubleType, DoubleType, DoubleType, DoubleType)

/**
Expand All @@ -62,8 +63,8 @@ object VeloxIntermediateData {
veloxCorrIntermediateDataOrder
case _: CovPopulation | _: CovSample =>
veloxCovarIntermediateDataOrder
case _: Skewness =>
veloxSkewnessIntermediateDataOrder
case _: Skewness | _: Kurtosis =>
veloxCentralMomentAggIntermediateDataOrder
case _ =>
aggFunc.aggBufferAttributes.map(_.name)
}
Expand Down Expand Up @@ -143,8 +144,8 @@ object VeloxIntermediateData {
Some(veloxCovarIntermediateTypes)
case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
Some(veloxVarianceIntermediateTypes)
case _: Skewness =>
Some(veloxSkewnessIntermediateTypes)
case _: Skewness | _: Kurtosis =>
Some(veloxCentralMomentAggIntermediateTypes)
case _ if aggFunc.aggBufferAttributes.size > 1 =>
Some(aggFunc.aggBufferAttributes.map(_.dataType))
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,24 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
}
}
}

test("kurtosis") {
runQueryAndCompare("""
|select kurtosis(l_partkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[HashAggregateExecTransformer]
}
runQueryAndCompare("select kurtosis(l_partkey), count(distinct l_orderkey) from lineitem") {
df =>
{
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[HashAggregateExecTransformer]
}) == 4)
}
}
}
}

class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite {
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,8 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag
"covar_pop",
"covar_samp",
"approx_distinct",
"skewness"};
"skewness",
"kurtosis"};

for (const auto& funcSpec : funcSpecs) {
auto funcName = SubstraitParser::getNameBeforeDelimiter(funcSpec);
Expand Down
6 changes: 3 additions & 3 deletions docs/velox-backend-support-progress.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ Gluten supports 199 functions. (Draw to right to see all data types)
| first_value | | first_value | S | | | | | | | | | | | | | | | | | | | |
| grouping | | | | | | | | | | | | | | | | | | | | | | |
| grouping_id | | | | | | | | | | | | | | | | | | | | | | |
| kurtosis | | | | | | | | | | | | | | | | | | | | | | |
| kurtosis | kurtosis | kurtosis | S | | | | S | S | S | S | S | | | | | | | | | | | |
| last | | last | S | | | | | | | | | | | | | | | | | | | |
| last_value | | last_value | S | | | | | | | | | | | | | | | | | | | |
| max | max | | S | | | | S | S | S | S | S | | | | | | | | | | | |
Expand Down Expand Up @@ -416,7 +416,7 @@ Gluten supports 199 functions. (Draw to right to see all data types)
| java_method | | | | | | | | | | | | | | | | | | | | | | |
| least | least | least | S | | | | | | S | S | S | S | S | | | | | | | | | |
| md5 | md5 | | S | | | S | | | | | | | | | | | | | | | | |
| monotonically_increasing_id | | | S | | | | | | | | | | | | | | | | | | | |
| monotonically_increasing_id | | | S | | | | | | | | | | | | | | | | | | | |
| nanvl | | | S | | | | | | | | | | | | | | | | | | | |
| nvl | | | | | | | | | | | | | | | | | | | | | | |
| nvl2 | | | | | | | | | | | | | | | | | | | | | | |
Expand All @@ -428,4 +428,4 @@ Gluten supports 199 functions. (Draw to right to see all data types)
| spark_partition_id | | | S | | | | | | | | | | | | | | | | | | | |
| stack | | | | | | | | | | | | | | | | | | | | | | |
| xxhash64 | xxhash64 | xxhash64 | | | | | | | | | | | | | | | | | | | | |
| uuid | uuid | uuid | S | | | | | | | | | | | | | | | | | | | |
| uuid | uuid | uuid | S | | | | | | | | | | | | | | | | | | | |
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ object ExpressionMappings {
Sig[Last](LAST),
Sig[First](FIRST),
Sig[Skewness](SKEWNESS),
Sig[Kurtosis](KURTOSIS),
Sig[ApproximatePercentile](APPROX_PERCENTILE)
) ++ SparkShimLoader.getSparkShims.aggregateExpressionMappings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ FROM testData
-- !query schema
struct<skewness(a):double,kurtosis(a):double,min(a):int,max(a):int,avg(a):double,variance(a):double,stddev(a):double,sum(a):bigint,count(a):bigint>
-- !query output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings {
)

val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set(
// Velox corr has better computation logic but it fails Spark's precision check.
// The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ
// slightly from those in Spark, resulting in some differences in the final results.
// Overwrite below test cases.
// -- SPARK-24369 multiple distinct aggregations having the same argument set
// -- Aggregate with nulls.
"group-by.sql",
"udf/udf-group-by.sql"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ FROM testData
-- !query schema
struct<skewness(a):double,kurtosis(a):double,min(a):int,max(a):int,avg(a):double,variance(a):double,stddev(a):double,sum(a):bigint,count(a):bigint>
-- !query output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ FROM testData
-- !query schema
struct<skewness(udf(a)):double,udf(kurtosis(a)):double,udf(min(a)):int,max(udf(a)):int,udf(avg(udf(a))):double,udf(variance(a)):double,stddev(udf(a)):double,udf(sum(a)):bigint,udf(count(a)):bigint>
-- !query output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings {
)

private val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set(
// Velox corr has better computation logic but it fails Spark's precision check.
// The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ
// slightly from those in Spark, resulting in some differences in the final results.
// Overwrite below test cases.
// -- SPARK-24369 multiple distinct aggregations having the same argument set
// -- Aggregate with nulls.
"group-by.sql",
"udf/udf-group-by.sql"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ FROM testData
-- !query schema
struct<skewness(a):double,kurtosis(a):double,min(a):int,max(a):int,avg(a):double,variance(a):double,stddev(a):double,sum(a):bigint,count(a):bigint>
-- !query output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ FROM testData
-- !query schema
struct<skewness(udf(a)):double,udf(kurtosis(a)):double,udf(min(a)):int,max(udf(a)):int,udf(avg(udf(a))):double,udf(variance(a)):double,stddev(udf(a)):double,udf(sum(a)):bigint,udf(count(a)):bigint>
-- !query output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
-0.27238010581457284 -1.5069204152249138 1 3 2.142857142857143 0.8095238095238096 0.8997354108424375 15 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings {
)

val OVERWRITE_SQL_QUERY_LIST: Set[String] = Set(
// Velox corr has better computation logic but it fails Spark's precision check.
// The calculation formulas for corr, skewness, kurtosis, variance, and stddev in Velox differ
// slightly from those in Spark, resulting in some differences in the final results.
// Overwrite below test cases.
// -- SPARK-24369 multiple distinct aggregations having the same argument set
// -- Aggregate with nulls.
"group-by.sql",
"udf/udf-group-by.sql",
// Exception string doesn't match for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ object ExpressionNames {
final val FIRST = "first"
final val FIRST_IGNORE_NULL = "first_ignore_null"
final val APPROX_DISTINCT = "approx_distinct"
final val SKEWNESS = "skewness"
final val APPROX_PERCENTILE = "approx_percentile"
final val SKEWNESS = "skewness"
final val KURTOSIS = "kurtosis"

// Function names used by Substrait plan.
final val ADD = "add"
Expand Down

0 comments on commit 3b9a3ea

Please sign in to comment.