From d7488e5630dc342fe073373fa2023952a4f1ba58 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 21 Aug 2024 16:53:50 +0800 Subject: [PATCH] [GLUTEN-6950][CORE] Move specific rules into backend modules (#6953) Closes #6950 --- .../backendsapi/clickhouse/CHBackend.scala | 2 - .../backendsapi/clickhouse/CHRuleApi.scala | 2 - .../CommonSubexpressionEliminateRule.scala | 3 +- .../CountDistinctWithoutExpand.scala | 0 .../MergeTwoPhasesHashBaseAggregate.scala | 10 +- .../RewriteDateTimestampComparisonRule.scala | 2 - .../RewriteToDateExpresstionRule.scala | 0 .../backendsapi/velox/VeloxBackend.scala | 49 +------ .../backendsapi/velox/VeloxRuleApi.scala | 5 +- .../extension/EmptySchemaWorkaround.scala | 131 ++++++++++++++++++ .../backendsapi/BackendSettingsApi.scala | 10 -- .../extension/columnar/FallbackRules.scala | 55 -------- .../columnar/validator/Validators.scala | 3 +- .../execution/FallbackStrategiesSuite.scala | 34 +++-- .../GlutenReplaceHashWithSortAggSuite.scala | 4 +- .../execution/FallbackStrategiesSuite.scala | 33 +++-- .../execution/FallbackStrategiesSuite.scala | 33 +++-- 17 files changed, 215 insertions(+), 161 deletions(-) rename {gluten-core => backends-clickhouse}/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala (98%) rename {gluten-core => backends-clickhouse}/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala (100%) rename gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala => backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala (94%) rename {gluten-core => backends-clickhouse}/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala (99%) rename {gluten-core => backends-clickhouse}/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala (100%) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 3c151df7e144..86a69f842280 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -380,8 +380,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { GlutenConfig.getConf.enableNativeWriter.getOrElse(false) } - override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true - override def supportCartesianProductExec(): Boolean = true override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index f4a7522d30c2..fb5147157d94 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -64,10 +64,8 @@ private object CHRuleApi { injector.injectTransform(_ => RemoveTransitions) injector.injectTransform(c => FallbackOnANSIMode.apply(c.session)) injector.injectTransform(c => FallbackMultiCodegens.apply(c.session)) - injector.injectTransform(c => PlanOneRowRelation.apply(c.session)) injector.injectTransform(_ => 
RewriteSubqueryBroadcast()) injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session)) - injector.injectTransform(_ => FallbackEmptySchemaRelation()) injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session)) injector.injectTransform(_ => RewriteSparkPlanRulesManager()) injector.injectTransform(_ => AddFallbackTagRule()) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala similarity index 98% rename from gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala index 29199eb0ed35..a3b74366fc7b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala @@ -21,8 +21,7 @@ import org.apache.gluten.GlutenConfig import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala similarity index 100% rename from gluten-core/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala similarity index 94% rename from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala index e19cd09a01a3..43adf27b4eb1 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala @@ -14,10 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.gluten.extension.columnar +package org.apache.gluten.extension import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial} @@ -39,7 +38,7 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S val columnarConf: GlutenConfig = GlutenConfig.getConf val scanOnly: Boolean = columnarConf.enableScanOnly val enableColumnarHashAgg: Boolean = !scanOnly && columnarConf.enableColumnarHashAgg - val replaceSortAggWithHashAgg = BackendsApiManager.getSettings.replaceSortAggWithHashAgg + val replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: BaseAggregateExec): Boolean = { // TODO: now it can not support to merge agg which there are the filters in the aggregate exprs. @@ -57,10 +56,7 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S } override def apply(plan: SparkPlan): SparkPlan = { - if ( - !enableColumnarHashAgg || !BackendsApiManager.getSettings - .mergeTwoPhasesHashBaseAggregateIfNeed() - ) { + if (!enableColumnarHashAgg) { plan } else { plan.transformDown { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala similarity index 99% rename from gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala index ec1106955067..ea92ddec2c8a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala @@ -27,8 +27,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import java.lang.IllegalArgumentException - // For readable, people usually convert a unix timestamp into date, and compare it with another // date. 
For example // select * from table where '2023-11-02' >= from_unixtime(unix_timestamp, 'yyyy-MM-dd') diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala similarity index 100% rename from gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 21175f20eb64..065adf338c19 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -28,12 +28,10 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFo import org.apache.gluten.utils._ import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Count, Sum} +import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile, PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} -import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -443,49 +441,6 @@ object VeloxBackendSettings extends BackendSettingsApi { } } - /** - * Check whether a plan needs to be offloaded even though they have empty input schema, e.g, - * Sum(1), Count(1), rand(), etc. - * @param plan: - * The Spark plan to check. - */ - private def mayNeedOffload(plan: SparkPlan): Boolean = { - def checkExpr(expr: Expression): Boolean = { - expr match { - // Block directly falling back the below functions by FallbackEmptySchemaRelation. - case alias: Alias => checkExpr(alias.child) - case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber | _: Pi | - _: SparkVersion => - true - case _ => false - } - } - - plan match { - case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty => - // Check Sum(Literal) or Count(Literal). 
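
The Sum(Literal)/Count(Literal) check above, removed from gluten-core here and reinstated in the Velox module below, exists because Velox can evaluate literal-only aggregates and constant-producing projections even when the input relation has no columns. A minimal illustration of the query shapes it keeps offloaded, assuming a hypothetical SparkSession `spark` and table `tbl`:

  spark.sql("SELECT count(1) FROM tbl")  // Count(Literal): kept on the native backend
  spark.sql("SELECT sum(1) FROM tbl")    // Sum(Literal): kept on the native backend
  spark.sql("SELECT rand(), uuid()")     // constant-only Project over OneRowRelation
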
- exec.aggregateExpressions.forall( - expression => { - val aggFunction = expression.aggregateFunction - aggFunction match { - case Sum(Literal(_, _), _) => true - case Count(Seq(Literal(_, _))) => true - case _ => false - } - }) - case p: ProjectExec if p.projectList.nonEmpty => - p.projectList.forall(checkExpr(_)) - case _ => - false - } - } - - override def fallbackOnEmptySchema(plan: SparkPlan): Boolean = { - // Count(1) and Sum(1) are special cases that Velox backend can handle. - // Do not fallback it and its children in the first place. - !mayNeedOffload(plan) - } - override def fallbackAggregateWithEmptyOutputChild(): Boolean = true override def recreateJoinExecOnFallback(): Boolean = true diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 645407be8be5..f152da885887 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -18,7 +18,8 @@ package org.apache.gluten.backendsapi.velox import org.apache.gluten.backendsapi.RuleApi import org.apache.gluten.datasource.ArrowConvertorRule -import org.apache.gluten.extension._ +import org.apache.gluten.extension.{ArrowScanReplaceRule, BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule} +import org.apache.gluten.extension.EmptySchemaWorkaround.{FallbackEmptySchemaRelation, PlanOneRowRelation} import org.apache.gluten.extension.columnar._ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides} import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform @@ -61,7 +62,6 @@ private object VeloxRuleApi { injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session)) injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session)) injector.injectTransform(_ => FallbackEmptySchemaRelation()) - injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session)) injector.injectTransform(_ => RewriteSparkPlanRulesManager()) injector.injectTransform(_ => AddFallbackTagRule()) injector.injectTransform(_ => TransformPreOverrides()) @@ -103,7 +103,6 @@ private object VeloxRuleApi { injector.inject(_ => RewriteSubqueryBroadcast()) injector.inject(c => BloomFilterMightContainJointRewriteRule.apply(c.session)) injector.inject(c => ArrowScanReplaceRule.apply(c.session)) - injector.inject(c => MergeTwoPhasesHashBaseAggregate.apply(c.session)) // Gluten RAS: The RAS rule. injector.inject(c => EnumeratedTransform(c.session, c.outputsColumnar)) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala new file mode 100644 index 000000000000..3f34e7fc262d --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/EmptySchemaWorkaround.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.extension.columnar.FallbackTags + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, EulerNumber, Expression, Literal, MakeYMInterval, Pi, Rand, SparkPartitionID, SparkVersion, Uuid} +import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{ProjectExec, RDDScanExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.datasources.WriteFilesExec +import org.apache.spark.sql.types.StringType + +/** Rules to make Velox backend work correctly with query plans that have empty output schemas. */ +object EmptySchemaWorkaround { + + /** + * This rule plans [[RDDScanExec]] with a fake schema to make gluten work, because gluten does not + * support empty output relation, see [[FallbackEmptySchemaRelation]]. + */ + case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { + if (!GlutenConfig.getConf.enableOneRowRelationColumnar) { + return plan + } + + plan.transform { + // We should make sure the output does not change, e.g. + // Window + // OneRowRelation + case u: UnaryExecNode + if u.child.isInstanceOf[RDDScanExec] && + u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" && + u.outputSet != u.child.outputSet => + val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1) + val attr = AttributeReference("fake_column", StringType)() + u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") :: Nil) + } + } + } + + /** + * FIXME To be removed: Since Velox backend is the only one to use the strategy, and we already + * support offloading zero-column batch in ColumnarBatchInIterator via PR #3309. + * + * We'd make sure all Velox operators be able to handle zero-column input correctly then remove + * the rule together with [[PlanOneRowRelation]]. + */ + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (fallbackOnEmptySchema(p)) { + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. 
+ FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + } + p + } + + private def fallbackOnEmptySchema(plan: SparkPlan): Boolean = { + // Count(1) and Sum(1) are special cases that Velox backend can handle. + // Do not fallback it and its children in the first place. + !mayNeedOffload(plan) + } + + /** + * Check whether a plan needs to be offloaded even though they have empty input schema, e.g, + * Sum(1), Count(1), rand(), etc. + * @param plan: + * The Spark plan to check. + * + * Since https://github.com/apache/incubator-gluten/pull/2749. + */ + private def mayNeedOffload(plan: SparkPlan): Boolean = { + def checkExpr(expr: Expression): Boolean = { + expr match { + // Block directly falling back the below functions by FallbackEmptySchemaRelation. + case alias: Alias => checkExpr(alias.child) + case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber | + _: Pi | _: SparkVersion => + true + case _ => false + } + } + + plan match { + case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty => + // Check Sum(Literal) or Count(Literal). + exec.aggregateExpressions.forall( + expression => { + val aggFunction = expression.aggregateFunction + aggFunction match { + case Sum(Literal(_, _), _) => true + case Count(Seq(Literal(_, _))) => true + case _ => false + } + }) + case p: ProjectExec if p.projectList.nonEmpty => + p.projectList.forall(checkExpr(_)) + case _ => + false + } + } + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index c9a0301b8dc6..c9205bae9d8f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -69,7 +69,6 @@ trait BackendSettingsApi { case _ => false } def supportStructType(): Boolean = false - def fallbackOnEmptySchema(plan: SparkPlan): Boolean = false // Whether to fallback aggregate at the same time if its empty-output child is fallen back. def fallbackAggregateWithEmptyOutputChild(): Boolean = false @@ -90,12 +89,6 @@ trait BackendSettingsApi { def excludeScanExecFromCollapsedStage(): Boolean = false def rescaleDecimalArithmetic: Boolean = false - /** - * Whether to replace sort agg with hash agg., e.g., sort agg will be used in spark's planning for - * string type input. 
- */ - def replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg - /** Get the config prefix for each backend */ def getBackendConfigPrefix: String @@ -147,9 +140,6 @@ trait BackendSettingsApi { def supportSampleExec(): Boolean = false - /** Merge two phases hash based aggregate if need */ - def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = false - def supportColumnarArrowUdf(): Boolean = false def generateHdfsConfForLibhdfs(): Boolean = false diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala index f9eaa4179c67..6b043fbce269 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala @@ -27,8 +27,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag} import org.apache.spark.sql.execution._ @@ -41,7 +39,6 @@ import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec} import org.apache.spark.sql.execution.window.{WindowExec, WindowGroupLimitExecShim} import org.apache.spark.sql.hive.HiveTableScanExecTransformer -import org.apache.spark.sql.types.StringType import org.apache.commons.lang3.exception.ExceptionUtils @@ -241,58 +238,6 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] } } -/** - * This rule plans [[RDDScanExec]] with a fake schema to make gluten work, because gluten does not - * support empty output relation, see [[FallbackEmptySchemaRelation]]. - */ -case class PlanOneRowRelation(spark: SparkSession) extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = { - if (!GlutenConfig.getConf.enableOneRowRelationColumnar) { - return plan - } - - plan.transform { - // We should make sure the output does not change, e.g. - // Window - // OneRowRelation - case u: UnaryExecNode - if u.child.isInstanceOf[RDDScanExec] && - u.child.asInstanceOf[RDDScanExec].name == "OneRowRelation" && - u.outputSet != u.child.outputSet => - val rdd = spark.sparkContext.parallelize(InternalRow(null) :: Nil, 1) - val attr = AttributeReference("fake_column", StringType)() - u.withNewChildren(RDDScanExec(attr :: Nil, rdd, "OneRowRelation") :: Nil) - } - } -} - -/** - * FIXME To be removed: Since Velox backend is the only one to use the strategy, and we already - * support offloading zero-column batch in ColumnarBatchInIterator via PR #3309. - * - * We'd make sure all Velox operators be able to handle zero-column input correctly then remove the - * rule together with [[PlanOneRowRelation]]. - */ -case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case p => - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(p)) { - if (p.children.exists(_.output.isEmpty)) { - // Some backends are not eligible to offload plan with zero-column input. - // If any child have empty output, mark the plan and that child as UNSUPPORTED. 
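
FallbackTags is the tagging mechanism this rule relies on: it attaches a human-readable fallback reason to a plan node through Spark's TreeNodeTag facility, and later transform rules skip any node that carries a tag. A simplified stand-in showing the pattern, not Gluten's actual implementation:

  import org.apache.spark.sql.catalyst.trees.TreeNodeTag
  import org.apache.spark.sql.execution.SparkPlan

  object SimpleFallbackTags {
    // The tag key; Gluten's real FallbackTags stores a richer reason type.
    private val TAG = TreeNodeTag[String]("simpleFallbackReason")
    def add(plan: SparkPlan, reason: String): Unit = plan.setTagValue(TAG, reason)
    def get(plan: SparkPlan): Option[String] = plan.getTagValue(TAG)
    def nonEmpty(plan: SparkPlan): Boolean = get(plan).isDefined
  }
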
- FallbackTags.add(p, "at least one of its children has empty output") - p.children.foreach { - child => - if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { - FallbackTags.add(child, "at least one of its children has empty output") - } - } - } - } - p - } -} - // This rule will try to convert a plan into plan transformer. // The doValidate function will be called to check if the conversion is supported. // If false is returned or any unsupported exception is thrown, a row guard will diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index a85cb163ceaa..f1cb4792383b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -143,8 +143,6 @@ object Validators { case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p) case p: WriteFilesExec if !settings.enableNativeWriteFiles() => fail(p) - case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg => - fail(p) case p: CartesianProductExec if !settings.supportCartesianProductExec() => fail(p) case p: TakeOrderedAndProjectExec if !settings.supportColumnarShuffleExec() => fail(p) case _ => pass() @@ -162,6 +160,7 @@ object Validators { case p: FilterExec if !conf.enableColumnarFilter => fail(p) case p: UnionExec if !conf.enableColumnarUnion => fail(p) case p: ExpandExec if !conf.enableColumnarExpand => fail(p) + case p: SortAggregateExec if !conf.forceToUseHashAgg => fail(p) case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin => fail(p) case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p) case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange => fail(p) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 1ce0025f2944..a4da5c127c5f 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = 
FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -173,6 +169,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { thread.join(10000) } } + private object FallbackStrategiesSuite { def newRuleApplier( spark: SparkSession, @@ -189,6 +186,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. + FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala index 92e6fee97ea9..f394b4687d3d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala @@ -16,8 +16,8 @@ */ package org.apache.spark.sql.execution -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.HashAggregateExecBaseTransformer +import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait} import org.apache.spark.sql.execution.aggregate.{ObjectHashAggregateExec, SortAggregateExec} @@ -99,7 +99,7 @@ class GlutenReplaceHashWithSortAggSuite |) |GROUP BY key """.stripMargin - if (BackendsApiManager.getSettings.mergeTwoPhasesHashBaseAggregateIfNeed()) { + if (BackendTestUtils.isCHBackendLoaded()) { checkAggs(query, 1, 0, 1, 0) } else { checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 3acc9c4b39aa..a4da5c127c5f 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import 
org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -133,13 +133,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -190,6 +186,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. 
+ FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index bcc4e829b535..bbdeebfe6a13 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackEmptySchemaRelation, FallbackTags, RemoveFallbackTagRule} +import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule} import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -31,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.rules.Rule class FallbackStrategiesSuite extends GlutenSQLTestsTrait { import FallbackStrategiesSuite._ @@ -134,13 +134,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) val reason = FallbackTags.get(newPlan).reason() - if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { - assert( - reason.contains("fake reason") && - reason.contains("at least one of its children has empty output")) - } else { - assert(reason.contains("fake reason")) - } + assert( + reason.contains("fake reason") && + reason.contains("at least one of its children has empty output")) } testGluten("test enabling/disabling Gluten at thread level") { @@ -191,6 +187,25 @@ private object FallbackStrategiesSuite { ) } + // TODO: Generalize the code among shim versions. + case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case p => + if (p.children.exists(_.output.isEmpty)) { + // Some backends are not eligible to offload plan with zero-column input. + // If any child have empty output, mark the plan and that child as UNSUPPORTED. 
+ FallbackTags.add(p, "at least one of its children has empty output") + p.children.foreach { + child => + if (child.output.isEmpty) { + FallbackTags.add(child, "at least one of its children has empty output") + } + } + } + p + } + } + case class LeafOp(override val supportsColumnar: Boolean = false) extends LeafExecNode { override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty
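
The spark33, spark34, and spark35 suites above each carry an identical copy of the test-only FallbackEmptySchemaRelation, each marked with a TODO to generalize the code among shim versions. One possible way to honor that TODO, assuming a hypothetical cross-version test module on the classpath of all three suites, is a single shared rule:

  package org.apache.spark.sql.execution

  import org.apache.gluten.extension.columnar.FallbackTags
  import org.apache.spark.sql.catalyst.rules.Rule

  // Shared test-only rule: tags a node, and each of its zero-column children,
  // as unsupported, so the three suites no longer need private copies.
  case class SharedFallbackEmptySchemaRelation() extends Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
      case p =>
        if (p.children.exists(_.output.isEmpty)) {
          FallbackTags.add(p, "at least one of its children has empty output")
          p.children
            .filter(_.output.isEmpty)
            .foreach(FallbackTags.add(_, "at least one of its children has empty output"))
        }
        p
    }
  }

Each suite could then import this rule instead of declaring its own, keeping the per-version files limited to genuinely version-specific behavior.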