Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
zzcclp committed Sep 10, 2024
1 parent 0d00c93 commit 9e2a6ae
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
".runtime_config.max_source_concatenate_bytes"
val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256

/**
 * Session configuration key built from the Gluten config prefix and the ClickHouse backend name.
 * `CHAQEPropagateEmptyRelation` reads it with a default of "true" and leaves the plan unchanged
 * when it is "false".
 */
val GLUTEN_AQE_PROPAGATEEMPTY: String =
  s"${GlutenConfig.GLUTEN_CONFIG_PREFIX}${CHBackend.BACKEND_NAME}.aqe.propagate.empty.relation"

def affinityMode: String = {
SparkEnv.get.conf
.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.utils.PhysicalPlanSelector

import org.apache.spark.sql.SparkSession
Expand All @@ -29,24 +30,28 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ClickHouseBu
/**
 * Physical-plan rule for the ClickHouse backend that replaces a null-aware left anti
 * [[BroadcastHashJoinExec]] with an empty [[LocalTableScanExec]] when the broadcast build side is
 * a [[ClickHouseBuildSideRelation]] that reports null key values.
 *
 * The rewrite can be disabled by setting [[CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY]] to
 * "false" in the session configuration; an unset key is treated as "true".
 *
 * @param session
 *   session whose runtime configuration is consulted and which is passed to
 *   `PhysicalPlanSelector.maybe`
 */
case class CHAQEPropagateEmptyRelation(session: SparkSession) extends Rule[SparkPlan] {

  /**
   * Rewrites every matching null-aware anti join in `plan`.
   *
   * @param plan
   *   physical plan to inspect
   * @return
   *   `plan` itself when the rewrite is disabled, otherwise the transformed plan; joins whose
   *   build side cannot be inspected are returned unchanged
   */
  def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session, plan) {
    if (!session.conf.get(CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY, "true").toBoolean) {
      plan
    } else {
      plan.transform {
        case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, _, right, isNullAwareAntiJoin)
            if joinType == LeftAnti && isNullAwareAntiJoin =>
          right match {
            case BroadcastQueryStageExec(_, stagePlan: SparkPlan, _)
                if buildSideHasNullKey(stagePlan) =>
              // `NOT IN` against a build side containing a null key can never yield a row.
              LocalTableScanExec(bhj.output, Seq.empty)
            case _ => bhj
          }
      }
    }
  }

  /**
   * Tells whether the broadcast produced by `stagePlan` is a ClickHouse build-side relation with
   * null key values.
   *
   * Only a [[ColumnarBroadcastExchangeExec]], either directly or wrapped in a
   * [[ReusedExchangeExec]], can be inspected. Any other plan yields `false` instead of failing
   * with a `MatchError`, so the caller keeps the original join.
   */
  private def buildSideHasNullKey(stagePlan: SparkPlan): Boolean = {
    val columnarBroadcast = stagePlan match {
      case c: ColumnarBroadcastExchangeExec => Some(c)
      case ReusedExchangeExec(_, c: ColumnarBroadcastExchangeExec) => Some(c)
      case _ => None
    }
    columnarBroadcast.exists {
      exchange =>
        // NOTE(review): `get()` waits for the broadcast relation; presumably the stage is
        // already materialized when this rule runs -- confirm.
        exchange.relationFuture.get().value match {
          case c: ClickHouseBuildSideRelation => c.hasNullKeyValues
          case _ => false
        }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
"Avoid changing merge join to broadcast join if too many empty partitions on build plan")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
.exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,25 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
}
}

// Ignored because Gluten uses a columnar format: the build side cannot be executed to obtain a
// row iterator, so the null status of its keys cannot be determined.
ignore(
GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {}
testGluten("SPARK-32717: AQEOptimizer should respect excludedRules configuration") {
  // Same query as test(SPARK-32573), but here the ClickHouse backend's AQE empty-relation
  // propagation is switched off through its Gluten configuration key.
  val sqlConfs = Seq(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
    "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> "false"
  )
  val notInQuery =
    "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)"
  withSQLConf(sqlConfs: _*) {
    val (initialPlan, finalPlan) = runAdaptiveAndVerifyResult(notInQuery)
    val broadcastJoins = findTopLevelBroadcastHashJoin(initialPlan)
    assert(broadcastJoins.size == 1)
    // Unlike test(SPARK-32573), the join must still be present in the adaptive plan because
    // the propagation has been disabled above.
    val remainingJoins = findTopLevelBaseJoin(finalPlan)
    assert(remainingJoins.nonEmpty)
    checkNumLocalShuffleReads(finalPlan)
  }
}

// EmptyRelation case
ignore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
.exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, Shuffl
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
Expand Down Expand Up @@ -799,10 +798,25 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
}
}

// Ignored because Gluten uses a columnar format: the build side cannot be executed to obtain a
// row iterator, so the null status of its keys cannot be determined.
ignore(
GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {}
testGluten("SPARK-32717: AQEOptimizer should respect excludedRules configuration") {
  // Same query as test(SPARK-32573), but here the ClickHouse backend's AQE empty-relation
  // propagation is switched off through its Gluten configuration key.
  val sqlConfs = Seq(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
    "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> "false"
  )
  val notInQuery =
    "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)"
  withSQLConf(sqlConfs: _*) {
    val (initialPlan, finalPlan) = runAdaptiveAndVerifyResult(notInQuery)
    val broadcastJoins = findTopLevelBroadcastHashJoin(initialPlan)
    assert(broadcastJoins.size == 1)
    // Unlike test(SPARK-32573), the join must still be present in the adaptive plan because
    // the propagation has been disabled above.
    val remainingJoins = findTopLevelBaseJoin(finalPlan)
    assert(remainingJoins.nonEmpty)
    checkNumLocalShuffleReads(finalPlan)
  }
}

testGluten("SPARK-32753: Only copy tags to node with no tags") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
.exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
.exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration")
.exclude("metrics of the shuffle read")
.exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE")
.exclude("SPARK-31220, SPARK-32056: repartition by range with AQE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,25 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
}
}

// Ignored because Gluten uses a columnar format: the build side cannot be executed to obtain a
// row iterator, so the null status of its keys cannot be determined.
ignore(
GLUTEN_TEST + "SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {}
testGluten("SPARK-32717: AQEOptimizer should respect excludedRules configuration") {
  // Same query as test(SPARK-32573), but here the ClickHouse backend's AQE empty-relation
  // propagation is switched off through its Gluten configuration key.
  val sqlConfs = Seq(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
    "spark.gluten.sql.columnar.backend.ch.aqe.propagate.empty.relation" -> "false"
  )
  val notInQuery =
    "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)"
  withSQLConf(sqlConfs: _*) {
    val (initialPlan, finalPlan) = runAdaptiveAndVerifyResult(notInQuery)
    val broadcastJoins = findTopLevelBroadcastHashJoin(initialPlan)
    assert(broadcastJoins.size == 1)
    // Unlike test(SPARK-32573), the join must still be present in the adaptive plan because
    // the propagation has been disabled above.
    val remainingJoins = findTopLevelBaseJoin(finalPlan)
    assert(remainingJoins.nonEmpty)
    checkNumLocalShuffleReads(finalPlan)
  }
}

// EmptyRelation case
ignore(
Expand Down

0 comments on commit 9e2a6ae

Please sign in to comment.