diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index ffbb393bef172..431a751dbf5f9 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -70,6 +70,7 @@ private object VeloxRuleApi { injector.injectTransform(c => RewriteTransformer.apply(c.session)) injector.injectTransform(_ => PushDownFilterToScan) injector.injectTransform(_ => PushDownInputFileExpression.PostOffload) + injector.injectTransform(_ => ReplaceDeltaTransformer()) injector.injectTransform(_ => EnsureLocalSortRequirements) injector.injectTransform(_ => EliminateLocalSort) injector.injectTransform(_ => CollapseProjectExecTransformer) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala index cc8da0b59441c..a5bba46dc6051 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala @@ -290,10 +290,11 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] { case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) => HiveTableScanExecTransformer.validate(plan).tagOnFallback(plan) case plan: ProjectExec => - val transformer = ProjectTransformerFactory.createProjectTransformer(plan) + val transformer = ProjectExecTransformer(plan.projectList, plan.child) transformer.doValidate().tagOnFallback(plan) case plan: FilterExec => - val transformer = FilterTransformerFactory.createFilterTransformer(plan) + val transformer = BackendsApiManager.getSparkPlanExecApiInstance + .genFilterExecTransformer(plan.condition, plan.child) transformer.doValidate().tagOnFallback(plan) case plan: HashAggregateExec => val transformer = HashAggregateExecBaseTransformer.from(plan) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ReplaceDeltaTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ReplaceDeltaTransformer.scala new file mode 100644 index 0000000000000..ef0ff3d68ac60 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ReplaceDeltaTransformer.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar + +import org.apache.gluten.execution.{FilterTransformerFactory, ProjectTransformerFactory} + +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} + +case class ReplaceDeltaTransformer() extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { + plan.transformUp { + case node => + applyDeltaTransformer(node) + } + } + + def applyDeltaTransformer(plan: SparkPlan): SparkPlan = plan match { + case p: ProjectExec => + val transformer = ProjectTransformerFactory.createProjectTransformer(p) + val validationResult = transformer.doValidate() + if (validationResult.ok()) { + logDebug(s"Columnar Processing for ${p.getClass} is currently supported.") + transformer + } else { + logDebug(s"Columnar Processing for ${p.getClass} is currently unsupported.") + FallbackTags.add(p, validationResult.reason()) + p + } + case f: FilterExec => + val transformer = FilterTransformerFactory.createFilterTransformer(f) + val validationResult = transformer.doValidate() + if (validationResult.ok()) { + logDebug(s"Columnar Processing for ${f.getClass} is currently supported.") + transformer + } else { + logDebug(s"Columnar Processing for ${f.getClass} is currently unsupported.") + FallbackTags.add(f, validationResult.reason()) + f + } + case other => other + } +}