Commit

simplify shim 1
zhztheplayer committed Oct 9, 2024
1 parent 6704720 commit dadb4cb
Showing 12 changed files with 38 additions and 2,701 deletions.
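
In each touched file, the Gluten-side QueryPlanWrapper shim is dropped in favour of calling Spark's org.apache.spark.sql.catalyst.plans.QueryPlan helpers (normalizeExpressions and normalizePredicates) directly inside doCanonicalize. A minimal sketch of that pattern, using a hypothetical SimpleScanExec node that is not part of this commit and exists only for illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}

// Hypothetical leaf node, shown only to illustrate the canonicalization
// pattern that this commit routes through Spark's QueryPlan object.
case class SimpleScanExec(output: Seq[Attribute], dataFilters: Seq[Expression])
  extends LeafExecNode {

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("illustration only")

  // Normalize every expression against the node's own output so that two
  // semantically equal plans canonicalize to the same tree.
  override def doCanonicalize(): SparkPlan =
    SimpleScanExec(
      output.map(QueryPlan.normalizeExpressions(_, output)),
      QueryPlan.normalizePredicates(dataFilters, output))
}

The same two helper calls recur in each of the hunks below.
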
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -66,14 +66,14 @@ case class DeltaScanTransformer(
   override def doCanonicalize(): DeltaScanTransformer = {
     DeltaScanTransformer(
       relation,
-      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlanWrapper.normalizePredicates(
+      QueryPlan.normalizePredicates(
         filterUnusedDynamicPruningExpressions(partitionFilters),
         output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlanWrapper.normalizePredicates(dataFilters, output),
+      QueryPlan.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )

@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -61,14 +61,14 @@ case class HudiScanTransformer(
   override def doCanonicalize(): HudiScanTransformer = {
     HudiScanTransformer(
       relation,
-      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlanWrapper.normalizePredicates(
+      QueryPlan.normalizePredicates(
         filterUnusedDynamicPruningExpressions(partitionFilters),
         output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlanWrapper.normalizePredicates(dataFilters, output),
+      QueryPlan.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )

@@ -22,7 +22,7 @@ import org.apache.gluten.substrait.rel.SplitInfo
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -71,8 +71,8 @@ case class IcebergScanTransformer(
 
   override def doCanonicalize(): IcebergScanTransformer = {
     this.copy(
-      output = output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlanWrapper.normalizePredicates(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output)
     )

@@ -26,7 +26,7 @@ import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
@@ -57,8 +57,8 @@ case class BatchScanExecTransformer(
 
   override def doCanonicalize(): BatchScanExecTransformer = {
     this.copy(
-      output = output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlanWrapper.normalizePredicates(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output)
     )

@@ -25,7 +25,7 @@ import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -57,14 +57,14 @@ case class FileSourceScanExecTransformer(
   override def doCanonicalize(): FileSourceScanExecTransformer = {
     FileSourceScanExecTransformer(
       relation,
-      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlanWrapper.normalizePredicates(
+      QueryPlan.normalizePredicates(
         filterUnusedDynamicPruningExpressions(partitionFilters),
         output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlanWrapper.normalizePredicates(dataFilters, output),
+      QueryPlan.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )

@@ -23,7 +23,7 @@ import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelation, HashJoin, LongHashedRelation}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.ThreadUtils
@@ -59,7 +59,7 @@ case class ColumnarSubqueryBroadcastExec(
     BackendsApiManager.getMetricsApiInstance.genColumnarSubqueryBroadcastMetrics(sparkContext)
 
   override def doCanonicalize(): SparkPlan = {
-    val keys = buildKeys.map(k => QueryPlanWrapper.normalizeExpressions(k, child.output))
+    val keys = buildKeys.map(k => QueryPlan.normalizeExpressions(k, child.output))
     copy(name = "native-dpp", buildKeys = keys, child = child.canonicalized)
   }

@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -165,9 +165,9 @@ case class HiveTableScanExecTransformer(
   override def doCanonicalize(): HiveTableScanExecTransformer = {
     val input: AttributeSeq = relation.output
     HiveTableScanExecTransformer(
-      requestedAttributes.map(QueryPlanWrapper.normalizeExpressions(_, input)),
+      requestedAttributes.map(QueryPlan.normalizeExpressions(_, input)),
       relation.canonicalized.asInstanceOf[HiveTableRelation],
-      QueryPlanWrapper.normalizePredicates(partitionPruningPred, input)
+      QueryPlan.normalizePredicates(partitionPruningPred, input)
     )(sparkSession)
   }
 }
(Diffs for the remaining changed files were not loaded on this page.)

0 comments on commit dadb4cb
