diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index c0dee707ef4f..44aeba021557 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule} -import org.apache.gluten.extension.columnar.AddTransformHintRule +import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.sql.shims.SparkShimLoader @@ -146,7 +146,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { child match { case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) => - // For the validation phase of the AddTransformHintRule + // For the validation phase of the AddFallbackTagRule CHFilterExecTransformer(condition, child) case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) => // For the transform phase, the FileSourceScanExec is already transformed @@ -226,7 +226,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. val project = TransformPreOverrides().apply( - AddTransformHintRule().apply( + AddFallbackTagRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newExprs = Seq[Expression]() for (i <- exprs.indices) { @@ -251,7 +251,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. val project = TransformPreOverrides().apply( - AddTransformHintRule().apply( + AddFallbackTagRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newOrderings = Seq[SortOrder]() for (i <- orderings.indices) { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala index 873ecb8342a6..59c2d6494bdb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.SparkSession @@ -61,7 +61,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend "columnar broadcast exchange is disabled or " + "columnar broadcast join is disabled") } else { - if (TransformHints.isNotTransformable(bhj)) { + if (FallbackTags.nonEmpty(bhj)) { ValidationResult.notOk("broadcast join is already tagged as not transformable") } else { val bhjTransformer = BackendsApiManager.getSparkPlanExecApiInstance @@ -83,8 +83,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend } } } - TransformHints.tagNotTransformable(bhj, isTransformable) - TransformHints.tagNotTransformable(exchange, isTransformable) + FallbackTags.add(bhj, isTransformable) + FallbackTags.add(exchange, isTransformable) case _ => // Skip. This might be the case that the exchange was already // executed in earlier stage @@ -116,7 +116,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // Currently their doBroadcast() methods just propagate child's broadcast // payloads which is not right in speaking of columnar. if (!enableColumnarBroadcastJoin) { - TransformHints.tagNotTransformable( + FallbackTags.add( bhj, "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec") } else { @@ -149,7 +149,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl case Some(exchange @ BroadcastExchangeExec(mode, child)) => isBhjTransformable.tagOnFallback(bhj) if (!isBhjTransformable.isValid) { - TransformHints.tagNotTransformable(exchange, isBhjTransformable) + FallbackTags.add(exchange, isBhjTransformable) } case None => // we are in AQE, find the hidden exchange @@ -182,7 +182,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // to conform to the underlying exchange's type, columnar or vanilla exchange match { case BroadcastExchangeExec(mode, child) => - TransformHints.tagNotTransformable( + FallbackTags.add( bhj, "it's a materialized broadcast exchange or reused broadcast exchange") case ColumnarBroadcastExchangeExec(mode, child) => @@ -199,7 +199,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl } } catch { case e: UnsupportedOperationException => - TransformHints.tagNotTransformable( + FallbackTags.add( p, s"${e.getMessage}, original Spark plan is " + s"${p.getClass}(${p.children.toList.map(_.getClass)})") diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 582bf997fba1..e13ebd971ef5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -25,7 +25,7 @@ import org.apache.gluten.expression._ import org.apache.gluten.expression.ExpressionNames.{TRANSFORM_KEYS, TRANSFORM_VALUES} import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension._ -import org.apache.gluten.extension.columnar.TransformHints +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride import org.apache.gluten.sql.shims.SparkShimLoader @@ -371,7 +371,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(projectTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1)) } else { - TransformHints.tagNotTransformable(shuffle, validationResult) + FallbackTags.add(shuffle, validationResult) shuffle.withNewChildren(child :: Nil) } case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 => @@ -397,7 +397,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { projectTransformer } else { val project = ProjectExec(projectList, child) - TransformHints.tagNotTransformable(project, projectBeforeSortValidationResult) + FallbackTags.add(project, projectBeforeSortValidationResult) project } val sortOrder = SortOrder(projectBeforeSort.output.head, Ascending) @@ -410,7 +410,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output) } else { - TransformHints.tagNotTransformable(shuffle, validationResult) + FallbackTags.add(shuffle, validationResult) shuffle.withNewChildren(child :: Nil) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index fcb9e983e76b..44a823834f92 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -17,7 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.exception.GlutenNotSupportException -import org.apache.gluten.extension.columnar.TransformHints +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.expressions.Expression @@ -99,7 +99,7 @@ object ScanTransformerFactory { transformer } else { val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters) - TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + FallbackTags.add(newSource, validationResult.reason.get) newSource } } else { @@ -109,7 +109,7 @@ object ScanTransformerFactory { if (validation) { throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}") } - TransformHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.") + FallbackTags.add(batchScan, "The scan in BatchScanExec is not supported.") batchScan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala index 0f5fc21aff87..afc29a51e19a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala @@ -37,7 +37,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { requiredOrdering: Seq[SortOrder]): SparkPlan = { val newChild = SortExec(requiredOrdering, global = false, child = originalChild) if (!GlutenConfig.getConf.enableColumnarSort) { - TransformHints.tagNotTransformable(newChild, "columnar Sort is not enabled in SortExec") + FallbackTags.add(newChild, "columnar Sort is not enabled in SortExec") newChild } else { val newChildWithTransformer = @@ -50,7 +50,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { if (validationResult.isValid) { newChildWithTransformer } else { - TransformHints.tagNotTransformable(newChild, validationResult) + FallbackTags.add(newChild, validationResult) newChild } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala index 4ee153173c5c..e334fcfbce88 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala @@ -239,11 +239,11 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP // Propagate fallback reason to vanilla SparkPlan glutenPlan.foreach { case _: GlutenPlan => - case p: SparkPlan if TransformHints.isNotTransformable(p) && p.logicalLink.isDefined => + case p: SparkPlan if FallbackTags.nonEmpty(p) && p.logicalLink.isDefined => originalPlan .find(_.logicalLink.exists(_.fastEquals(p.logicalLink.get))) - .filterNot(TransformHints.isNotTransformable) - .foreach(origin => TransformHints.tag(origin, TransformHints.getHint(p))) + .filterNot(FallbackTags.nonEmpty) + .foreach(origin => FallbackTags.tag(origin, FallbackTags.getTag(p))) case _ => } @@ -278,7 +278,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP ) { plan } else { - TransformHints.tagAllNotTransformable( + FallbackTags.addRecursively( vanillaSparkPlan, TRANSFORM_UNSUPPORTED(fallbackInfo.reason, appendReasonIfExists = false)) FallbackNode(vanillaSparkPlan) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala similarity index 86% rename from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala index 7fb451057a2e..d34cb0df4e7e 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.extension.{GlutenPlan, ValidationResult} -import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.sql.shims.SparkShimLoader @@ -45,19 +45,19 @@ import org.apache.spark.sql.types.StringType import org.apache.commons.lang3.exception.ExceptionUtils -sealed trait TransformHint { +sealed trait FallbackTag { val stacktrace: Option[String] = - if (TransformHints.DEBUG) { + if (FallbackTags.DEBUG) { Some(ExceptionUtils.getStackTrace(new Throwable())) } else None } case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true) - extends TransformHint + extends FallbackTag -object TransformHints { - val TAG: TreeNodeTag[TransformHint] = - TreeNodeTag[TransformHint]("org.apache.gluten.transformhint") +object FallbackTags { + val TAG: TreeNodeTag[FallbackTag] = + TreeNodeTag[FallbackTag]("org.apache.gluten.FallbackTag") val DEBUG = false @@ -69,8 +69,8 @@ object TransformHints { * validation rule. So user should not consider the plan "transformable" unless all validation * rules are passed. */ - def isNotTransformable(plan: SparkPlan): Boolean = { - getHintOption(plan) match { + def nonEmpty(plan: SparkPlan): Boolean = { + getTagOption(plan) match { case Some(TRANSFORM_UNSUPPORTED(_, _)) => true case _ => false } @@ -82,10 +82,10 @@ object TransformHints { * within Gluten transformers. If false, the plan node will be guaranteed fallback to Vanilla plan * node while being implemented. */ - def maybeTransformable(plan: SparkPlan): Boolean = !isNotTransformable(plan) + def maybeOffloadable(plan: SparkPlan): Boolean = !nonEmpty(plan) - def tag(plan: SparkPlan, hint: TransformHint): Unit = { - val mergedHint = getHintOption(plan) + def tag(plan: SparkPlan, hint: FallbackTag): Unit = { + val mergedHint = getTagOption(plan) .map { case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) => hint match { @@ -117,33 +117,33 @@ object TransformHints { plan.unsetTagValue(TAG) } - def tagNotTransformable(plan: SparkPlan, validationResult: ValidationResult): Unit = { + def add(plan: SparkPlan, validationResult: ValidationResult): Unit = { if (!validationResult.isValid) { tag(plan, TRANSFORM_UNSUPPORTED(validationResult.reason)) } } - def tagNotTransformable(plan: SparkPlan, reason: String): Unit = { + def add(plan: SparkPlan, reason: String): Unit = { tag(plan, TRANSFORM_UNSUPPORTED(Some(reason))) } - def tagAllNotTransformable(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = { + def addRecursively(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = { plan.foreach { case _: GlutenPlan => // ignore case other => tag(other, hint) } } - def getHint(plan: SparkPlan): TransformHint = { - getHintOption(plan).getOrElse( + def getTag(plan: SparkPlan): FallbackTag = { + getTagOption(plan).getOrElse( throw new IllegalStateException("Transform hint tag not set in plan: " + plan.toString())) } - def getHintOption(plan: SparkPlan): Option[TransformHint] = { + def getTagOption(plan: SparkPlan): Option[FallbackTag] = { plan.getTagValue(TAG) } - implicit class EncodeTransformableTagImplicits(validationResult: ValidationResult) { + implicit class EncodeFallbackTagImplicits(validationResult: ValidationResult) { def tagOnFallback(plan: SparkPlan): Unit = { if (validationResult.isValid) { return @@ -157,7 +157,7 @@ object TransformHints { case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { if (GlutenConfig.getConf.enableAnsiMode) { - plan.foreach(TransformHints.tagNotTransformable(_, "does not support ansi mode")) + plan.foreach(FallbackTags.add(_, "does not support ansi mode")) } plan } @@ -179,11 +179,11 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] case plan: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin => if ((count + 1) >= optimizeLevel) return true plan.children.exists(existsMultiCodegens(_, count + 1)) - case other => false + case _ => false } - def tagNotTransformable(plan: SparkPlan): SparkPlan = { - TransformHints.tagNotTransformable(plan, "fallback multi codegens") + def addFallbackTag(plan: SparkPlan): SparkPlan = { + FallbackTags.add(plan, "fallback multi codegens") plan } @@ -200,35 +200,35 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] } } - def tagNotTransformableRecursive(plan: SparkPlan): SparkPlan = { + def addFallbackTagRecursive(plan: SparkPlan): SparkPlan = { plan match { case p: ShuffleExchangeExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens))) + addFallbackTag(p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens))) case p: BroadcastExchangeExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens))) + addFallbackTag(p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens))) case p: ShuffledHashJoinExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableRecursive))) + addFallbackTag(p.withNewChildren(p.children.map(addFallbackTagRecursive))) case p if !supportCodegen(p) => - p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens)) + p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens)) case p if isAQEShuffleReadExec(p) => - p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens)) + p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens)) case p: QueryStageExec => p - case p => tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableRecursive))) + case p => addFallbackTag(p.withNewChildren(p.children.map(addFallbackTagRecursive))) } } - def tagNotTransformableForMultiCodegens(plan: SparkPlan): SparkPlan = { + def tagOnFallbackForMultiCodegens(plan: SparkPlan): SparkPlan = { plan match { case plan if existsMultiCodegens(plan) => - tagNotTransformableRecursive(plan) + addFallbackTagRecursive(plan) case other => - other.withNewChildren(other.children.map(tagNotTransformableForMultiCodegens)) + other.withNewChildren(other.children.map(tagOnFallbackForMultiCodegens)) } } override def apply(plan: SparkPlan): SparkPlan = { if (physicalJoinOptimize) { - tagNotTransformableForMultiCodegens(plan) + tagOnFallbackForMultiCodegens(plan) } else plan } } @@ -272,13 +272,11 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { if (p.children.exists(_.output.isEmpty)) { // Some backends are not eligible to offload plan with zero-column input. // If any child have empty output, mark the plan and that child as UNSUPPORTED. - TransformHints.tagNotTransformable(p, "at least one of its children has empty output") + FallbackTags.add(p, "at least one of its children has empty output") p.children.foreach { child => if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { - TransformHints.tagNotTransformable( - child, - "at least one of its children has empty output") + FallbackTags.add(child, "at least one of its children has empty output") } } } @@ -291,8 +289,8 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { // The doValidate function will be called to check if the conversion is supported. // If false is returned or any unsupported exception is thrown, a row guard will // be added on the top of that plan to prevent actual conversion. -case class AddTransformHintRule() extends Rule[SparkPlan] { - import AddTransformHintRule._ +case class AddFallbackTagRule() extends Rule[SparkPlan] { + import AddFallbackTagRule._ private val glutenConf: GlutenConfig = GlutenConfig.getConf private val validator = Validators .builder() @@ -305,22 +303,15 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { .build() def apply(plan: SparkPlan): SparkPlan = { - addTransformableTags(plan) - } - - /** Inserts a transformable tag on top of those that are not supported. */ - private def addTransformableTags(plan: SparkPlan): SparkPlan = { - // Walk the tree with post-order - val out = plan.mapChildren(addTransformableTags) - addTransformableTag(out) - out + plan.foreachUp { case p => addFallbackTag(p) } + plan } - private def addTransformableTag(plan: SparkPlan): Unit = { + private def addFallbackTag(plan: SparkPlan): Unit = { val outcome = validator.validate(plan) outcome match { case Validator.Failed(reason) => - TransformHints.tagNotTransformable(plan, reason) + FallbackTags.add(plan, reason) return case Validator.Passed => } @@ -508,11 +499,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { ) transformer.doValidate().tagOnFallback(plan) case _ => - // Currently we assume a plan to be transformable by default. + // Currently we assume a plan to be offload-able by default. } } catch { case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) => - TransformHints.tagNotTransformable( + FallbackTags.add( plan, s"${e.getMessage}, original Spark plan is " + s"${plan.getClass}(${plan.children.toList.map(_.getClass)})") @@ -523,7 +514,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { } } -object AddTransformHintRule { +object AddFallbackTagRule { implicit private class ValidatorBuilderImplicits(builder: Validators.Builder) { /** @@ -561,9 +552,9 @@ object AddTransformHintRule { } } -case class RemoveTransformHintRule() extends Rule[SparkPlan] { +case class RemoveFallbackTagRule() extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - plan.foreach(TransformHints.untag) + plan.foreach(FallbackTags.untag) plan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 1f6f840b5552..7a4222b5cb38 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -56,7 +56,7 @@ sealed trait OffloadSingleNode extends Logging { // Aggregation transformation. case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case plan if TransformHints.isNotTransformable(plan) => + case plan if FallbackTags.nonEmpty(plan) => plan case agg: HashAggregateExec => genHashAggregateExec(agg) @@ -72,7 +72,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. */ private def genHashAggregateExec(plan: HashAggregateExec): SparkPlan = { - if (TransformHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { return plan } @@ -92,7 +92,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { HashAggregateExecBaseTransformer.from(plan)() case _ => // If the child is not transformable, do not transform the agg. - TransformHints.tagNotTransformable(plan, "child output schema is empty") + FallbackTags.add(plan, "child output schema is empty") plan } } else { @@ -105,7 +105,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { // Exchange transformation. case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case p if TransformHints.isNotTransformable(p) => + case p if FallbackTags.nonEmpty(p) => p case s: ShuffleExchangeExec if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) && @@ -124,7 +124,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = { - if (TransformHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.") return plan } @@ -291,11 +291,11 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { f } } - val addHint = AddTransformHintRule() + val addHint = AddFallbackTagRule() val newProjectList = projectExec.projectList.filterNot(containsInputFileRelatedExpr) val newProjectExec = ProjectExec(newProjectList, projectExec.child) addHint.apply(newProjectExec) - if (TransformHints.isNotTransformable(newProjectExec)) { + if (FallbackTags.nonEmpty(newProjectExec)) { // Project is still not transformable after remove `input_file_name` expressions. projectExec } else { @@ -305,7 +305,7 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { // /sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L506 val leafScans = findScanNodes(projectExec) assert(leafScans.size <= 1) - if (leafScans.isEmpty || TransformHints.isNotTransformable(leafScans(0))) { + if (leafScans.isEmpty || FallbackTags.nonEmpty(leafScans(0))) { // It means // 1. projectExec has `input_file_name` but no scan child. // 2. It has scan child node but the scan node fallback. @@ -326,12 +326,12 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { private def genProjectExec(projectExec: ProjectExec): SparkPlan = { if ( - TransformHints.isNotTransformable(projectExec) && + FallbackTags.nonEmpty(projectExec) && BackendsApiManager.getSettings.supportNativeInputFileRelatedExpr() && projectExec.projectList.exists(containsInputFileRelatedExpr) ) { tryOffloadProjectExecWithInputFileRelatedExprs(projectExec) - } else if (TransformHints.isNotTransformable(projectExec)) { + } else if (FallbackTags.nonEmpty(projectExec)) { projectExec } else { logDebug(s"Columnar Processing for ${projectExec.getClass} is currently supported.") @@ -366,7 +366,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. */ private def genFilterExec(filter: FilterExec): SparkPlan = { - if (TransformHints.isNotTransformable(filter)) { + if (FallbackTags.nonEmpty(filter)) { return filter } @@ -375,7 +375,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { // Push down the left conditions in Filter into FileSourceScan. val newChild: SparkPlan = filter.child match { case scan @ (_: FileSourceScanExec | _: BatchScanExec) => - if (TransformHints.maybeTransformable(scan)) { + if (FallbackTags.maybeOffloadable(scan)) { val newScan = FilterHandler.pushFilterToScan(filter.condition, scan) newScan match { @@ -410,7 +410,7 @@ object OffloadOthers { def doReplace(p: SparkPlan): SparkPlan = { val plan = p - if (TransformHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { return plan } plan match { @@ -561,7 +561,7 @@ object OffloadOthers { transformer } else { logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - TransformHints.tagNotTransformable(plan, validationResult.reason.get) + FallbackTags.add(plan, validationResult.reason.get) plan } case plan: BatchScanExec => @@ -576,7 +576,7 @@ object OffloadOthers { return hiveTableScanExecTransformer } logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - TransformHints.tagNotTransformable(plan, validateResult.reason.get) + FallbackTags.add(plan, validateResult.reason.get) plan case other => throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.") diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala index ce94626d999d..d32de32ebb32 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala @@ -76,7 +76,7 @@ object NativeWriteFilesWithSkippingSortAndProject extends Logging { } else { // If we can not transform the project, then we fallback to origin plan which means // we also retain the sort operator. - TransformHints.tagNotTransformable(p, validationResult) + FallbackTags.add(p, validationResult) None } case _ => None diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index 3d7509abc631..519db966c225 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -133,7 +133,7 @@ class EnumeratedApplier(session: SparkSession) // when columnar table cache is enabled. (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveTransformHintRule() + (_: SparkSession) => RemoveFallbackTagRule() ) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index 34bcf3220daa..03b2b66b09b3 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -108,7 +108,7 @@ class HeuristicApplier(session: SparkSession) (_: SparkSession) => FallbackEmptySchemaRelation(), (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark), (_: SparkSession) => RewriteSparkPlanRulesManager(), - (_: SparkSession) => AddTransformHintRule() + (_: SparkSession) => AddFallbackTagRule() ) ::: List((_: SparkSession) => TransformPreOverrides()) ::: List( @@ -155,7 +155,7 @@ class HeuristicApplier(session: SparkSession) // when columnar table cache is enabled. (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveTransformHintRule() + (_: SparkSession) => RemoveFallbackTagRule() ) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala index 34fe34f3f3fa..2abd4d7d4807 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.rewrite -import org.apache.gluten.extension.columnar.{AddTransformHintRule, TransformHint, TransformHints} +import org.apache.gluten.extension.columnar.{AddFallbackTagRule, FallbackTag, FallbackTags} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.rdd.RDD @@ -49,7 +49,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] extends Rule[SparkPlan] { private def mayNeedRewrite(plan: SparkPlan): Boolean = { - TransformHints.maybeTransformable(plan) && { + FallbackTags.maybeOffloadable(plan) && { plan match { case _: SortExec => true case _: TakeOrderedAndProjectExec => true @@ -67,14 +67,14 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } } - private def getTransformHintBack(rewrittenPlan: SparkPlan): Option[TransformHint] = { + private def getFallbackTagBack(rewrittenPlan: SparkPlan): Option[FallbackTag] = { // The rewritten plan may contain more nodes than origin, for now it should only be // `ProjectExec`. val target = rewrittenPlan.collect { case p if !p.isInstanceOf[ProjectExec] && !p.isInstanceOf[RewrittenNodeWall] => p } assert(target.size == 1) - TransformHints.getHintOption(target.head) + FallbackTags.getTagOption(target.head) } private def applyRewriteRules(origin: SparkPlan): (SparkPlan, Option[String]) = { @@ -93,7 +93,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } override def apply(plan: SparkPlan): SparkPlan = { - val addHint = AddTransformHintRule() + val addHint = AddFallbackTagRule() plan.transformUp { case origin if mayNeedRewrite(origin) => // Add a wall to avoid transforming unnecessary nodes. @@ -104,18 +104,18 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] // Note, it is not expected, but it happens in CH backend when pulling out // aggregate. // TODO: Fix the exception and remove this branch - TransformHints.tagNotTransformable(origin, error.get) + FallbackTags.add(origin, error.get) origin } else if (withWall.fastEquals(rewrittenPlan)) { // Return origin if the rewrite rules do nothing. - // We do not add tag and leave it to the outside `AddTransformHintRule`. + // We do not add tag and leave it to the outside `AddFallbackTagRule`. origin } else { addHint.apply(rewrittenPlan) - val hint = getTransformHintBack(rewrittenPlan) + val hint = getFallbackTagBack(rewrittenPlan) if (hint.isDefined) { // If the rewritten plan is still not transformable, return the original plan. - TransformHints.tag(origin, hint.get) + FallbackTags.tag(origin, hint.get) origin } else { rewrittenPlan.transformUp { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 2103537500aa..959bf808aba4 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi} import org.apache.gluten.expression.ExpressionUtils -import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ @@ -108,8 +108,8 @@ object Validators { private object FallbackByHint extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { - if (TransformHints.isNotTransformable(plan)) { - val hint = TransformHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] + if (FallbackTags.nonEmpty(plan)) { + val hint = FallbackTags.getTag(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] return fail(hint.reason.getOrElse("Reason not recorded")) } pass() diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index 721a30eb4f40..d41dce882602 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.sql.SparkSession @@ -57,8 +57,8 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio val validationLogLevel = glutenConfig.validationLogLevel plan.foreachUp { case _: GlutenPlan => // ignore - case p: SparkPlan if TransformHints.isNotTransformable(p) => - TransformHints.getHint(p) match { + case p: SparkPlan if FallbackTags.nonEmpty(p) => + FallbackTags.getTag(p) match { case TRANSFORM_UNSUPPORTED(Some(reason), append) => logFallbackReason(validationLogLevel, p.nodeName, reason) // With in next round stage in AQE, the physical plan would be a new instance that diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala index fbdbeadba886..450b88163afc 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer, TransformSupport, WholeStageTransformer} import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects -import org.apache.gluten.extension.columnar.AddTransformHintRule +import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager @@ -47,7 +47,7 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects { val rules = List( RewriteSparkPlanRulesManager(), - AddTransformHintRule(), + AddFallbackTagRule(), TransformPreOverrides() ) val transformed = rules.foldLeft(plan) { case (latestPlan, rule) => rule.apply(latestPlan) } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index fff883d49e86..b9c9d8a270bf 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -124,10 +124,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 7976288dd4ef..8ce0af8df051 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 7976288dd4ef..8ce0af8df051 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert(