[CORE] Remove ColumnarAQEShuffleRead (#3607)
ulysses-you authored Nov 6, 2023
1 parent ba045c7 commit 87b3c2e
Showing 19 changed files with 115 additions and 739 deletions.
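Context for the removal: since Spark 3.2 the vanilla `AQEShuffleReadExec` already supports columnar execution, reporting `supportsColumnar` from its child query stage and obtaining its batches from the exchange via `ShuffleExchangeLike.getShuffleRDD`, so Gluten's dedicated wrapper node is redundant. A minimal sketch of that delegation, assuming Spark 3.2+ APIs (`readColumnarBatches` is a hypothetical helper, not part of this commit):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper: how a columnar batch RDD falls out of the vanilla
// operator without any Gluten-specific read node.
def readColumnarBatches(read: AQEShuffleReadExec): Option[RDD[ColumnarBatch]] =
  read.child match {
    // The read is columnar exactly when its child stage is; the batches come
    // from the exchange itself via ShuffleExchangeLike.getShuffleRDD.
    case stage: ShuffleQueryStageExec if read.supportsColumnar =>
      Some(
        stage.shuffle
          .getShuffleRDD(read.partitionSpecs.toArray)
          .asInstanceOf[RDD[ColumnarBatch]])
    case _ => None
  }
```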

@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.ColumnarAQEShuffleReadExec
+import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.NativeWritePostRule
import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -286,10 +286,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
Seq(ProjectExecTransformer(child.output ++ appendedProjections, wt.child)))
case w: WholeStageCodegenExec =>
w.withNewChildren(Seq(ProjectExec(child.output ++ appendedProjections, w.child)))
-case columnarAQEShuffleReadExec: ColumnarAQEShuffleReadExec =>
+case r: AQEShuffleReadExec if r.supportsColumnar =>
// when aqe is open
// TODO: remove this after pushdowning preprojection
-wrapChild(columnarAQEShuffleReadExec)
+wrapChild(r)
case r2c: RowToCHNativeColumnarExec =>
wrapChild(r2c)
case union: UnionExecTransformer =>

@@ -18,7 +18,7 @@ package io.glutenproject.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, ColumnarAQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec}

class GlutenClickHouseColumnarShuffleAQESuite
extends GlutenClickHouseTPCHAbstractSuite
@@ -45,7 +45,7 @@ class GlutenClickHouseColumnarShuffleAQESuite
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])

val colCustomShuffleReaderExecs = collect(df.queryExecution.executedPlan) {
-case csr: ColumnarAQEShuffleReadExec => csr
+case csr: AQEShuffleReadExec => csr
}
assert(colCustomShuffleReaderExecs.size == 2)
val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
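Across these AQE suites only the matched type changes; the tests still collect the shuffle reads and inspect their partition specs. A self-contained sketch of the shared assertion pattern (`ShuffleReadChecks` is illustrative, not in the commit):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.CoalescedPartitionSpec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec}

// Illustrative helper in the style of these suites: gather the vanilla AQE
// shuffle reads and check that their partitions were coalesced.
object ShuffleReadChecks extends AdaptiveSparkPlanHelper {
  def assertCoalescedReads(df: DataFrame, expected: Int): Unit = {
    val reads = collect(df.queryExecution.executedPlan) { case r: AQEShuffleReadExec => r }
    assert(reads.size == expected)
    assert(reads.forall(_.partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])))
  }
}
```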

@@ -18,7 +18,7 @@ package io.glutenproject.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, ColumnarAQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec}

class GlutenClickHousePreferSpillColumnarShuffleAQESuite
extends GlutenClickHouseTPCHAbstractSuite
@@ -46,7 +46,7 @@ class GlutenClickHousePreferSpillColumnarShuffleAQESuite
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])

val colCustomShuffleReaderExecs = collect(df.queryExecution.executedPlan) {
-case csr: ColumnarAQEShuffleReadExec => csr
+case csr: AQEShuffleReadExec => csr
}
assert(colCustomShuffleReaderExecs.size == 2)
val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)

@@ -20,7 +20,7 @@ import io.glutenproject.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, ColumnarAQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec}

class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
@@ -116,7 +116,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
) {
df =>
val aqeRead = find(df.queryExecution.executedPlan) {
-case _: ColumnarAQEShuffleReadExec => true
+case _: AQEShuffleReadExec => true
case _ => false
}
assert(aqeRead.isDefined)

@@ -18,7 +18,7 @@ package io.glutenproject.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, ColumnarAQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec}
import org.apache.spark.sql.internal.SQLConf

case class TestData(id: Int)
@@ -52,7 +52,7 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])

val colCustomShuffleReaderExecs = collect(df.queryExecution.executedPlan) {
-case csr: ColumnarAQEShuffleReadExec => csr
+case csr: AQEShuffleReadExec => csr
}
assert(colCustomShuffleReaderExecs.size == 2)
val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)

@@ -477,18 +477,6 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
left,
right,
isNullAwareAntiJoin = plan.isNullAwareAntiJoin)
-case plan: AQEShuffleReadExec
-if BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
-plan.child match {
-case ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _) =>
-logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-ColumnarAQEShuffleReadExec(plan.child, plan.partitionSpecs)
-case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), _) =>
-logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-ColumnarAQEShuffleReadExec(plan.child, plan.partitionSpecs)
-case _ =>
-plan
-}
case plan: WindowExec =>
WindowExecTransformer(
plan.windowExpression,
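For reference, the deleted rule fired only when the read's child stage wrapped a Gluten columnar shuffle, either directly or through exchange reuse. A standalone sketch of that predicate, assuming the class names visible in this diff (`isColumnarShuffleStage` is illustrative, not part of the commit); after the change, `plan.child.supportsColumnar` carries the same information:

```scala
import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// The two child shapes the removed rule special-cased.
def isColumnarShuffleStage(child: SparkPlan): Boolean = child match {
  case ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _) => true
  case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), _) => true
  case _ => false
}
```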

@@ -17,19 +17,18 @@
package io.glutenproject.extension

import io.glutenproject.GlutenConfig
-import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution.BroadcastHashJoinExecTransformer
import io.glutenproject.extension.columnar.TransformHints

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, ColumnarToRowExec, CommandResultExec, LeafExecNode, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, ColumnarAQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarToRowExec, CommandResultExec, LeafExecNode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.Exchange

// spotless:off
/**
@@ -229,23 +228,11 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkPlan)

private def fallbackToRowBasedPlan(): SparkPlan = {
val transformPostOverrides = TransformPostOverrides(isAdaptiveContext)
-val planWithReplacedAQERead = originalPlan.transform {
-case plan: AQEShuffleReadExec
-if BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
-plan.child match {
-case ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _) =>
-ColumnarAQEShuffleReadExec(plan.child, plan.partitionSpecs)
-case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), _) =>
-ColumnarAQEShuffleReadExec(plan.child, plan.partitionSpecs)
-case _ =>
-plan
-}
-}
-val planWithColumnarToRow = InsertTransitions.insertTransitions(planWithReplacedAQERead, false)
+val planWithColumnarToRow = InsertTransitions.insertTransitions(originalPlan, false)
planWithColumnarToRow.transform {
case c2r @ ColumnarToRowExec(_: ShuffleQueryStageExec) =>
transformPostOverrides.transformColumnarToRowExec(c2r)
-case c2r @ ColumnarToRowExec(_: ColumnarAQEShuffleReadExec) =>
+case c2r @ ColumnarToRowExec(_: AQEShuffleReadExec) =>
transformPostOverrides.transformColumnarToRowExec(c2r)
// `InMemoryTableScanExec` itself supports columnar to row
case ColumnarToRowExec(child: SparkPlan)
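With the custom node gone, fallback no longer pre-rewrites AQE reads before inserting transitions; it transitions the original plan directly and then swaps any vanilla `ColumnarToRowExec` sitting on a shuffle stage or AQE read for the backend's native conversion. A condensed sketch of the resulting flow, assuming the Gluten-side names used in this file (`TransformPostOverrides`, `InsertTransitions`, with their imports as in the surrounding file; the `InMemoryTableScanExec` case is elided):

```scala
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}

// Condensed version of fallbackToRowBasedPlan() after this change: insert the
// vanilla transitions first, then replace the interesting ColumnarToRowExec
// nodes with the backend's native columnar-to-row operator.
def toRowBasedPlan(originalPlan: SparkPlan, post: TransformPostOverrides): SparkPlan = {
  val withTransitions = InsertTransitions.insertTransitions(originalPlan, false)
  withTransitions.transform {
    case c2r @ ColumnarToRowExec(_: ShuffleQueryStageExec) =>
      post.transformColumnarToRowExec(c2r)
    case c2r @ ColumnarToRowExec(_: AQEShuffleReadExec) =>
      post.transformColumnarToRowExec(c2r)
  }
}
```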

@@ -20,7 +20,7 @@ import io.glutenproject.extension.GlutenPlan

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, ColumnarAQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

/**
@@ -57,7 +57,7 @@ object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
true
case _: ReusedExchangeExec =>
true
-case _: ColumnarAQEShuffleReadExec =>
+case p: SparkPlan if p.supportsColumnar =>
true
case _ =>
false

@@ -129,7 +129,7 @@ case class ColumnarShuffleExchangeExec(
}

override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = {
-cachedShuffleRDD
+new ShuffledColumnarBatchRDD(columnarShuffleDependency, readMetrics, partitionSpecs)
}

override def stringArgs: Iterator[Any] =
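Beyond the rename, this last hunk fixes the override's behavior: the old body returned the cached, full-width shuffle RDD and ignored `partitionSpecs`, so the partition decisions AQE passes through `ShuffleExchangeLike.getShuffleRDD` (coalescing, skew splitting) were lost; the new body builds a `ShuffledColumnarBatchRDD` for exactly the requested specs. A usage sketch under that contract, assuming a materialized exchange from a query stage (`coalescedRead` is illustrative):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ColumnarShuffleExchangeExec, ShufflePartitionSpec}
import org.apache.spark.sql.vectorized.ColumnarBatch

// AQE funnels its partition decisions through ShuffleExchangeLike.getShuffleRDD,
// so the override must honour the specs instead of returning a cached RDD.
def coalescedRead(exchange: ColumnarShuffleExchangeExec): RDD[ColumnarBatch] = {
  // Read reducer partitions [0, 4) as one coalesced partition.
  val specs: Array[ShufflePartitionSpec] = Array(CoalescedPartitionSpec(0, 4))
  exchange.getShuffleRDD(specs) // one-partition RDD[ColumnarBatch]
}
```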
(Diff for the remaining 10 of the 19 changed files not shown.)