[SPARK-50519][SQL] Simplify the method of creating SparkPlan for queryExecution

guihuawen committed Dec 9, 2024
1 parent 6c2e87a commit 2e2b1b0
Showing 3 changed files with 5 additions and 7 deletions.
@@ -202,7 +202,7 @@ class QueryExecution(
executePhase(QueryPlanningTracker.PLANNING) {
// Clone the logical plan here, in case the planner rules change the states of the logical
// plan.
-      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+      QueryExecution.createSparkPlan(planner, optimizedPlan.clone())
}
}

@@ -567,7 +567,6 @@ object QueryExecution {
* Note that the returned physical plan still needs to be prepared for execution.
*/
def createSparkPlan(
-      sparkSession: SparkSession,
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan = {
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
@@ -587,7 +586,7 @@
* [[SparkPlan]] for execution.
*/
def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {
-    val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
+    val sparkPlan = createSparkPlan(spark.sessionState.planner, plan.clone())
prepareExecutedPlan(spark, sparkPlan)
}

@@ -599,7 +598,7 @@
session: SparkSession,
plan: LogicalPlan,
context: AdaptiveExecutionContext): SparkPlan = {
-    val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())
+    val sparkPlan = createSparkPlan(session.sessionState.planner, plan.clone())
val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
prepareForExecution(preparationRules, sparkPlan.clone())
}
@@ -151,7 +151,7 @@ case class InsertAdaptiveSparkPlan(
// Apply the same instance of this rule to sub-queries so that sub-queries all share the
// same `stageCache` for Exchange reuse.
this.applyInternal(
-      QueryExecution.createSparkPlan(adaptiveExecutionContext.session,
+      QueryExecution.createSparkPlan(
adaptiveExecutionContext.session.sessionState.planner, plan.clone()), true)
}

@@ -52,8 +52,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) {
case DynamicPruningSubquery(
value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) =>
-        val sparkPlan = QueryExecution.createSparkPlan(
-          sparkSession, sparkSession.sessionState.planner, buildPlan)
+        val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
// Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
// the first to be applied (apart from `InsertAdaptiveSparkPlan`).
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
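The diff above is a pure signature cleanup: the sparkSession parameter appears to be unused inside QueryExecution.createSparkPlan (the planner and the logical plan are all it needs), so it is dropped from the signature and every call site passes one argument fewer. Below is a minimal Scala sketch of the same pattern, using hypothetical stand-in names (Planner, Before, After, Demo) rather than the real Spark classes:

// Minimal sketch of the refactoring pattern in this commit, not the actual
// Spark sources: a parameter the method never reads is removed from the
// signature, and each call site drops the now-redundant argument.
// Planner, Before, After and Demo are hypothetical stand-ins.

case class Planner(name: String) {
  def plan(logical: String): String = s"physical($logical)"
}

object Before {
  // The session argument is accepted but never used by the body.
  def createPlan(session: String, planner: Planner, logical: String): String =
    planner.plan(logical)
}

object After {
  // Same behavior, smaller signature; callers no longer need a session in scope.
  def createPlan(planner: Planner, logical: String): String =
    planner.plan(logical)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val planner = Planner("demo")
    // Call sites go from three arguments to two, mirroring the diff above.
    println(Before.createPlan("session", planner, "scan(t)"))
    println(After.createPlan(planner, "scan(t)"))
  }
}

Behavior is unchanged while the API surface shrinks, which is what the @@ -567,7 +567,6 @@ hunk reflects: one parameter line removed, nothing added in its place.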
