Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[CORE][VL] Add OffloadProject rule to offload Project operators, with input_file_name support taken into account #6200

Merged
merged 7 commits
Jun 28, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def supportNativeRowIndexColumn(): Boolean = true

override def supportNativeInputFileRelatedExpr(): Boolean = true

override def supportExpandExec(): Boolean = true

override def supportSortExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.extension.InputFileNameReplaceRule
import org.apache.gluten.metrics.IMetrics
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
Expand Down Expand Up @@ -134,13 +133,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
val metadataColumn =
SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
metadataColumn.put(InputFileNameReplaceRule.replacedInputFileName, file.filePath.toString)
metadataColumn.put(
InputFileNameReplaceRule.replacedInputFileBlockStart,
file.start.toString)
metadataColumn.put(
InputFileNameReplaceRule.replacedInputFileBlockLength,
file.length.toString)
metadataColumns.add(metadataColumn)
val partitionColumn = new JHashMap[String, String]()
for (i <- 0 until file.partitionValues.numFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,12 +806,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] = {
val buf: ListBuffer[SparkSession => Rule[SparkPlan]] =
ListBuffer(BloomFilterMightContainJointRewriteRule.apply, ArrowScanReplaceRule.apply)
if (GlutenConfig.getConf.enableInputFileNameReplaceRule) {
buf += InputFileNameReplaceRule.apply
}
buf.result
List(BloomFilterMightContainJointRewriteRule.apply, ArrowScanReplaceRule.apply)
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,9 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}

test("Test input_file_name function") {
withSQLConf(
"spark.gluten.sql.enableInputFileNameReplaceRule" -> "true"
) {
runQueryAndCompare("""SELECT input_file_name(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("""SELECT input_file_name(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait BackendSettingsApi {
def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportNativeRowIndexColumn(): Boolean = false
def supportNativeInputFileRelatedExpr(): Boolean = false

def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object MiscColumnarRules {
object TransformPreOverrides {
def apply(): TransformPreOverrides = {
TransformPreOverrides(
List(OffloadFilter()),
List(OffloadProject(), OffloadFilter()),
List(
OffloadOthers(),
OffloadAggregate(),
Expand Down
Loading
Loading