Skip to content

Commit

Permalink
[GLUTEN-7096] [CH] fix exception when same names in group by
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai-xu committed Sep 3, 2024
1 parent 6e0b119 commit c78b898
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverri
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.utils.{CHAggUtil, CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.ShuffleDependency
Expand Down Expand Up @@ -163,7 +163,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
child: SparkPlan): HashAggregateExecBaseTransformer =
CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions.distinct,
CHAggUtil.distinctIgnoreQualifier(groupingExpressions),
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.gluten.substrait.{AggregationParams, SubstraitContext}
import org.apache.gluten.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
import org.apache.gluten.utils.CHAggUtil

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand Down Expand Up @@ -429,6 +430,11 @@ case class CHHashAggregateExecPullOutHelper(
aggregateAttr.toList
}

/**
 * Computes the aggregate result attributes after de-duplicating the grouping expressions
 * with [[CHAggUtil.distinctIgnoreQualifier]], then delegates to the parent implementation.
 *
 * @param groupingExpressions grouping expressions, possibly containing repeated entries
 * @return whatever the parent implementation returns for the de-duplicated expressions
 */
override def allAggregateResultAttributes(
    groupingExpressions: Seq[NamedExpression]): List[Attribute] = {
  val dedupedGroupingExpressions = CHAggUtil.distinctIgnoreQualifier(groupingExpressions)
  super.allAggregateResultAttributes(dedupedGroupingExpressions)
}

protected def getAttrForAggregateExpr(
exp: AggregateExpression,
aggregateAttributeList: Seq[Attribute],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.NamedExpression

import scala.util.control.Breaks.{break, breakable}

/** Helpers for de-duplicating expressions handed to aggregate operators. */
object CHAggUtil extends Logging {

  /**
   * Removes duplicate named expressions, keeping the first occurrence of each one and
   * preserving the input order.
   *
   * Two expressions count as duplicates when their `name`, `exprId`, `dataType` and
   * `nullable` all match; the qualifier is not part of the comparison.
   *
   * @param expressions the expressions to de-duplicate; may be empty
   * @return the expressions with later duplicates dropped
   */
  def distinctIgnoreQualifier(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
    // Every candidate is checked against ALL previously kept expressions. The comparison is
    // an equivalence relation, so checking the kept ones is the same as checking every
    // earlier input element.
    val kept = expressions.foldLeft(Vector.empty[NamedExpression]) {
      (acc, candidate) =>
        if (acc.exists(sameIgnoringQualifier(_, candidate))) acc else acc :+ candidate
    }
    // Return a List, matching the concrete collection type previously returned.
    kept.toList
  }

  /** True when both expressions agree on name, exprId, dataType and nullability. */
  private def sameIgnoringQualifier(a: NamedExpression, b: NamedExpression): Boolean =
    a.name == b.name &&
      a.exprId == b.exprId &&
      a.dataType == b.dataType &&
      a.nullable == b.nullable
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,44 @@ class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}

// Regression test for GLUTEN-7096: the same underlying column appears under two names
// (`days` and `day1`) in the GROUP BY list.
testGluten("GLUTEN-7096: Same names in group by may cause exception") {
  sql("create table if not exists user (day string, rtime int, uid string, owner string)")
  try {
    sql("insert into user values ('2024-09-01', 123, 'user1', 'owner1')")
    sql("insert into user values ('2024-09-01', 123, 'user1', 'owner1')")
    sql("insert into user values ('2024-09-02', 567, 'user2', 'owner2')")
    // The select list and GROUP BY only reference columns the subquery produces
    // (uid, owner); the fourth output column must be `owner` to match the expected row.
    val query =
      """
        |select days, rtime, uid, owner, day1
        |from (
        | select day1 as days, rtime, uid, owner, day1
        | from (
        | select distinct coalesce(day, "today") as day1, rtime, uid, owner
        | from user where day = '2024-09-01'
        | )) group by days, rtime, uid, owner, day1
        |""".stripMargin
    val df = sql(query)
    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", "2024-09-01")))
  } finally {
    // Drop the table even when an assertion fails so later tests start clean.
    sql("drop table if exists user")
  }
}

// Regression test for GLUTEN-7096 where the subqueries carry aliases (t1, t2), so the
// repeated grouping columns reach the aggregate with different qualifiers.
testGluten("GLUTEN-7096: Same names with different qualifier in group by may cause exception") {
  sql("create table if not exists user (day string, rtime int, uid string, owner string)")
  try {
    sql("insert into user values ('2024-09-01', 123, 'user1', 'owner1')")
    sql("insert into user values ('2024-09-01', 123, 'user1', 'owner1')")
    sql("insert into user values ('2024-09-02', 567, 'user2', 'owner2')")
    // The select list and GROUP BY only reference columns the subquery produces
    // (uid, owner); the fourth output column must be `owner` to match the expected row.
    val query =
      """
        |select days, rtime, uid, owner, day1
        |from (
        | select day1 as days, rtime, uid, owner, day1
        | from (
        | select distinct coalesce(day, "today") as day1, rtime, uid, owner
        | from user where day = '2024-09-01'
        | ) t1 ) t2 group by days, rtime, uid, owner, day1
        |""".stripMargin
    val df = sql(query)
    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", "2024-09-01")))
  } finally {
    // Drop the table even when an assertion fails so later tests start clean.
    sql("drop table if exists user")
  }
}
}

0 comments on commit c78b898

Please sign in to comment.