From 22bb0b39368ff074b9dc66685fb83dc779343f94 Mon Sep 17 00:00:00 2001 From: Qian Sun Date: Tue, 3 Sep 2024 16:23:49 +0800 Subject: [PATCH 1/2] [GLUTEN-7090][VL] fix: Number of sorting keys must be greater than zero (#7089) Closes #7090 --- .../execution/VeloxWindowExpressionSuite.scala | 14 ++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 8 ++++++++ 2 files changed, 22 insertions(+) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxWindowExpressionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxWindowExpressionSuite.scala index 03b295f4983f..7af17c4cd151 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxWindowExpressionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxWindowExpressionSuite.scala @@ -73,6 +73,20 @@ class VeloxWindowExpressionSuite extends WholeStageTransformerSuite { } } + test("test overlapping partition and sorting keys") { + runAndCompare( + """ + |WITH t AS ( + |SELECT + | l_linenumber, + | row_number() over (partition by l_linenumber order by l_linenumber) as rn + |FROM lineitem + |) + |SELECT * FROM t WHERE rn = 1 + |""".stripMargin + ) {} + } + test("collect_list / collect_set") { withTable("t") { val data = Seq( diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 1604c15e338a..bd545a960947 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1024,6 +1024,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan( } } const std::optional<std::string> rowNumberColumnName = std::nullopt; + + if (sortingKeys.empty()) { + // Handle the case where all sorting keys are also used as partition keys. 
+ + return std::make_shared<core::RowNumberNode>( + nextPlanNodeId(), partitionKeys, rowNumberColumnName, (int32_t)windowGroupLimitRel.limit(), childNode); + } + return std::make_shared<core::TopNRowNumberNode>( nextPlanNodeId(), partitionKeys, From 6e0b11943242d18699634df818c1755bf3b5aa40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Tue, 3 Sep 2024 17:22:38 +0800 Subject: [PATCH 2/2] [GLUTEN-7054][CH] Fix cse alias issues (#7084) * fix cse alias issues * fix issue https://github.com/apache/incubator-gluten/issues/7054 * fix uts --- .../CommonSubexpressionEliminateRule.scala | 12 ++- .../hive/GlutenClickHouseHiveTableSuite.scala | 84 ++++++++++++++++--- 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala index a3b74366fc7b..52e278b3dace 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala @@ -28,6 +28,11 @@ import org.apache.spark.sql.internal.SQLConf import scala.collection.mutable +// If you want to debug CommonSubexpressionEliminateRule, you can: +// 1. replace all `logTrace` with `logError` +// 2. 
append two options to spark config +// --conf spark.sql.planChangeLog.level=error +// --conf spark.sql.planChangeLog.batches=all class CommonSubexpressionEliminateRule(session: SparkSession, conf: SQLConf) extends Rule[LogicalPlan] with Logging { @@ -121,7 +126,12 @@ class CommonSubexpressionEliminateRule(session: SparkSession, conf: SQLConf) if (expr.find(_.isInstanceOf[AggregateExpression]).isDefined) { addToEquivalentExpressions(expr, equivalentExpressions) } else { - equivalentExpressions.addExprTree(expr) + expr match { + case alias: Alias => + equivalentExpressions.addExprTree(alias.child) + case _ => + equivalentExpressions.addExprTree(expr) + } } }) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala index f165d7aef69c..cbc3aed3607d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala @@ -985,19 +985,19 @@ class GlutenClickHouseHiveTableSuite } } - test("GLUTEN-4333: fix CSE in aggregate operator") { - def checkOperatorCount[T <: TransformSupport](count: Int)(df: DataFrame)(implicit - tag: ClassTag[T]): Unit = { - if (sparkVersion.equals("3.3")) { - assert( - getExecutedPlan(df).count( - plan => { - plan.getClass == tag.runtimeClass - }) == count, - s"executed plan: ${getExecutedPlan(df)}") - } + def checkOperatorCount[T <: TransformSupport](count: Int)(df: DataFrame)(implicit + tag: ClassTag[T]): Unit = { + if (sparkVersion.equals("3.3")) { + assert( + getExecutedPlan(df).count( + plan => { + plan.getClass == tag.runtimeClass + }) == count, + s"executed plan: ${getExecutedPlan(df)}") } + } + test("GLUTEN-4333: fix CSE in aggregate operator") { val createTableSql = """ |CREATE TABLE `test_cse`( @@ -1262,4 +1262,66 
@@ class GlutenClickHouseHiveTableSuite compareResultsAgainstVanillaSpark(selectSql, true, _ => {}) sql(s"drop table if exists $tableName") } + + test("GLUTEN-7054: Fix exception when CSE meets common alias expression") { + val createTableSql = """ + |CREATE TABLE test_tbl_7054 ( + | day STRING, + | event_id STRING, + | event STRUCT< + | event_info: MAP<STRING, STRING> + | > + |) STORED AS PARQUET; + |""".stripMargin + + val insertDataSql = """ + |INSERT INTO test_tbl_7054 + |VALUES + | ('2024-08-27', '011441004', + | STRUCT(MAP('type', '1', 'action', '8', 'value_vmoney', '100'))), + | ('2024-08-27', '011441004', + | STRUCT(MAP('type', '2', 'action', '8', 'value_vmoney', '200'))), + | ('2024-08-27', '011441004', + | STRUCT(MAP('type', '4', 'action', '8', 'value_vmoney', '300'))); + |""".stripMargin + + val selectSql = """ + |SELECT + | COALESCE(day, 'all') AS daytime, + | COALESCE(type, 'all') AS type, + | COALESCE(value_money, 'all') AS value_vmoney, + | SUM(CASE + | WHEN type IN (1, 2) AND action = 8 THEN value_vmoney + | ELSE 0 + | END) / 60 AS total_value_vmoney + |FROM ( + | SELECT + | day, + | type, + | NVL(CAST(value_vmoney AS BIGINT), 0) AS value_money, + | action, + | type, + | CAST(value_vmoney AS BIGINT) AS value_vmoney + | FROM ( + | SELECT + | day, + | event.event_info["type"] AS type, + | event.event_info["action"] AS action, + | event.event_info["value_vmoney"] AS value_vmoney + | FROM test_tbl_7054 + | WHERE + | day = '2024-08-27' + | AND event_id = '011441004' + | AND event.event_info["type"] IN (1, 2, 4) + | ) a + |) b + |GROUP BY + | day, type, value_money + |""".stripMargin + + spark.sql(createTableSql) + spark.sql(insertDataSql) + runQueryAndCompare(selectSql)(df => checkOperatorCount[ProjectExecTransformer](3)(df)) + spark.sql("DROP TABLE test_tbl_7054") + } }