Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 21, 2024
1 parent 0331886 commit f10c306
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
Expand All @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule

class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
Expand Down Expand Up @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
} else {
assert(reason.contains("fake reason"))
}
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
}

testGluten("test enabling/disabling Gluten at thread level") {
Expand Down Expand Up @@ -173,6 +169,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
thread.join(10000)
}
}

private object FallbackStrategiesSuite {
def newRuleApplier(
spark: SparkSession,
Expand All @@ -189,6 +186,25 @@ private object FallbackStrategiesSuite {
)
}

// TODO: Generalize the code among shim versions.
case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
case p =>
if (p.children.exists(_.output.isEmpty)) {
// Some backends are not eligible to offload plan with zero-column input.
// If any child have empty output, mark the plan and that child as UNSUPPORTED.
FallbackTags.add(p, "at least one of its children has empty output")
p.children.foreach {
child =>
if (child.output.isEmpty) {
FallbackTags.add(child, "at least one of its children has empty output")
}
}
}
p
}
}

case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.HashAggregateExecBaseTransformer
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait}
import org.apache.spark.sql.execution.aggregate.{ObjectHashAggregateExec, SortAggregateExec}
Expand Down Expand Up @@ -99,7 +99,7 @@ class GlutenReplaceHashWithSortAggSuite
|)
|GROUP BY key
""".stripMargin
if (BackendsApiManager.getSettings.mergeTwoPhasesHashBaseAggregateIfNeed()) {
if (BackendTestUtils.isCHBackendLoaded()) {
checkAggs(query, 1, 0, 1, 0)
} else {
checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
Expand All @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule

class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
Expand Down Expand Up @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
} else {
assert(reason.contains("fake reason"))
}
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
}

testGluten("test enabling/disabling Gluten at thread level") {
Expand Down Expand Up @@ -190,6 +186,25 @@ private object FallbackStrategiesSuite {
)
}

// TODO: Generalize the code among shim versions.
case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
case p =>
if (p.children.exists(_.output.isEmpty)) {
// Some backends are not eligible to offload plan with zero-column input.
// If any child have empty output, mark the plan and that child as UNSUPPORTED.
FallbackTags.add(p, "at least one of its children has empty output")
p.children.foreach {
child =>
if (child.output.isEmpty) {
FallbackTags.add(child, "at least one of its children has empty output")
}
}
}
p
}
}

case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package org.apache.spark.sql.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
Expand All @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule

class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
import FallbackStrategiesSuite._
Expand Down Expand Up @@ -134,13 +134,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
} else {
assert(reason.contains("fake reason"))
}
assert(
reason.contains("fake reason") &&
reason.contains("at least one of its children has empty output"))
}

testGluten("test enabling/disabling Gluten at thread level") {
Expand Down Expand Up @@ -191,6 +187,25 @@ private object FallbackStrategiesSuite {
)
}

// TODO: Generalize the code among shim versions.
case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
case p =>
if (p.children.exists(_.output.isEmpty)) {
// Some backends are not eligible to offload plan with zero-column input.
// If any child have empty output, mark the plan and that child as UNSUPPORTED.
FallbackTags.add(p, "at least one of its children has empty output")
p.children.foreach {
child =>
if (child.output.isEmpty) {
FallbackTags.add(child, "at least one of its children has empty output")
}
}
}
p
}
}

case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
Expand Down

0 comments on commit f10c306

Please sign in to comment.