Skip to content

Commit

Permalink
[CORE][VL] Add OffloadProject to offload project having input_file_name's support considered (#6200)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyangxiaozhu authored Jun 28, 2024
1 parent e684bf3 commit 86449d0
Show file tree
Hide file tree
Showing 19 changed files with 343 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def supportNativeRowIndexColumn(): Boolean = true

override def supportNativeInputFileRelatedExpr(): Boolean = true

override def supportExpandExec(): Boolean = true

override def supportSortExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.extension.InputFileNameReplaceRule
import org.apache.gluten.metrics.IMetrics
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
Expand Down Expand Up @@ -134,13 +133,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
val metadataColumn =
SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
metadataColumn.put(InputFileNameReplaceRule.replacedInputFileName, file.filePath.toString)
metadataColumn.put(
InputFileNameReplaceRule.replacedInputFileBlockStart,
file.start.toString)
metadataColumn.put(
InputFileNameReplaceRule.replacedInputFileBlockLength,
file.length.toString)
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] = {
val buf: ListBuffer[SparkSession => Rule[SparkPlan]] =
ListBuffer(BloomFilterMightContainJointRewriteRule.apply, ArrowScanReplaceRule.apply)
if (GlutenConfig.getConf.enableInputFileNameReplaceRule) {
buf += InputFileNameReplaceRule.apply
}
buf.result
List(BloomFilterMightContainJointRewriteRule.apply, ArrowScanReplaceRule.apply)
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,9 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}

test("Test input_file_name function") {
withSQLConf(
"spark.gluten.sql.enableInputFileNameReplaceRule" -> "true"
) {
runQueryAndCompare("""SELECT input_file_name(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("""SELECT input_file_name(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait BackendSettingsApi {
def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportNativeRowIndexColumn(): Boolean = false
def supportNativeInputFileRelatedExpr(): Boolean = false

def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object MiscColumnarRules {
object TransformPreOverrides {
def apply(): TransformPreOverrides = {
TransformPreOverrides(
List(OffloadFilter()),
List(OffloadProject(), OffloadFilter()),
List(
OffloadOthers(),
OffloadAggregate(),
Expand Down
Loading

0 comments on commit 86449d0

Please sign in to comment.