
Commit 820da22

fix API change
Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan committed Oct 9, 2024
1 parent 6ef648e commit 820da22
Showing 6 changed files with 712 additions and 14 deletions.
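
The hunks below only repoint call sites from Spark's org.apache.spark.sql.catalyst.plans.QueryPlan companion object to a new QueryPlanWrapper shim; the wrapper's own source, which accounts for most of the added lines, is not among the hunks shown on this page. As a rough, hypothetical sketch only (not the file added by this commit), a forwarding shim covering exactly the calls used in these hunks could look like the following, assuming it simply delegates to QueryPlan on a Spark version where these signatures exist:

// Hypothetical sketch, not the wrapper actually added by this commit.
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

object QueryPlanWrapper {
  // Tag used to attach whole-stage-codegen ids to plan nodes (forwarded as-is).
  val CODEGEN_ID_TAG: TreeNodeTag[Int] = QueryPlan.CODEGEN_ID_TAG

  // Rewrite exprIds relative to `input` so that canonicalized plans compare equal.
  def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T =
    QueryPlan.normalizeExpressions(e, input)

  def normalizePredicates(predicates: Seq[Expression], output: AttributeSeq): Seq[Expression] =
    QueryPlan.normalizePredicates(predicates, output)

  // Explain-output helper; this is the call whose signature tends to drift across
  // Spark versions, so Gluten callers can go through one stable shape here.
  def append[T <: QueryPlan[T]](
      plan: => QueryPlan[T],
      append: String => Unit,
      verbose: Boolean,
      addSuffix: Boolean,
      printOperatorId: Boolean): Unit =
    QueryPlan.append(plan, append, verbose, addSuffix, printOperatorId = printOperatorId)
}

Gluten generally isolates Spark-version-specific code behind its shim layers, so a wrapper along these lines would plausibly live there; the call sites in the hunks below can then stay identical across Spark versions.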
@@ -26,7 +26,7 @@ import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
@@ -57,8 +57,8 @@ case class BatchScanExecTransformer(
 
   override def doCanonicalize(): BatchScanExecTransformer = {
     this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
+      output = output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlanWrapper.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
         output)
     )
@@ -25,7 +25,7 @@ import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -57,14 +57,14 @@ case class FileSourceScanExecTransformer(
   override def doCanonicalize(): FileSourceScanExecTransformer = {
     FileSourceScanExecTransformer(
       relation,
-      output.map(QueryPlan.normalizeExpressions(_, output)),
+      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlan.normalizePredicates(
+      QueryPlanWrapper.normalizePredicates(
         filterUnusedDynamicPruningExpressions(partitionFilters),
         output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlan.normalizePredicates(dataFilters, output),
+      QueryPlanWrapper.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )
@@ -23,7 +23,7 @@ import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelation, HashJoin, LongHashedRelation}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.ThreadUtils
@@ -59,7 +59,7 @@ case class ColumnarSubqueryBroadcastExec(
     BackendsApiManager.getMetricsApiInstance.genColumnarSubqueryBroadcastMetrics(sparkContext)
 
   override def doCanonicalize(): SparkPlan = {
-    val keys = buildKeys.map(k => QueryPlan.normalizeExpressions(k, child.output))
+    val keys = buildKeys.map(k => QueryPlanWrapper.normalizeExpressions(k, child.output))
     copy(name = "native-dpp", buildKeys = keys, child = child.canonicalized)
   }
 
@@ -144,7 +144,12 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     try {
       generateWholeStageCodegenIds(plan)
 
-      QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true)
+      QueryPlanWrapper.append(
+        plan,
+        append,
+        verbose = false,
+        addSuffix = false,
+        printOperatorId = true)
 
       append("\n")
 
@@ -401,7 +406,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
 
     def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
       if (currentCodegenId != -1) {
-        p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
+        p.setTagValue(QueryPlanWrapper.CODEGEN_ID_TAG, currentCodegenId)
       }
       children.foreach(generateWholeStageCodegenIds)
     }
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -170,9 +170,9 @@ case class HiveTableScanExecTransformer(
   override def doCanonicalize(): HiveTableScanExecTransformer = {
     val input: AttributeSeq = relation.output
     HiveTableScanExecTransformer(
-      requestedAttributes.map(QueryPlan.normalizeExpressions(_, input)),
+      requestedAttributes.map(QueryPlanWrapper.normalizeExpressions(_, input)),
       relation.canonicalized.asInstanceOf[HiveTableRelation],
-      QueryPlan.normalizePredicates(partitionPruningPred, input)
+      QueryPlanWrapper.normalizePredicates(partitionPruningPred, input)
     )(sparkSession)
   }
 }
(Diff for the remaining changed file did not load and is not shown here.)

