[CORE] Fix convertBroadcastExchangeToColumnar for non-WholeStageCodegen fallback (#3424)
liujiayi771 authored Oct 25, 2023
1 parent 0e867cf commit caf959e
Showing 6 changed files with 85 additions and 4 deletions.
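Summary (inferred from the commit title and the diff below, not part of the original commit message): convertBroadcastExchangeToColumnar previously matched the fallback child of a broadcast exchange only as WholeStageCodegenExec, so when whole-stage codegen is disabled the fallback child (for example a plain ProjectExec) was not covered by that case. The change widens the case to any UnaryExecNode that is not a GlutenPlan, and the new test suites exercise the fallback with columnar project disabled, with whole-stage codegen both on and off. A minimal sketch of the widened dispatch, using simplified stand-in stub types rather than the real Spark/Gluten classes, might look like this:

// Illustrative sketch only: the types below are stand-ins for the real
// org.apache.spark.sql.execution / io.glutenproject plan nodes.
object BroadcastFallbackSketch {
  sealed trait SparkPlan
  trait GlutenPlan extends SparkPlan // marker trait for Gluten-native operators
  trait UnaryExecNode extends SparkPlan { def child: SparkPlan }

  case class WholeStageTransformer(child: SparkPlan) extends UnaryExecNode with GlutenPlan
  case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode
  case class ProjectExec(child: SparkPlan) extends UnaryExecNode // possible fallback op when WSCG is off
  case class RowToColumnarExec(child: SparkPlan) extends SparkPlan // stand-in for genRowToColumnarExec
  case class LeafExec() extends SparkPlan

  // Mirrors the widened case: any vanilla (non-Gluten) unary fallback node is converted,
  // not only WholeStageCodegenExec as before the fix.
  def convertFallbackChild(child: SparkPlan): SparkPlan = child match {
    case plan: UnaryExecNode if !plan.isInstanceOf[GlutenPlan] => RowToColumnarExec(plan)
    case other => other // Gluten-native children are handled by other cases in the real code
  }

  def main(args: Array[String]): Unit = {
    // With whole-stage codegen enabled, the fallback child is wrapped in WholeStageCodegenExec...
    println(convertFallbackChild(WholeStageCodegenExec(LeafExec())))
    // ...with codegen disabled it can be a bare ProjectExec; both now hit the same case.
    println(convertFallbackChild(ProjectExec(LeafExec())))
    // A Gluten-native child is left untouched here.
    println(convertFallbackChild(WholeStageTransformer(LeafExec())))
  }
}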
@@ -19,6 +19,7 @@ package io.glutenproject.expression
import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution.{ColumnarToRowExecBase, WholeStageTransformer}
+import io.glutenproject.extension.GlutenPlan
import io.glutenproject.test.TestStats
import io.glutenproject.utils.DecimalArithmeticUtil

@@ -464,16 +465,16 @@ object ExpressionConverter extends SQLConfHelper with Logging {
// get WholeStageTransformer directly
case c2r: ColumnarToRowExecBase => c2r.child
// in fallback case
-case codeGen: WholeStageCodegenExec =>
-if (codeGen.child.isInstanceOf[ColumnarToRowExec]) {
+case plan: UnaryExecNode if !plan.isInstanceOf[GlutenPlan] =>
+if (plan.child.isInstanceOf[ColumnarToRowExec]) {
val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer])
if (wholeStageTransformer.nonEmpty) {
wholeStageTransformer.get
} else {
-BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(codeGen)
+BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
}
} else {
-BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(codeGen)
+BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
}
}
ColumnarBroadcastExchangeExec(exchange.mode, newChild)
@@ -287,6 +287,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOnDisableScan].exclude(
"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOnDisableProject].exclude(
+"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOffDisableProject].exclude(
+"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOff].exclude(
"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan].exclude(
@@ -295,6 +299,10 @@
"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan].exclude(
"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOnDisableProject].exclude(
+"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOffDisableProject].exclude(
+"Gluten - SPARK-32659: Fix the data issue when pruning DPP on non-atomic type")
enableSuite[GlutenExpressionsSchemaSuite]
enableSuite[GlutenExtraStrategiesSuite]
enableSuite[GlutenFileBasedDataSourceSuite]
@@ -139,10 +139,14 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOn]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOnDisableScan]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan]
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOnDisableProject]
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOffDisableProject]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOff]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOnDisableProject]
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOffDisableProject]

enableSuite[GlutenAdaptiveQueryExecSuite]
.includeByPrefix(
@@ -721,6 +721,22 @@ class GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan
}
}

+class GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOnDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+}
+}
+
+class GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOffDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf
+.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+}
+}
+
// Test DPP with batch scan disabled by user for some reason, which can also mock the situation
// that scan is not transformable.
class GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan
@@ -737,3 +753,19 @@ class GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan
super.sparkConf.set(GlutenConfig.COLUMNAR_BATCHSCAN_ENABLED.key, "false")
}
}
+
+class GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOnDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+}
+}
+
+class GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOffDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf
+.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+}
+}
@@ -1064,10 +1064,14 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOn]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOnDisableScan]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan]
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOnDisableProject]
+enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOffDisableProject]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOff]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOn]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan]
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOnDisableProject]
+enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOffDisableProject]
enableSuite[GlutenExpressionsSchemaSuite]
enableSuite[GlutenExtraStrategiesSuite]
enableSuite[GlutenFileBasedDataSourceSuite]
@@ -726,6 +726,22 @@ class GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan
}
}

+class GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOnDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+}
+}
+
+class GlutenDynamicPartitionPruningV1SuiteAEOffWSCGOffDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf
+.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+}
+}
+
// Test DPP with batch scan disabled by user for some reason, which can also mock the situation
// that scan is not transformable.
class GlutenDynamicPartitionPruningV2SuiteAEOnDisableScan
@@ -742,3 +758,19 @@ class GlutenDynamicPartitionPruningV2SuiteAEOffDisableScan
super.sparkConf.set(GlutenConfig.COLUMNAR_BATCHSCAN_ENABLED.key, "false")
}
}
+
+class GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOnDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+}
+}
+
+class GlutenDynamicPartitionPruningV2SuiteAEOffWSCGOffDisableProject
+extends GlutenDynamicPartitionPruningV2SuiteAEOff {
+override def sparkConf: SparkConf = {
+super.sparkConf
+.set(GlutenConfig.COLUMNAR_PROJECT_ENABLED.key, "false")
+.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+}
+}
