Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jun 14, 2024
1 parent 1e2f005 commit a73d7b1
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.execution.{FlushableHashAggregateExecTransformer, HashAggregateExecTransformer, ProjectExecTransformer, RegularHashAggregateExecTransformer}
import org.apache.gluten.execution._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial, PartialMerge}
Expand All @@ -30,11 +30,12 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
* optimizations such as flushing and abandoning.
*/
case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkPlan] {
import FlushableHashAggregateRule._
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case shuffle: ShuffleExchangeLike =>
case ShuffleAndChild(shuffle, child) =>
// If an exchange follows a hash aggregate in which all functions are in partial mode,
// then it's safe to convert the hash aggregate to flushable hash aggregate.
shuffle.child match {
child match {
case HashAggPropagatedToShuffle(proj, agg) =>
shuffle.withNewChildren(
Seq(proj.withNewChildren(Seq(FlushableHashAggregateExecTransformer(
Expand Down Expand Up @@ -125,4 +126,16 @@ object FlushableHashAggregateRule {
val distribution = ClusteredDistribution(agg.groupingExpressions)
agg.child.outputPartitioning.satisfies(distribution)
}

private object ShuffleAndChild {
def unapply(plan: SparkPlan): Option[(SparkPlan, SparkPlan)] = plan match {
case s: ShuffleExchangeLike =>
val child = s.child match {
case VeloxAppendBatchesExec(child, _) => child
case other => other
}
Some(s, child)
case other => None
}
}
}

0 comments on commit a73d7b1

Please sign in to comment.