diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff index 19eda87497..9863713752 100644 --- a/dev/diffs/3.5.1.diff +++ b/dev/diffs/3.5.1.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index 0f504dbee85..71fd49a3744 100644 +index 0f504dbee85..7616b97cc0a 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -250,10 +250,15 @@ index 56e9520fdab..917932336df 100644 spark.range(100).write.saveAsTable(s"$dbName.$table2Name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -index 002719f0689..fa76c42feaa 100644 +index 002719f0689..784d24afe2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -@@ -44,7 +44,7 @@ import org.apache.spark.sql.connector.FakeV2Provider +@@ -40,11 +40,12 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation + import org.apache.spark.sql.catalyst.parser.ParseException + import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics} + import org.apache.spark.sql.catalyst.util.DateTimeUtils ++import org.apache.spark.sql.comet.CometBroadcastExchangeExec + import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -262,7 +267,7 @@ index 002719f0689..fa76c42feaa 100644 import org.apache.spark.sql.expressions.{Aggregator, Window} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -@@ -2020,7 +2020,7 @@ class DataFrameSuite extends QueryTest +@@ -2020,7 +2021,7 @@ class DataFrameSuite extends QueryTest fail("Should not have back to back Aggregates") } atFirstAgg = true @@ -271,7 +276,7 @@ index 002719f0689..fa76c42feaa 100644 case _ => } } -@@ -2344,7 +2344,7 @@ class DataFrameSuite extends QueryTest +@@ -2344,7 +2345,7 @@ class DataFrameSuite extends QueryTest checkAnswer(join, df) assert( collect(join.queryExecution.executedPlan) { @@ -280,7 +285,7 @@ index 002719f0689..fa76c42feaa 100644 assert( collect(join.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size === 1) val broadcasted = broadcast(join) -@@ -2352,7 +2352,7 @@ class DataFrameSuite extends QueryTest +@@ -2352,10 +2353,12 @@ class DataFrameSuite extends QueryTest checkAnswer(join2, df) assert( collect(join2.queryExecution.executedPlan) { @@ -288,8 +293,14 @@ index 002719f0689..fa76c42feaa 100644 + case _: ShuffleExchangeLike => true }.size == 1) assert( collect(join2.queryExecution.executedPlan) { - case e: BroadcastExchangeExec => true }.size === 1) -@@ -2915,7 +2915,7 @@ class DataFrameSuite extends QueryTest +- case e: BroadcastExchangeExec => true }.size === 1) ++ case e: BroadcastExchangeExec => true ++ case _: CometBroadcastExchangeExec => true ++ }.size === 1) + assert( + collect(join2.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size == 4) + } +@@ -2915,7 +2918,7 @@ class DataFrameSuite extends QueryTest // Assert that no extra shuffle introduced by cogroup. val exchanges = collect(df3.queryExecution.executedPlan) { @@ -298,7 +309,7 @@ index 002719f0689..fa76c42feaa 100644 } assert(exchanges.size == 2) } -@@ -3364,7 +3364,8 @@ class DataFrameSuite extends QueryTest +@@ -3364,7 +3367,8 @@ class DataFrameSuite extends QueryTest assert(df2.isLocal) } @@ -617,17 +628,19 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 9dcf7ec2904..39b57fb166a 100644 +index 9dcf7ec2904..94a171d1aad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Filter, HintInfo, Join, JoinHint, NO_BROADCAST_AND_REPLICATION} +-import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.comet._ - import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} ++import org.apache.spark.sql.execution.{BinaryExecNode, ColumnarToRowExec, FilterExec, InputAdapter, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.joins._ @@ -801,7 +802,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } @@ -638,7 +651,20 @@ index 9dcf7ec2904..39b57fb166a 100644 withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0", SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") { -@@ -1176,9 +1178,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -927,10 +929,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val physical = df.queryExecution.sparkPlan + val physicalJoins = physical.collect { + case j: SortMergeJoinExec => j ++ case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec] + } + val executed = df.queryExecution.executedPlan + val executedJoins = collect(executed) { + case j: SortMergeJoinExec => j ++ case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec] + } + // This only applies to the above tested queries, in which a child SortMergeJoin always + // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never +@@ -1176,9 +1180,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType) .groupBy($"k1").count() .queryExecution.executedPlan @@ -652,7 +678,7 @@ index 9dcf7ec2904..39b57fb166a 100644 }) } -@@ -1195,10 +1199,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1195,10 +1201,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType) .queryExecution .executedPlan @@ -666,7 +692,7 @@ index 9dcf7ec2904..39b57fb166a 100644 }) // Test shuffled hash join -@@ -1208,10 +1213,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1208,10 +1215,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType) .queryExecution .executedPlan @@ -683,7 +709,7 @@ index 9dcf7ec2904..39b57fb166a 100644 }) } -@@ -1302,12 +1310,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1302,12 +1312,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan inputDFs.foreach { case (df1, df2, joinExprs) => val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full") assert(collect(smjDF.queryExecution.executedPlan) { @@ -698,7 +724,58 @@ index 9dcf7ec2904..39b57fb166a 100644 // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) } -@@ -1485,7 +1493,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1366,12 +1376,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) +@@ -1382,12 +1394,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) +@@ -1431,13 +1445,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + assert(shjCodegenDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true + case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true ++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(_: CometHashJoinExec))) => ++ true ++ case WholeStageCodegenExec(ColumnarToRowExec( ++ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true + }.size === 1) + checkAnswer(shjCodegenDF, Seq.empty) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val shjNonCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType) + assert(shjNonCodegenDF.queryExecution.executedPlan.collect { +- case _: ShuffledHashJoinExec => true }.size === 1) ++ case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true ++ }.size === 1) + checkAnswer(shjNonCodegenDF, Seq.empty) + } + } +@@ -1485,7 +1505,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -708,7 +785,7 @@ index 9dcf7ec2904..39b57fb166a 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1514,9 +1523,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1514,9 +1535,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -723,7 +800,7 @@ index 9dcf7ec2904..39b57fb166a 100644 } // Test output ordering is not preserved -@@ -1525,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1525,9 +1549,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -738,7 +815,7 @@ index 9dcf7ec2904..39b57fb166a 100644 } // Test singe partition -@@ -1537,7 +1552,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1537,7 +1564,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -748,6 +825,28 @@ index 9dcf7ec2904..39b57fb166a 100644 checkAnswer(fullJoinDF, Row(100)) } } +@@ -1582,6 +1610,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + Seq(semiJoinDF, antiJoinDF).foreach { df => + assert(collect(df.queryExecution.executedPlan) { + case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true ++ case j: CometHashJoinExec ++ if j.originalPlan.asInstanceOf[ShuffledHashJoinExec].ignoreDuplicatedKey == ++ ignoreDuplicatedKey => true + }.size == 1) + } + } +@@ -1633,7 +1664,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + + test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") { + def check(plan: SparkPlan): Unit = { +- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1) ++ assert(collect(plan) { ++ case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true ++ }.size === 1) + } + dupStreamSideColTest("SHUFFLE_HASH", check) + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index b5b34922694..a72403780c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1173,7 +1272,7 @@ index 5a413c77754..37eee6e2c07 100644 val df = spark.read.parquet(path).selectExpr(projection: _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 68bae34790a..ea906fd1adc 100644 +index 68bae34790a..0cc77ad09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._ @@ -1197,7 +1296,7 @@ index 68bae34790a..ea906fd1adc 100644 } } -@@ -124,30 +127,38 @@ class AdaptiveQueryExecSuite +@@ -124,30 +127,39 @@ class AdaptiveQueryExecSuite private def findTopLevelSortMergeJoin(plan: SparkPlan): Seq[SortMergeJoinExec] = { collect(plan) { case j: SortMergeJoinExec => j @@ -1219,6 +1318,7 @@ index 68bae34790a..ea906fd1adc 100644 case j: BaseJoinExec => j + case c: CometHashJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] + case c: CometSortMergeJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] ++ case c: CometBroadcastHashJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] } } @@ -1236,7 +1336,7 @@ index 68bae34790a..ea906fd1adc 100644 } } -@@ -191,6 +202,7 @@ class AdaptiveQueryExecSuite +@@ -191,6 +203,7 @@ class AdaptiveQueryExecSuite val parts = rdd.partitions assert(parts.forall(rdd.preferredLocations(_).nonEmpty)) } @@ -1244,7 +1344,7 @@ index 68bae34790a..ea906fd1adc 100644 assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead)) } -@@ -199,7 +211,7 @@ class AdaptiveQueryExecSuite +@@ -199,7 +212,7 @@ class AdaptiveQueryExecSuite val plan = df.queryExecution.executedPlan assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { @@ -1253,7 +1353,7 @@ index 68bae34790a..ea906fd1adc 100644 } assert(shuffle.size == 1) assert(shuffle(0).outputPartitioning.numPartitions == numPartition) -@@ -215,7 +227,8 @@ class AdaptiveQueryExecSuite +@@ -215,7 +228,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1263,7 +1363,7 @@ index 68bae34790a..ea906fd1adc 100644 } } -@@ -242,7 +255,8 @@ class AdaptiveQueryExecSuite +@@ -242,7 +256,8 @@ class AdaptiveQueryExecSuite } } @@ -1273,7 +1373,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", -@@ -274,7 +288,8 @@ class AdaptiveQueryExecSuite +@@ -274,7 +289,8 @@ class AdaptiveQueryExecSuite } } @@ -1283,7 +1383,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", -@@ -288,7 +303,8 @@ class AdaptiveQueryExecSuite +@@ -288,7 +304,8 @@ class AdaptiveQueryExecSuite val localReads = collect(adaptivePlan) { case read: AQEShuffleReadExec if read.isLocalRead => read } @@ -1293,7 +1393,29 @@ index 68bae34790a..ea906fd1adc 100644 val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD] val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD] // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 -@@ -337,7 +353,7 @@ class AdaptiveQueryExecSuite +@@ -313,7 +330,9 @@ class AdaptiveQueryExecSuite + .groupBy($"a").count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan +- assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) ++ assert(find(plan) { case p => ++ p.isInstanceOf[SortMergeJoinExec] || p.isInstanceOf[CometSortMergeJoinExec] ++ }.isDefined) + val coalescedReads = collect(plan) { + case r: AQEShuffleReadExec => r + } +@@ -327,7 +346,9 @@ class AdaptiveQueryExecSuite + .groupBy($"a").count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan +- assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) ++ assert(find(plan) { case p => ++ p.isInstanceOf[BroadcastHashJoinExec] || p.isInstanceOf[CometBroadcastHashJoinExec] ++ }.isDefined) + val coalescedReads = collect(plan) { + case r: AQEShuffleReadExec => r + } +@@ -337,7 +358,7 @@ class AdaptiveQueryExecSuite } } @@ -1302,7 +1424,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -352,7 +368,7 @@ class AdaptiveQueryExecSuite +@@ -352,7 +373,7 @@ class AdaptiveQueryExecSuite } } @@ -1311,7 +1433,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -368,7 +384,7 @@ class AdaptiveQueryExecSuite +@@ -368,7 +389,7 @@ class AdaptiveQueryExecSuite } } @@ -1320,7 +1442,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -413,7 +429,7 @@ class AdaptiveQueryExecSuite +@@ -413,7 +434,7 @@ class AdaptiveQueryExecSuite } } @@ -1329,7 +1451,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -458,7 +474,7 @@ class AdaptiveQueryExecSuite +@@ -458,7 +479,7 @@ class AdaptiveQueryExecSuite } } @@ -1338,7 +1460,16 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { -@@ -523,7 +539,7 @@ class AdaptiveQueryExecSuite +@@ -504,7 +525,7 @@ class AdaptiveQueryExecSuite + } + } + +- test("Exchange reuse") { ++ test("Exchange reuse", IgnoreComet("Comet shuffle changes shuffle metrics")) { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { +@@ -523,7 +544,7 @@ class AdaptiveQueryExecSuite } } @@ -1347,7 +1478,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -554,7 +570,9 @@ class AdaptiveQueryExecSuite +@@ -554,7 +575,9 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1358,7 +1489,7 @@ index 68bae34790a..ea906fd1adc 100644 // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) -@@ -575,7 +593,9 @@ class AdaptiveQueryExecSuite +@@ -575,7 +598,9 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1369,7 +1500,7 @@ index 68bae34790a..ea906fd1adc 100644 // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.isEmpty) -@@ -584,7 +604,8 @@ class AdaptiveQueryExecSuite +@@ -584,7 +609,8 @@ class AdaptiveQueryExecSuite } } @@ -1379,7 +1510,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", -@@ -679,7 +700,8 @@ class AdaptiveQueryExecSuite +@@ -679,7 +705,8 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) // There is still a SMJ, and its two shuffles can't apply local read. @@ -1389,7 +1520,7 @@ index 68bae34790a..ea906fd1adc 100644 } } -@@ -801,7 +823,8 @@ class AdaptiveQueryExecSuite +@@ -801,7 +828,8 @@ class AdaptiveQueryExecSuite } } @@ -1399,7 +1530,7 @@ index 68bae34790a..ea906fd1adc 100644 Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint => def getJoinNode(plan: SparkPlan): Seq[ShuffledJoin] = if (joinHint == "SHUFFLE_MERGE") { findTopLevelSortMergeJoin(plan) -@@ -1019,7 +1042,8 @@ class AdaptiveQueryExecSuite +@@ -1019,7 +1047,8 @@ class AdaptiveQueryExecSuite } } @@ -1409,7 +1540,7 @@ index 68bae34790a..ea906fd1adc 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1614,7 +1638,7 @@ class AdaptiveQueryExecSuite +@@ -1614,7 +1643,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -1418,7 +1549,7 @@ index 68bae34790a..ea906fd1adc 100644 }.length == 1) } } -@@ -1694,7 +1718,8 @@ class AdaptiveQueryExecSuite +@@ -1694,7 +1723,8 @@ class AdaptiveQueryExecSuite } } @@ -1428,7 +1559,7 @@ index 68bae34790a..ea906fd1adc 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -1879,6 +1904,9 @@ class AdaptiveQueryExecSuite +@@ -1879,6 +1909,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1438,7 +1569,7 @@ index 68bae34790a..ea906fd1adc 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -1887,6 +1915,9 @@ class AdaptiveQueryExecSuite +@@ -1887,6 +1920,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1448,7 +1579,7 @@ index 68bae34790a..ea906fd1adc 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2043,7 +2074,8 @@ class AdaptiveQueryExecSuite +@@ -2043,7 +2079,8 @@ class AdaptiveQueryExecSuite } } @@ -1458,7 +1589,7 @@ index 68bae34790a..ea906fd1adc 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2129,7 +2161,8 @@ class AdaptiveQueryExecSuite +@@ -2129,7 +2166,8 @@ class AdaptiveQueryExecSuite } } @@ -1468,7 +1599,7 @@ index 68bae34790a..ea906fd1adc 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2228,7 +2261,7 @@ class AdaptiveQueryExecSuite +@@ -2228,7 +2266,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -1477,7 +1608,7 @@ index 68bae34790a..ea906fd1adc 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2241,7 +2274,7 @@ class AdaptiveQueryExecSuite +@@ -2241,7 +2279,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -1486,7 +1617,7 @@ index 68bae34790a..ea906fd1adc 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2275,7 +2308,8 @@ class AdaptiveQueryExecSuite +@@ -2275,7 +2313,8 @@ class AdaptiveQueryExecSuite } } @@ -1496,7 +1627,7 @@ index 68bae34790a..ea906fd1adc 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2419,6 +2453,7 @@ class AdaptiveQueryExecSuite +@@ -2419,6 +2458,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -1504,7 +1635,7 @@ index 68bae34790a..ea906fd1adc 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2436,7 +2471,8 @@ class AdaptiveQueryExecSuite +@@ -2436,7 +2476,8 @@ class AdaptiveQueryExecSuite } } @@ -1514,7 +1645,7 @@ index 68bae34790a..ea906fd1adc 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2548,7 +2584,7 @@ class AdaptiveQueryExecSuite +@@ -2548,7 +2589,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -1523,7 +1654,7 @@ index 68bae34790a..ea906fd1adc 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2559,7 +2595,7 @@ class AdaptiveQueryExecSuite +@@ -2559,7 +2600,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -1532,6 +1663,14 @@ index 68bae34790a..ea906fd1adc 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) +@@ -2756,6 +2797,7 @@ class AdaptiveQueryExecSuite + }.size == (if (firstAccess) 1 else 0)) + assert(collect(initialExecutedPlan) { + case s: SortExec => s ++ case s: CometSortExec => s + }.size == (if (firstAccess) 2 else 0)) + assert(collect(initialExecutedPlan) { + case i: InMemoryTableScanExec => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala index 05872d41131..a2c328b9742 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala @@ -1586,10 +1725,10 @@ index bf496d6db21..1e92016830f 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala -index ce43edb79c1..c414b19eda7 100644 +index ce43edb79c1..8436cb727c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala -@@ -17,7 +17,7 @@ +@@ -17,9 +17,10 @@ package org.apache.spark.sql.execution.datasources @@ -1597,8 +1736,27 @@ index ce43edb79c1..c414b19eda7 100644 +import org.apache.spark.sql.{IgnoreComet, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} ++import org.apache.spark.sql.comet.CometSortExec import org.apache.spark.sql.execution.{QueryExecution, SortExec} -@@ -305,7 +305,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec + import org.apache.spark.sql.internal.SQLConf +@@ -225,6 +226,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + // assert the outer most sort in the executed plan + assert(plan.collectFirst { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), +@@ -272,6 +274,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + // assert the outer most sort in the executed plan + assert(plan.collectFirst { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), +@@ -305,7 +308,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } } @@ -2365,6 +2523,55 @@ index b4c4ec7acbf..20579284856 100644 } val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +index 3e1bc57dfa2..4a8d75ff512 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +@@ -31,7 +31,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation + import org.apache.spark.sql.{DataFrame, Row, SparkSession} + import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec ++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike + import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper} + import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore, StateStoreProviderId} + import org.apache.spark.sql.functions._ +@@ -619,14 +619,27 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { + + val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) + +- assert(query.lastExecution.executedPlan.collect { +- case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _, +- ShuffleExchangeExec(opA: HashPartitioning, _, _, _), +- ShuffleExchangeExec(opB: HashPartitioning, _, _, _)) +- if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") +- && partitionExpressionsColumns(opB.expressions) === Seq("a", "b") +- && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j +- }.size == 1) ++ val join = query.lastExecution.executedPlan.collect { ++ case j: StreamingSymmetricHashJoinExec => j ++ }.head ++ val opA = join.left.collect { ++ case s: ShuffleExchangeLike ++ if s.outputPartitioning.isInstanceOf[HashPartitioning] && ++ partitionExpressionsColumns( ++ s.outputPartitioning ++ .asInstanceOf[HashPartitioning].expressions) === Seq("a", "b") => ++ s.outputPartitioning.asInstanceOf[HashPartitioning] ++ }.head ++ val opB = join.right.collect { ++ case s: ShuffleExchangeLike ++ if s.outputPartitioning.isInstanceOf[HashPartitioning] && ++ partitionExpressionsColumns( ++ s.outputPartitioning ++ .asInstanceOf[HashPartitioning].expressions) === Seq("a", "b") => ++ s.outputPartitioning ++ .asInstanceOf[HashPartitioning] ++ }.head ++ assert(opA.numPartitions == numPartitions && opB.numPartitions == numPartitions) + }) + } + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index abe606ad9c1..2d930b64cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala