From 09950de2dd80090a0bc0fea0631749916614cec1 Mon Sep 17 00:00:00 2001
From: Jin Chengcheng
Date: Mon, 13 May 2024 09:37:36 +0800
Subject: [PATCH] [GLUTEN-5414] [VL] Move ArrowFileScanExec class to module
 backends-velox

---
 .../velox/VeloxSparkPlanExecApi.scala         | 10 ++++--
 .../extension/ArrowScanReplaceRule.scala      | 34 +++++++++++++++++++
 .../execution/ArrowFileSourceScanExec.scala   |  0
 .../gluten/backendsapi/SparkPlanExecApi.scala |  2 ++
 .../columnar/MiscColumnarRules.scala          |  4 ++-
 .../columnar/OffloadSingleNode.scala          | 10 +-----
 .../org/apache/gluten/utils/PlanUtil.scala    |  6 +++-
 7 files changed, 53 insertions(+), 13 deletions(-)
 create mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
 rename {gluten-core => backends-velox}/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala (100%)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 772f1cfb2422..8d01ab96b845 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.ConverterUtils.FunctionConfig
 import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
-import org.apache.gluten.extension.{BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
+import org.apache.gluten.extension.{ArrowScanReplaceRule, BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
 import org.apache.gluten.extension.columnar.TransformHints
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, IfThenNode}
@@ -744,7 +744,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
    * @return
    */
   override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] = List(
-    BloomFilterMightContainJointRewriteRule.apply
+    BloomFilterMightContainJointRewriteRule.apply,
+    ArrowScanReplaceRule.apply
   )
 
   /**
@@ -849,4 +850,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       case other => other
     }
   }
+
+  override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = plan match {
+    case _: ArrowFileSourceScanExec => true
+    case _ => false
+  }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
new file mode 100644
index 000000000000..2b7c4b1da91b
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanExec, SparkPlan}
+
+case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformUp {
+      case plan: FileSourceScanExec if plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
+        ArrowFileSourceScanExec(plan)
+      case p => p
+    }
+
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
similarity index 100%
rename from gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
rename to backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index fb2fd961b481..8f2ef19f1408 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -739,4 +739,6 @@ trait SparkPlanExecApi {
   def genPostProjectForGenerate(generate: GenerateExec): SparkPlan
 
   def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan
+
+  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 068f62e498ce..08c63000ec73 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -68,7 +68,9 @@ object MiscColumnarRules {
       case RowToColumnarExec(child) =>
         logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
         BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
-      case c2r @ ColumnarToRowExec(child) if PlanUtil.outputNativeColumnarData(child) =>
+      case c2r @ ColumnarToRowExec(child)
+          if PlanUtil.outputNativeColumnarData(child) &&
+            !PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
         logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
         val nativeC2r = BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
         if (nativeC2r.doValidate().isValid) {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 067aad32cd7a..84a2ec5c6ec8 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -297,15 +297,7 @@ object OffloadOthers {
   class ReplaceSingleNode() extends LogLevelUtil with Logging {
 
     def doReplace(p: SparkPlan): SparkPlan = {
-      val plan = p match {
-        case plan: FileSourceScanExec
-            if plan.relation.fileFormat.getClass.getSimpleName == "ArrowCSVFileFormat" =>
-          val arrowScan = ArrowFileSourceScanExec(plan)
-          TransformHints.tagNotTransformable(arrowScan, "Arrow scan cannot transform")
-          return arrowScan
-        case p => p
-      }
-
+      val plan = p
       if (TransformHints.isNotTransformable(plan)) {
         logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.")
         plan match {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 610f14c86024..4c02687a6fa5 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.utils
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.sql.execution._
@@ -50,12 +51,15 @@ object PlanUtil {
       case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
       case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
       case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
-      case _: ArrowFileSourceScanExec => false
       case _: GlutenPlan => true
       case _ => false
     }
   }
 
+  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = {
+    BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
+  }
+
   def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
     plan match {
       case i: InMemoryTableScanExec =>
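The hunks above add a backend-owned hook, SparkPlanExecApi.outputNativeColumnarSparkCompatibleData, defaulting to false, and make the core ColumnarPostOverrides rule consult it before swapping a vanilla ColumnarToRowExec for a native one, so a scan whose batches vanilla Spark can already read (the Arrow CSV scan in the Velox backend) keeps the ordinary transition. The following minimal, self-contained Scala sketch models that dispatch only; every name in it (PlanNode, ArrowCsvScan, OffloadedScan, BackendApi, VeloxLikeBackend, postOverride) is a simplified stand-in for illustration and not a Gluten or Spark API.

// Illustrative model of the dispatch introduced by this patch; not Gluten code.
sealed trait PlanNode
case object ArrowCsvScan extends PlanNode   // stand-in for ArrowFileSourceScanExec
case object OffloadedScan extends PlanNode  // stand-in for a backend-native scan
case class VanillaColumnarToRow(child: PlanNode) extends PlanNode
case class NativeColumnarToRow(child: PlanNode) extends PlanNode

trait BackendApi {
  // Mirrors the new SparkPlanExecApi hook: backends opt in, the default is false.
  def outputNativeColumnarSparkCompatibleData(plan: PlanNode): Boolean = false
}

object VeloxLikeBackend extends BackendApi {
  // Mirrors the Velox override: only the Arrow scan already emits Spark-readable batches.
  override def outputNativeColumnarSparkCompatibleData(plan: PlanNode): Boolean =
    plan match {
      case ArrowCsvScan => true
      case _ => false
    }
}

object Demo extends App {
  // Stand-in for PlanUtil.outputNativeColumnarData: both scans produce columnar output.
  def outputNativeColumnarData(plan: PlanNode): Boolean = plan match {
    case ArrowCsvScan | OffloadedScan => true
    case _ => false
  }

  // Mirrors the guard added to ColumnarPostOverrides: replace the vanilla
  // columnar-to-row transition only when the child's batches are not Spark-compatible.
  def postOverride(plan: PlanNode, api: BackendApi): PlanNode = plan match {
    case VanillaColumnarToRow(child)
        if outputNativeColumnarData(child) &&
          !api.outputNativeColumnarSparkCompatibleData(child) =>
      NativeColumnarToRow(child)
    case other => other
  }

  println(postOverride(VanillaColumnarToRow(OffloadedScan), VeloxLikeBackend))
  println(postOverride(VanillaColumnarToRow(ArrowCsvScan), VeloxLikeBackend))
}

Running the sketch prints NativeColumnarToRow(OffloadedScan) for the backend-native scan and leaves VanillaColumnarToRow(ArrowCsvScan) untouched, which is the behavior the combined MiscColumnarRules and PlanUtil changes aim for once the Arrow-specific special case is removed from OffloadSingleNode.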