Commit 867285c

use delta write rule

dcoliversun committed Oct 11, 2024
1 parent fb9b7d7 · commit 867285c
Showing 11 changed files with 24 additions and 354 deletions.
@@ -70,7 +70,6 @@ private object VeloxRuleApi {
     injector.injectTransform(c => RewriteTransformer.apply(c.session))
     injector.injectTransform(_ => PushDownFilterToScan)
     injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
-    injector.injectTransform(_ => ReplaceDeltaTransformer())
     injector.injectTransform(_ => EnsureLocalSortRequirements)
     injector.injectTransform(_ => EliminateLocalSort)
     injector.injectTransform(_ => CollapseProjectExecTransformer)
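For context on the removal above: everything registered through injectTransform is an ordered Rule[SparkPlan], so dropping ReplaceDeltaTransformer() takes one rewrite pass out of the pipeline; its job is now covered by the DeltaRewriteTransformerRules changes below. A minimal sketch of how such an ordered rule list composes (applyInOrder is a hypothetical stand-in, not Gluten's injector API):

  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.execution.SparkPlan

  object RuleChain {
    // Each rule is a SparkPlan => SparkPlan transform; later rules see the
    // output of earlier ones, so registration order matters.
    def applyInOrder(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan =
      rules.foldLeft(plan)((p, rule) => rule(p))
  }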

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -16,22 +16,22 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.columnMappingRule
+import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
+import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule}
 import org.apache.gluten.extension.columnar.RewriteTransformerRules
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
 class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: filterRule :: projectRule :: Nil
 }
 
 object DeltaRewriteTransformerRules {
@@ -58,13 +58,32 @@ object DeltaRewriteTransformerRules {
     transformColumnMappingPlan(p)
   }
 
+  val filterRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case FilterExec(condition, child) if condition.exists(containsIncrementMetricExpr) =>
+        DeltaFilterExecTransformer(condition, child)
+    }
+
+  val projectRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case ProjectExec(projectList, child) if projectList.exists(containsIncrementMetricExpr) =>
+        DeltaProjectExecTransformer(projectList, child)
+    }
+
   private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
     case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
       true
     case _ =>
       false
   }
 
+  private def containsIncrementMetricExpr(expr: Expression): Boolean = {
+    expr match {
+      case e if e.prettyName == "increment_metric" => true
+      case _ => expr.children.exists(containsIncrementMetricExpr)
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be transformed once.
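A note on the new guard: containsIncrementMetricExpr matches any expression whose prettyName is "increment_metric" (Delta's metric-counting marker expression) and recurses into children, so a metric nested anywhere inside a filter condition or a projection still routes the operator to DeltaFilterExecTransformer / DeltaProjectExecTransformer. (The guard also calls condition.exists(...), Spark's whole-tree traversal, so the helper's own recursion is effectively belt-and-suspenders.) A self-contained sketch of the detection pattern on a toy tree, with hypothetical Expr types standing in for Spark's Expression:

  // Toy model of the recursive detection used above; Expr mirrors the two
  // members the rule relies on: prettyName and children.
  object IncrementMetricDetectionSketch extends App {
    sealed trait Expr { def prettyName: String; def children: Seq[Expr] }
    case class Leaf(prettyName: String) extends Expr { val children: Seq[Expr] = Nil }
    case class Node(prettyName: String, children: Seq[Expr]) extends Expr

    def containsMarker(e: Expr): Boolean =
      e.prettyName == "increment_metric" || e.children.exists(containsMarker)

    // The marker is found even when nested below other operators:
    val cond = Node("and", Seq(Node("isnotnull", Seq(Leaf("id"))), Leaf("increment_metric")))
    assert(containsMarker(cond))
  }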

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

