From 04705aaeec00cfdc4ae645e6b500c2e2d6f71264 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Fri, 6 Dec 2024 11:15:51 +0800 Subject: [PATCH] [SPARK-50496][Core] Change partitioning to SinglePartition if partition number is 1 --- .../plans/logical/basicLogicalOperators.scala | 6 ++++- .../adaptive/AdaptiveQueryExecSuite.scala | 22 ++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 0cb04064a6178..08e4713fd61dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1980,7 +1980,11 @@ case class RebalancePartitions( override def output: Seq[Attribute] = child.output override val nodePatterns: Seq[TreePattern] = Seq(REBALANCE_PARTITIONS) - override val partitioning: Partitioning = super.partitioning + override val partitioning: Partitioning = if (numPartitions == 1) { + SinglePartition + } else { + super.partitioning + } override protected def withNewChildInternal(newChild: LogicalPlan): RebalancePartitions = copy(child = newChild) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index ad28fd5176d99..6bbbe0c50a6e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SparkSession, S import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute 
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, RepartitionOperation} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike} @@ -2236,6 +2237,25 @@ class AdaptiveQueryExecSuite } } + test("SPARK-50496: Change rebalance partitioning to SinglePartition if partition number is 1") { + def checkSinglePartitioning(df: DataFrame): Unit = { + assert( + df.queryExecution.analyzed.collect { + case r: RepartitionOperation => r + }.size == 1) + + assert( + collect(df.queryExecution.executedPlan) { + case s: ShuffleExchangeExec if s.outputPartitioning == SinglePartition => s + }.size == 1) + } + + checkSinglePartitioning(sql("SELECT /*+ REBALANCE(1) */ * FROM VALUES(1),(2),(3) AS t(c)")) + checkSinglePartitioning(sql("SELECT /*+ REBALANCE(1, c) */ * FROM VALUES(1),(2),(3) AS t(c)")) + } + + + test("SPARK-35725: Support optimize skewed partitions in RebalancePartitions") { withTempView("v") { withSQLConf(