From fea47053c9716ad1cc2433403312b348889ca959 Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Tue, 7 May 2024 09:43:12 +0800 Subject: [PATCH] [GLUTEN-5618][CH] Fix 'Position x is out of bound in Block' error when executing count distinct (#5619) When executing count distinct, if the group by keys are also present in the count distinct expression, it will throw a 'Position x is out of bound in Block' error or cause a core dump. Root cause: the CH backend removes the duplicated column when executing the pipeline. Closes #5618. --- .../clickhouse/CHSparkPlanExecApi.scala | 7 +- .../GlutenClickhouseCountDistinctSuite.scala | 98 +++++++++++++++++++ .../CountDistinctWithoutExpand.scala | 6 +- 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index ee8b7dd4540d..64090af287e4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -194,12 +194,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { child: SparkPlan): HashAggregateExecBaseTransformer = CHHashAggregateExecTransformer( requiredChildDistributionExpressions, - groupingExpressions, + groupingExpressions.distinct, aggregateExpressions, aggregateAttributes, initialInputBufferOffset, - resultExpressions, - child) + resultExpressions.distinct, + child + ) /** Generate HashAggregateExecPullOutHelper */ override def genHashAggregateExecPullOutHelper( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala index b12f886e5d6d..1b954df22eac --- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala @@ -115,4 +115,102 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTrans "values (0, null,1), (0,null,1), (1, 1,1), (2, 2, 1) ,(2,2,2),(3,3,3) as data(a,b,c)" compareResultsAgainstVanillaSpark(sql, true, { _ => }) } + + test( + "Gluten-5618: [CH] Fix 'Position x is out of bound in Block' error " + + "when executing count distinct") { + + withSQLConf(("spark.gluten.sql.countDistinctWithoutExpand", "false")) { + val sql = + """ + |select count(distinct a, b, c) from + |values (0, null, 1), (1, 1, 1), (2, 2, 1), (1, 2, 1) ,(2, 2, 2) as data(a,b,c) group by c + |""".stripMargin + + compareResultsAgainstVanillaSpark( + sql, + true, + { + df => + { + + val planExecs = df.queryExecution.executedPlan.collect { + case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer + } + + planExecs.head.aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("count(")) + } + planExecs(1).aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("partial_count(")) + } + } + } + ) + } + + val sql = + """ + |select count(distinct a1, a2, a3, a4, a5, a6, a7, a8, a9, a10) + |from values + |(0, null, 1, 0, null, 1, 0, 5, 1, 0), + |(null, 1, 1, null, 1, 1, null, 1, 1, 3), + |(2, 2, 1, 2, 2, 1, 2, 2, 1, 2), + |(1, 2, null, 1, 2, null, 1, 2, 3, 1), + |(2, 2, 2, 2, 2, 2, 2, 2, 2, 2) + |as data(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10) + |group by a10 + |""".stripMargin + + compareResultsAgainstVanillaSpark( + sql, + true, + { + df => + { + + val planExecs = df.queryExecution.executedPlan.collect { + case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer + } + + planExecs.head.aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("count(")) + } + 
planExecs(1).aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("partial_count(")) + } + } + } + ) + + val sql1 = + """ + |select count(distinct a, b, c) + |from + |values (0, null, 1), (1, 1, 1), (null, 2, 1), (1, 2, 1) ,(2, 2, null) + |as data(a,b,c) + |group by c + |""".stripMargin + + compareResultsAgainstVanillaSpark( + sql1, + true, + { + df => + { + + val planExecs = df.queryExecution.executedPlan.collect { + case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer + } + + planExecs.head.aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("countdistinct(")) + } + planExecs(1).aggregateExpressions.foreach { + expr => assert(expr.toString().startsWith("partial_countdistinct(")) + } + } + } + ) + } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala index 43cc68eadbe5..82051baeebc7 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala @@ -36,7 +36,11 @@ object CountDistinctWithoutExpand extends Rule[LogicalPlan] { GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand ) { plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { - case ae: AggregateExpression if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] => + case ae: AggregateExpression + if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] && + // The maximum number of arguments for aggregate function with Nullable types in CH + // backend is 8 + ae.aggregateFunction.children.size <= 8 => ae.copy( aggregateFunction = CountDistinct.apply(ae.aggregateFunction.asInstanceOf[Count].children),