
Commit fea4705
[GLUTEN-5618][CH] Fix 'Position x is out of bound in Block' error when executing count distinct (apache#5619)

When executing count distinct, if the group by keys also appear in the count distinct expression, the query throws a 'Position x is out of bound in Block' error or core dumps.

Root cause: the CH backend removes the duplicated column when executing the pipeline, so positions recorded against the original output schema can point past the end of the deduplicated Block. The fix deduplicates the grouping and result expressions when constructing the aggregate transformer.

Closes apache#5618.
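
For context, the failing query shape looks like this (the same query exercised by the test added below). A minimal reproduction sketch, assuming a SparkSession named spark running with the Gluten ClickHouse backend:

// Minimal reproduction sketch (assumes a SparkSession `spark` with the Gluten
// CH backend enabled). The grouping key `c` also appears inside
// count(distinct a, b, c), so before this fix the aggregate carried a
// duplicated column and CH could fail with 'Position x is out of bound in Block'.
val df = spark.sql(
  """
    |select count(distinct a, b, c) from
    |values (0, null, 1), (1, 1, 1), (2, 2, 1), (1, 2, 1), (2, 2, 2) as data(a, b, c)
    |group by c
    |""".stripMargin)
df.collect()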
zzcclp authored May 7, 2024
1 parent 717b263 commit fea4705
Showing 3 changed files with 107 additions and 4 deletions.
@@ -194,12 +194,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       child: SparkPlan): HashAggregateExecBaseTransformer =
     CHHashAggregateExecTransformer(
       requiredChildDistributionExpressions,
-      groupingExpressions,
+      groupingExpressions.distinct,
       aggregateExpressions,
       aggregateAttributes,
       initialInputBufferOffset,
-      resultExpressions,
-      child)
+      resultExpressions.distinct,
+      child
+    )
 
   /** Generate HashAggregateExecPullOutHelper */
   override def genHashAggregateExecPullOutHelper(
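The two .distinct calls above work because Spark expression and attribute objects compare equal when they refer to the same column, so Seq.distinct drops the repeated occurrence and the transformer's output schema already matches the Block after CH removes the duplicated column. A minimal sketch of that dedup behavior, using a hypothetical simplified stand-in (Attr) for Spark's attribute references:

// Hypothetical simplified stand-in for Spark's attribute references: two
// occurrences with the same exprId compare equal, so Seq.distinct keeps only
// the first one, which is the behavior the .distinct calls above rely on.
case class Attr(name: String, exprId: Long)

// `c` is both a group-by key and an argument of count(distinct a, b, c),
// so it appears twice among the result expressions.
val resultExprs = Seq(Attr("c", 3L), Attr("count(a, b, c)", 7L), Attr("c", 3L))

assert(resultExprs.distinct == Seq(Attr("c", 3L), Attr("count(a, b, c)", 7L)))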
@@ -115,4 +115,102 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite {
"values (0, null,1), (0,null,1), (1, 1,1), (2, 2, 1) ,(2,2,2),(3,3,3) as data(a,b,c)"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test(
"Gluten-5618: [CH] Fix 'Position x is out of bound in Block' error " +
"when executing count distinct") {

withSQLConf(("spark.gluten.sql.countDistinctWithoutExpand", "false")) {
val sql =
"""
|select count(distinct a, b, c) from
|values (0, null, 1), (1, 1, 1), (2, 2, 1), (1, 2, 1) ,(2, 2, 2) as data(a,b,c) group by c
|""".stripMargin

compareResultsAgainstVanillaSpark(
sql,
true,
{
df =>
{

val planExecs = df.queryExecution.executedPlan.collect {
case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer
}

planExecs.head.aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("count("))
}
planExecs(1).aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("partial_count("))
}
}
}
)
}

val sql =
"""
|select count(distinct a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
|from values
|(0, null, 1, 0, null, 1, 0, 5, 1, 0),
|(null, 1, 1, null, 1, 1, null, 1, 1, 3),
|(2, 2, 1, 2, 2, 1, 2, 2, 1, 2),
|(1, 2, null, 1, 2, null, 1, 2, 3, 1),
|(2, 2, 2, 2, 2, 2, 2, 2, 2, 2)
|as data(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10)
|group by a10
|""".stripMargin

compareResultsAgainstVanillaSpark(
sql,
true,
{
df =>
{

val planExecs = df.queryExecution.executedPlan.collect {
case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer
}

planExecs.head.aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("count("))
}
planExecs(1).aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("partial_count("))
}
}
}
)

val sql1 =
"""
|select count(distinct a, b, c)
|from
|values (0, null, 1), (1, 1, 1), (null, 2, 1), (1, 2, 1) ,(2, 2, null)
|as data(a,b,c)
|group by c
|""".stripMargin

compareResultsAgainstVanillaSpark(
sql1,
true,
{
df =>
{

val planExecs = df.queryExecution.executedPlan.collect {
case aggTransformer: HashAggregateExecBaseTransformer => aggTransformer
}

planExecs.head.aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("countdistinct("))
}
planExecs(1).aggregateExpressions.foreach {
expr => assert(expr.toString().startsWith("partial_countdistinct("))
}
}
}
)
}
}
@@ -36,7 +36,11 @@ object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
       GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand
     ) {
       plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
-        case ae: AggregateExpression if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] =>
+        case ae: AggregateExpression
+            if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
+              // The maximum number of arguments for an aggregate function with Nullable types
+              // in the CH backend is 8
+              ae.aggregateFunction.children.size <= 8 =>
           ae.copy(
             aggregateFunction =
               CountDistinct.apply(ae.aggregateFunction.asInstanceOf[Count].children),
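A note on the new guard: the rewrite to the CH backend's native countdistinct now fires only when the distinct aggregate has at most 8 arguments. That is why the 10-column query in the test above still plans as count/partial_count (Spark's Expand-based path), while the 3-column queries plan as countdistinct/partial_countdistinct. A rough sketch of the decision, as a hypothetical simplified predicate:

// Hypothetical simplified form of the pattern guard added above: only rewrite
// to the CH backend's native countdistinct when the argument count fits the
// backend's 8-argument limit for aggregate functions over Nullable types.
def rewritesToCountDistinct(isDistinct: Boolean, numArgs: Int): Boolean =
  isDistinct && numArgs <= 8

assert(rewritesToCountDistinct(isDistinct = true, numArgs = 3)) // uses countdistinct
assert(!rewritesToCountDistinct(isDistinct = true, numArgs = 10)) // keeps the Expand plan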
