Skip to content

Commit

Permalink
[GLUTEN-5414] [VL] Move ArrowFileScanExec class to module backends-velox
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh authored May 13, 2024
1 parent c44843b commit 09950de
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension.{BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
import org.apache.gluten.extension.{ArrowScanReplaceRule, BloomFilterMightContainJointRewriteRule, CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
import org.apache.gluten.extension.columnar.TransformHints
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, IfThenNode}
Expand Down Expand Up @@ -744,7 +744,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] = List(
BloomFilterMightContainJointRewriteRule.apply
BloomFilterMightContainJointRewriteRule.apply,
ArrowScanReplaceRule.apply
)

/**
Expand Down Expand Up @@ -849,4 +850,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
case other => other
}
}

override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = plan match {
case _: ArrowFileSourceScanExec => true
case _ => false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension

import org.apache.gluten.datasource.ArrowCSVFileFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanExec, SparkPlan}

case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case plan: FileSourceScanExec if plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
ArrowFileSourceScanExec(plan)
case p => p
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -739,4 +739,6 @@ trait SparkPlanExecApi {
def genPostProjectForGenerate(generate: GenerateExec): SparkPlan

def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan

def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ object MiscColumnarRules {
case RowToColumnarExec(child) =>
logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
case c2r @ ColumnarToRowExec(child) if PlanUtil.outputNativeColumnarData(child) =>
case c2r @ ColumnarToRowExec(child)
if PlanUtil.outputNativeColumnarData(child) &&
!PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
val nativeC2r = BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
if (nativeC2r.doValidate().isValid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,7 @@ object OffloadOthers {
class ReplaceSingleNode() extends LogLevelUtil with Logging {

def doReplace(p: SparkPlan): SparkPlan = {
val plan = p match {
case plan: FileSourceScanExec
if plan.relation.fileFormat.getClass.getSimpleName == "ArrowCSVFileFormat" =>
val arrowScan = ArrowFileSourceScanExec(plan)
TransformHints.tagNotTransformable(arrowScan, "Arrow scan cannot transform")
return arrowScan
case p => p
}

val plan = p
if (TransformHints.isNotTransformable(plan)) {
logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.")
plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.GlutenPlan

import org.apache.spark.sql.execution._
Expand Down Expand Up @@ -50,12 +51,15 @@ object PlanUtil {
case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
case _: ArrowFileSourceScanExec => false
case _: GlutenPlan => true
case _ => false
}
}

def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = {
BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
}

def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
plan match {
case i: InMemoryTableScanExec =>
Expand Down

0 comments on commit 09950de

Please sign in to comment.