fix: push down input_file_name expression to transformer scan in delta
dcoliversun authored Oct 12, 2024
1 parent 2835c78 commit 4b6e929
Showing 2 changed files with 56 additions and 4 deletions.
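
In substance, the fix targets query shapes where a projection referencing input_file_name() (or input_file_block_start() / input_file_block_length()) sits directly on top of a Delta transformer scan: the new rule folds such a projection into the scan so the native scan produces the input-file columns itself. A minimal query shape that would trigger the rewrite — a sketch only, with a hypothetical table path, using Spark's built-in functions:

import org.apache.spark.sql.functions.{col, input_file_name}

// Any projection over a Delta scan that references input_file_name()
// is matched by the pushDownInputFileExprRule added below.
val df = spark.read
  .format("delta")
  .load("/tmp/delta_table") // hypothetical path
  .select(col("id"), input_file_name().as("file"))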
DeltaRewriteTransformerRules.scala
@@ -17,21 +17,21 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.columnMappingRule
+import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, pushDownInputFileExprRule}
 import org.apache.gluten.extension.columnar.RewriteTransformerRules
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
 class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: pushDownInputFileExprRule :: Nil
 }
 
 object DeltaRewriteTransformerRules {
@@ -58,13 +58,27 @@ object DeltaRewriteTransformerRules {
         transformColumnMappingPlan(p)
     }

+  val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case p @ ProjectExec(projectList, child: DeltaScanTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(output = p.output)
+    }
+
   private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
     case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
       true
     case _ =>
       false
   }

+  private def containsInputFileRelatedExpr(expr: Expression): Boolean = {
+    expr match {
+      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
+      case _ => expr.children.exists(containsInputFileRelatedExpr)
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat (e.g. nameMapping and idMapping)
    * to transform Delta's metadata into Parquet's; each plan should only be transformed once.
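
How the new rule rewrites the plan, sketched on illustrative plan shapes:

Before:  ProjectExec [id, input_file_name() AS file]
           +- DeltaScanTransformer [id, ...]
After:   DeltaScanTransformer [id, file]

transformUp rewrites bottom-up: the matched ProjectExec is removed and the scan is copied with the project's output, so the input-file columns become part of the transformer scan's own output. And because containsInputFileRelatedExpr recurses into expression children, nested uses also match — a standalone sketch (the helper is private to the rule object; Upper is an existing Catalyst expression):

import org.apache.spark.sql.catalyst.expressions.{InputFileName, Upper}

containsInputFileRelatedExpr(InputFileName())        // true: matches the first case directly
containsInputFileRelatedExpr(Upper(InputFileName())) // true: via the recursive children check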
VeloxDeltaSuite.scala
@@ -211,4 +211,42 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
       checkAnswer(spark.read.format("delta").load(path), df1)
     }
   }
+
+  testWithSpecifiedSparkVersion("delta: push down input_file_name expression", Some("3.2")) {
+    withTable("source_table") {
+      withTable("target_table") {
+        spark.sql(s"""
+                     |CREATE TABLE source_table(id INT, name STRING, age INT) USING delta;
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |CREATE TABLE target_table(id INT, name STRING, age INT) USING delta;
+                     |
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |INSERT INTO source_table VALUES(1, 'a', 10),(2, 'b', 20);
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |INSERT INTO target_table VALUES(1, 'c', 10),(3, 'c', 30);
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |MERGE INTO target_table AS target
+                     |USING source_table AS source
+                     |ON target.id = source.id
+                     |WHEN MATCHED THEN
+                     |UPDATE SET
+                     |  target.name = source.name,
+                     |  target.age = source.age
+                     |WHEN NOT MATCHED THEN
+                     |INSERT (id, name, age) VALUES (source.id, source.name, source.age);
+                     |""".stripMargin)
+
+        val df1 = runQueryAndCompare("SELECT * FROM target_table") { _ => }
+        checkAnswer(df1, Row(1, "a", 10) :: Row(2, "b", 20) :: Row(3, "c", 30) :: Nil)
+      }
+    }
+  }
 }
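
A note on the test: Delta's MERGE INTO implementation projects input_file_name() over the scans to locate the files that contain matched rows, which is presumably why a plain MERGE statement serves as the regression test here — it produces exactly the ProjectExec-over-DeltaScanTransformer shape the new rule rewrites.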
