From 9fdc3754d676bcb5de500c87025b9c571a7cf523 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 20 Dec 2023 16:46:54 +0800 Subject: [PATCH] [SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING ### What changes were proposed in this pull request? This PR enhances the analyzer to handle the following pattern properly. ``` Sort - Filter - Aggregate ``` ### Why are the changes needed? ``` spark-sql (default)> CREATE TABLE t1 (flag BOOLEAN, dt STRING); spark-sql (default)> SELECT LENGTH(dt), > COUNT(t1.flag) > FROM t1 > GROUP BY LENGTH(dt) > HAVING COUNT(t1.flag) > 1 > ORDER BY LENGTH(dt); [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `dt` cannot be resolved. Did you mean one of the following? [`length(dt)`, `count(flag)`].; line 6 pos 16; 'Sort ['LENGTH('dt) ASC NULLS FIRST], true +- Filter (count(flag)#60L > cast(1 as bigint)) +- Aggregate [length(dt#9)], [length(dt#9) AS length(dt)#59, count(flag#8) AS count(flag)#60L] +- SubqueryAlias spark_catalog.default.t1 +- Relation spark_catalog.default.t1[flag#8,dt#9] parquet ``` The above code demonstrates the failure case: the query fails during the analysis phase when both `HAVING` and `ORDER BY` clauses are present, but succeeds if only one is present. ### Does this PR introduce _any_ user-facing change? Yes, maybe we can call it a bugfix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44352 from pan3793/SPARK-28386. 
Authored-by: Cheng Pan Signed-off-by: Kent Yao --- .../sql/catalyst/analysis/Analyzer.scala | 9 ++++++ .../analysis/ResolveReferencesInSort.scala | 16 ++++++---- .../sql-tests/analyzer-results/having.sql.out | 29 +++++++++++++++++++ .../udf/postgreSQL/udf-select_having.sql.out | 11 ++++--- .../resources/sql-tests/inputs/having.sql | 6 ++++ .../sql-tests/results/having.sql.out | 18 ++++++++++++ .../approved-plans-v2_7/q6.sf100/explain.txt | 8 ++--- .../q6.sf100/simplified.txt | 2 +- .../approved-plans-v2_7/q6/explain.txt | 8 ++--- .../approved-plans-v2_7/q6/simplified.txt | 2 +- 10 files changed, 87 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e3538647e3754..94f6d33462656 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } s.copy(order = newSortOrder, child = newChild) }) + + case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate)) + if agg.resolved && cond.resolved && s.order.forall(_.resolved) => + resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs, newChild) => { + val newSortOrder = s.order.zip(newExprs).map { + case (sortOrder, expr) => sortOrder.copy(child = expr) + } + s.copy(order = newSortOrder, child = f.copy(child = newChild)) + }) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala index 02583ebb8f6ba..6fa723d4a75fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions.SortOrder -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort} import org.apache.spark.sql.connector.catalog.CatalogManager /** @@ -28,10 +28,11 @@ import org.apache.spark.sql.connector.catalog.CatalogManager * includes metadata columns as well. * 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g. * `SELECT col, current_date FROM t`. - * 3. If the child plan is Aggregate, resolves the column to [[TempResolvedColumn]] with the output - * of Aggregate's child plan. This is to allow Sort to host grouping expressions and aggregate - * functions, which can be pushed down to the Aggregate later. For example, - * `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`. + * 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to + * [[TempResolvedColumn]] with the output of Aggregate's child plan. + * This is to allow Sort to host grouping expressions and aggregate functions, which can + * be pushed down to the Aggregate later. For example, + * `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`. * 4. Resolves the column to [[AttributeReference]] with the output of a descendant plan node. * Spark will propagate the missing attributes from the descendant plan node to the Sort node. 
* This is to allow users to ORDER BY columns that are not in the SELECT clause, which is @@ -51,7 +52,10 @@ class ResolveReferencesInSort(val catalogManager: CatalogManager) def apply(s: Sort): LogicalPlan = { val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child)) - val resolvedWithAgg = resolvedBasic.map(resolveColWithAgg(_, s.child)) + val resolvedWithAgg = s.child match { + case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, agg)) + case _ => resolvedBasic.map(resolveColWithAgg(_, s.child)) + } val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child) val orderByAllResolved = resolveOrderByAll( s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder])) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out index 12eb5a34146a2..d96ea1e43e18b 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out @@ -179,3 +179,32 @@ Filter (c1#x = 1) +- Aggregate [c1#x], [c1#x] +- SubqueryAlias t +- LocalRelation [c1#x, c2#x] + + +-- !query +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v) +-- !query analysis +Sort [sum(v)#xL ASC NULLS FIRST], true ++- Filter (sum(v)#xL > cast(2 as bigint)) + +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL] + +- SubqueryAlias hav + +- View (`hav`, [k#x,v#x]) + +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x] + +- Project [k#x, v#x] + +- SubqueryAlias hav + +- LocalRelation [k#x, v#x] + + +-- !query +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v) +-- !query analysis +Project [k#x, sum(v)#xL] ++- Sort [avg(v#x)#x ASC NULLS FIRST], true + +- Filter (sum(v)#xL > cast(2 as bigint)) + +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL, avg(v#x) AS avg(v#x)#x] + +- SubqueryAlias hav + +- View (`hav`, 
[k#x,v#x]) + +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x] + +- Project [k#x, v#x] + +- SubqueryAlias hav + +- LocalRelation [k#x, v#x] diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out index 5f7e92a62f302..ea6b1716869ff 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out @@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x] SELECT udf(b), udf(c) FROM test_having GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c) -- !query analysis -Project [udf(b)#x, udf(c)#x] -+- Sort [cast(udf(cast(b#x as string)) as int) ASC NULLS FIRST, cast(udf(cast(c#x as string)) as string) ASC NULLS FIRST], true - +- Filter (udf(b)#x = 3) - +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x, b#x, c#x] - +- SubqueryAlias spark_catalog.default.test_having - +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet +Sort [udf(b)#x ASC NULLS FIRST, udf(c)#x ASC NULLS FIRST], true ++- Filter (udf(b)#x = 3) + +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x] + +- SubqueryAlias spark_catalog.default.test_having + +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet -- !query diff --git a/sql/core/src/test/resources/sql-tests/inputs/having.sql b/sql/core/src/test/resources/sql-tests/inputs/having.sql index 056b99e363d21..4c25a60c8abb4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/having.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/having.sql @@ -33,3 +33,9 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING SETS(t.c1) HAVING t. 
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1; SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 = 1; SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1; + +-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the agg function presents on SELECT list +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v); + +-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the agg function does not present on SELECT list +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v); diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out b/sql/core/src/test/resources/sql-tests/results/having.sql.out index 6eaba0b4119cb..c9d5886426364 100644 --- a/sql/core/src/test/resources/sql-tests/results/having.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/having.sql.out @@ -134,3 +134,21 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1 struct -- !query output 1 + + +-- !query +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v) +-- !query schema +struct +-- !query output +three 3 +one 6 + + +-- !query +SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v) +-- !query schema +struct +-- !query output +one 6 +three 3 diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt index afdfc51a17dd4..82a6e00c79c4b 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt @@ -248,15 +248,15 @@ Input [2]: [ca_state#18, count#22] Keys [1]: [ca_state#18] Functions [1]: [count(1)] Aggregate Attributes [1]: [count(1)#23] -Results [3]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25, ca_state#18] 
+Results [2]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25] (44) Filter [codegen id : 14] -Input [3]: [state#24, cnt#25, ca_state#18] +Input [2]: [state#24, cnt#25] Condition : (cnt#25 >= 10) (45) TakeOrderedAndProject -Input [3]: [state#24, cnt#25, ca_state#18] -Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#18 ASC NULLS FIRST], [state#24, cnt#25] +Input [2]: [state#24, cnt#25] +Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25] ===== Subqueries ===== diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt index 7339df16a2895..d69eb47d92c88 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt @@ -1,4 +1,4 @@ -TakeOrderedAndProject [cnt,ca_state,state] +TakeOrderedAndProject [cnt,state] WholeStageCodegen (14) Filter [cnt] HashAggregate [ca_state,count] [count(1),state,cnt,count] diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt index a2638dac56456..507d4991a046a 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt @@ -218,15 +218,15 @@ Input [2]: [ca_state#2, count#22] Keys [1]: [ca_state#2] Functions [1]: [count(1)] Aggregate Attributes [1]: [count(1)#23] -Results [3]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25, ca_state#2] +Results [2]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25] (38) Filter [codegen id : 8] -Input [3]: [state#24, cnt#25, ca_state#2] +Input [2]: [state#24, cnt#25] Condition : (cnt#25 >= 10) (39) TakeOrderedAndProject -Input [3]: [state#24, 
cnt#25, ca_state#2] -Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#24, cnt#25] +Input [2]: [state#24, cnt#25] +Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25] ===== Subqueries ===== diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt index c9c8358ba0b9f..a15b638fbb2c0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt @@ -1,4 +1,4 @@ -TakeOrderedAndProject [cnt,ca_state,state] +TakeOrderedAndProject [cnt,state] WholeStageCodegen (8) Filter [cnt] HashAggregate [ca_state,count] [count(1),state,cnt,count]