From 9275022f2b28fb66c99cf9e4bb5058f63e118f87 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 21 Aug 2024 13:18:54 +0800 Subject: [PATCH] fixup --- .../execution/FallbackStrategiesSuite.scala | 34 ++++++++++++++----- .../GlutenReplaceHashWithSortAggSuite.scala | 4 +-- .../execution/FallbackStrategiesSuite.scala | 33 +++++++++++++----- .../execution/FallbackStrategiesSuite.scala | 33 +++++++++++++----- 4 files changed, 75 insertions(+), 29 deletions(-) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 1ce0025f29449..a4da5c127c5ff 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -173,6 +169,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { thread.join(10000) } } + private object FallbackStrategiesSuite { def newRuleApplier( spark: SparkSession, @@ -189,6 +186,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. + FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala index 92e6fee97ea93..f394b4687d3d5 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala @@ -16,8 +16,8 @@ */ package org.apache.spark.sql.execution -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.HashAggregateExecBaseTransformer +import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait} import org.apache.spark.sql.execution.aggregate.{ObjectHashAggregateExec, SortAggregateExec} @@ -99,7 +99,7 @@ class GlutenReplaceHashWithSortAggSuite |) |GROUP BY key """.stripMargin - if (BackendsApiManager.getSettings.mergeTwoPhasesHashBaseAggregateIfNeed()) { + if (BackendTestUtils.isCHBackendLoaded()) { checkAggs(query, 1, 0, 1, 0) } else { checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 3acc9c4b39aae..a4da5c127c5ff 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -190,6 +186,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. + FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index bcc4e829b535e..bbdeebfe6a134 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -134,13 +134,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -191,6 +187,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. + FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty