diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 69016bc38e..a012147da0 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index a4b1b2c3c9f..a2315d2a95b 100644 +index a4b1b2c3c9f..eacc81ca158 100644 --- a/pom.xml +++ b/pom.xml @@ -147,6 +147,8 @@ @@ -334,10 +334,15 @@ index f6fd6b501d7..11870c85d82 100644 spark.range(100).write.saveAsTable(s"$dbName.$table2Name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -index 760ee802608..db4dc90475e 100644 +index 760ee802608..ebd4a34b08d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -@@ -40,7 +40,7 @@ import org.apache.spark.sql.connector.FakeV2Provider +@@ -36,11 +36,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, + import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation + import org.apache.spark.sql.catalyst.parser.ParseException + import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LocalRelation, LogicalPlan, OneRowRelation} ++import org.apache.spark.sql.comet.CometBroadcastExchangeExec + import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -346,7 +351,7 @@ index 760ee802608..db4dc90475e 100644 import org.apache.spark.sql.expressions.{Aggregator, Window} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -@@ -1401,7 +1401,7 @@ class DataFrameSuite extends QueryTest +@@ -1401,7 +1402,7 @@ class DataFrameSuite extends QueryTest fail("Should not have back to back Aggregates") } atFirstAgg = true @@ -355,7 +360,7 @@ index 760ee802608..db4dc90475e 100644 case _ => } } -@@ -1591,7 +1591,7 @@ class DataFrameSuite extends QueryTest +@@ -1591,7 +1592,7 @@ class DataFrameSuite extends QueryTest checkAnswer(join, df) assert( collect(join.queryExecution.executedPlan) { @@ -364,7 +369,7 @@ index 760ee802608..db4dc90475e 100644 assert( collect(join.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size === 1) val broadcasted = broadcast(join) -@@ -1599,7 +1599,7 @@ class DataFrameSuite extends QueryTest +@@ -1599,10 +1600,12 @@ class DataFrameSuite extends QueryTest checkAnswer(join2, df) assert( collect(join2.queryExecution.executedPlan) { @@ -372,8 +377,14 @@ index 760ee802608..db4dc90475e 100644 + case _: ShuffleExchangeLike => true }.size == 1) assert( collect(join2.queryExecution.executedPlan) { - case e: BroadcastExchangeExec => true }.size === 1) -@@ -2000,7 +2000,7 @@ class DataFrameSuite extends QueryTest +- case e: BroadcastExchangeExec => true }.size === 1) ++ case e: BroadcastExchangeExec => true ++ case _: CometBroadcastExchangeExec => true ++ }.size === 1) + assert( + collect(join2.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size == 4) + } +@@ -2000,7 +2003,7 @@ class DataFrameSuite extends QueryTest // Assert that no extra shuffle introduced by cogroup. val exchanges = collect(df3.queryExecution.executedPlan) { @@ -382,7 +393,7 @@ index 760ee802608..db4dc90475e 100644 } assert(exchanges.size == 2) } -@@ -2299,7 +2299,8 @@ class DataFrameSuite extends QueryTest +@@ -2299,7 +2302,8 @@ class DataFrameSuite extends QueryTest assert(df2.isLocal) } @@ -701,17 +712,19 @@ index 53e47f428c3..a55d8f0c161 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index fcb937d82ba..f519436ba50 100644 +index fcb937d82ba..64a11bc0b85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper} import org.apache.spark.sql.catalyst.plans.logical.{Filter, HintInfo, Join, JoinHint, NO_BROADCAST_AND_REPLICATION} +-import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.comet._ - import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} ++import org.apache.spark.sql.execution.{BinaryExecNode, ColumnarToRowExec, FilterExec, InputAdapter, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.joins._ @@ -805,7 +806,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } @@ -722,7 +735,20 @@ index fcb937d82ba..f519436ba50 100644 withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1", SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0", SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") { -@@ -1180,9 +1182,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -931,10 +933,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val physical = df.queryExecution.sparkPlan + val physicalJoins = physical.collect { + case j: SortMergeJoinExec => j ++ case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec] + } + val executed = df.queryExecution.executedPlan + val executedJoins = collect(executed) { + case j: SortMergeJoinExec => j ++ case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec] + } + // This only applies to the above tested queries, in which a child SortMergeJoin always + // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never +@@ -1180,9 +1184,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType) .groupBy($"k1").count() .queryExecution.executedPlan @@ -736,7 +762,7 @@ index fcb937d82ba..f519436ba50 100644 }) } -@@ -1199,10 +1203,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1199,10 +1205,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType) .queryExecution .executedPlan @@ -750,7 +776,7 @@ index fcb937d82ba..f519436ba50 100644 }) // Test shuffled hash join -@@ -1212,10 +1217,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1212,10 +1219,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType) .queryExecution .executedPlan @@ -767,7 +793,7 @@ index fcb937d82ba..f519436ba50 100644 }) } -@@ -1306,12 +1314,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1306,12 +1316,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan inputDFs.foreach { case (df1, df2, joinExprs) => val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full") assert(collect(smjDF.queryExecution.executedPlan) { @@ -782,7 +808,58 @@ index fcb937d82ba..f519436ba50 100644 // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) } -@@ -1489,7 +1497,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1370,12 +1380,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) +@@ -1386,12 +1398,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) +@@ -1435,13 +1449,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + assert(shjCodegenDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true + case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true ++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(_: CometHashJoinExec))) => ++ true ++ case WholeStageCodegenExec(ColumnarToRowExec( ++ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true + }.size === 1) + checkAnswer(shjCodegenDF, Seq.empty) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val shjNonCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType) + assert(shjNonCodegenDF.queryExecution.executedPlan.collect { +- case _: ShuffledHashJoinExec => true }.size === 1) ++ case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true ++ }.size === 1) + checkAnswer(shjNonCodegenDF, Seq.empty) + } + } +@@ -1489,7 +1509,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -792,7 +869,7 @@ index fcb937d82ba..f519436ba50 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1518,9 +1527,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1518,9 +1539,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -807,7 +884,7 @@ index fcb937d82ba..f519436ba50 100644 } // Test output ordering is not preserved -@@ -1529,9 +1541,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1529,9 +1553,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -822,7 +899,7 @@ index fcb937d82ba..f519436ba50 100644 } // Test singe partition -@@ -1541,7 +1556,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1541,7 +1568,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -832,6 +909,28 @@ index fcb937d82ba..f519436ba50 100644 checkAnswer(fullJoinDF, Row(100)) } } +@@ -1586,6 +1614,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + Seq(semiJoinDF, antiJoinDF).foreach { df => + assert(collect(df.queryExecution.executedPlan) { + case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true ++ case j: CometHashJoinExec ++ if j.originalPlan.asInstanceOf[ShuffledHashJoinExec].ignoreDuplicatedKey == ++ ignoreDuplicatedKey => true + }.size == 1) + } + } +@@ -1637,7 +1668,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan + + test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") { + def check(plan: SparkPlan): Unit = { +- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1) ++ assert(collect(plan) { ++ case _: ShuffledHashJoinExec => true ++ case _: CometHashJoinExec => true ++ }.size === 1) + } + dupStreamSideColTest("SHUFFLE_HASH", check) + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index 34c6c49bc49..f5dea07a213 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1089,6 +1188,29 @@ index 10a32441b6c..5e5d763ee70 100644 }) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +index 12d5f13df01..816d1518c5b 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +@@ -21,7 +21,7 @@ package org.apache.spark.sql.connector + import java.sql.Date + import java.util.Collections + +-import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row} ++import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, IgnoreCometSuite, Row} + import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Literal} + import org.apache.spark.sql.catalyst.expressions.objects.Invoke + import org.apache.spark.sql.catalyst.plans.physical +@@ -45,7 +45,8 @@ import org.apache.spark.sql.util.QueryExecutionListener + import org.apache.spark.tags.SlowSQLTest + + @SlowSQLTest +-class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase { ++class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ++ with IgnoreCometSuite { + import testImplicits._ + + before { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index 8238eabc7fe..c960fd75a9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -1322,7 +1444,7 @@ index 3aaf61ffba4..93752e2a535 100644 val df = spark.read.parquet(path).selectExpr(projection: _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index a7efd0aa75e..fa65bda2051 100644 +index a7efd0aa75e..baae0967a2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,11 +28,13 @@ import org.apache.spark.SparkException @@ -1348,7 +1470,7 @@ index a7efd0aa75e..fa65bda2051 100644 } } -@@ -132,30 +135,38 @@ class AdaptiveQueryExecSuite +@@ -132,30 +135,39 @@ class AdaptiveQueryExecSuite private def findTopLevelSortMergeJoin(plan: SparkPlan): Seq[SortMergeJoinExec] = { collect(plan) { case j: SortMergeJoinExec => j @@ -1370,6 +1492,7 @@ index a7efd0aa75e..fa65bda2051 100644 case j: BaseJoinExec => j + case c: CometHashJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] + case c: CometSortMergeJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] ++ case c: CometBroadcastHashJoinExec => c.originalPlan.asInstanceOf[BaseJoinExec] } } @@ -1387,7 +1510,7 @@ index a7efd0aa75e..fa65bda2051 100644 } } -@@ -205,6 +216,7 @@ class AdaptiveQueryExecSuite +@@ -205,6 +217,7 @@ class AdaptiveQueryExecSuite val parts = rdd.partitions assert(parts.forall(rdd.preferredLocations(_).nonEmpty)) } @@ -1395,7 +1518,7 @@ index a7efd0aa75e..fa65bda2051 100644 assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead)) } -@@ -213,7 +225,7 @@ class AdaptiveQueryExecSuite +@@ -213,7 +226,7 @@ class AdaptiveQueryExecSuite val plan = df.queryExecution.executedPlan assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { @@ -1404,7 +1527,7 @@ index a7efd0aa75e..fa65bda2051 100644 } assert(shuffle.size == 1) assert(shuffle(0).outputPartitioning.numPartitions == numPartition) -@@ -229,7 +241,8 @@ class AdaptiveQueryExecSuite +@@ -229,7 +242,8 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1414,7 +1537,7 @@ index a7efd0aa75e..fa65bda2051 100644 } } -@@ -256,7 +269,8 @@ class AdaptiveQueryExecSuite +@@ -256,7 +270,8 @@ class AdaptiveQueryExecSuite } } @@ -1424,7 +1547,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", -@@ -288,7 +302,8 @@ class AdaptiveQueryExecSuite +@@ -288,7 +303,8 @@ class AdaptiveQueryExecSuite } } @@ -1434,7 +1557,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", -@@ -302,7 +317,8 @@ class AdaptiveQueryExecSuite +@@ -302,7 +318,8 @@ class AdaptiveQueryExecSuite val localReads = collect(adaptivePlan) { case read: AQEShuffleReadExec if read.isLocalRead => read } @@ -1444,7 +1567,29 @@ index a7efd0aa75e..fa65bda2051 100644 val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD] val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD] // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 -@@ -351,7 +367,7 @@ class AdaptiveQueryExecSuite +@@ -327,7 +344,9 @@ class AdaptiveQueryExecSuite + .groupBy($"a").count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan +- assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) ++ assert(find(plan) { case p => ++ p.isInstanceOf[SortMergeJoinExec] || p.isInstanceOf[CometSortMergeJoinExec] ++ }.isDefined) + val coalescedReads = collect(plan) { + case r: AQEShuffleReadExec => r + } +@@ -341,7 +360,9 @@ class AdaptiveQueryExecSuite + .groupBy($"a").count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan +- assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) ++ assert(find(plan) { case p => ++ p.isInstanceOf[BroadcastHashJoinExec] || p.isInstanceOf[CometBroadcastHashJoinExec] ++ }.isDefined) + val coalescedReads = collect(plan) { + case r: AQEShuffleReadExec => r + } +@@ -351,7 +372,7 @@ class AdaptiveQueryExecSuite } } @@ -1453,7 +1598,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -366,7 +382,7 @@ class AdaptiveQueryExecSuite +@@ -366,7 +387,7 @@ class AdaptiveQueryExecSuite } } @@ -1462,7 +1607,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -382,7 +398,7 @@ class AdaptiveQueryExecSuite +@@ -382,7 +403,7 @@ class AdaptiveQueryExecSuite } } @@ -1471,7 +1616,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -427,7 +443,7 @@ class AdaptiveQueryExecSuite +@@ -427,7 +448,7 @@ class AdaptiveQueryExecSuite } } @@ -1480,7 +1625,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -472,7 +488,7 @@ class AdaptiveQueryExecSuite +@@ -472,7 +493,7 @@ class AdaptiveQueryExecSuite } } @@ -1489,7 +1634,16 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { -@@ -537,7 +553,7 @@ class AdaptiveQueryExecSuite +@@ -518,7 +539,7 @@ class AdaptiveQueryExecSuite + } + } + +- test("Exchange reuse") { ++ test("Exchange reuse", IgnoreComet("Comet shuffle changes shuffle metrics")) { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { +@@ -537,7 +558,7 @@ class AdaptiveQueryExecSuite } } @@ -1498,7 +1652,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { -@@ -568,7 +584,9 @@ class AdaptiveQueryExecSuite +@@ -568,7 +589,9 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1509,7 +1663,7 @@ index a7efd0aa75e..fa65bda2051 100644 // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.nonEmpty) -@@ -589,7 +607,9 @@ class AdaptiveQueryExecSuite +@@ -589,7 +612,9 @@ class AdaptiveQueryExecSuite assert(smj.size == 1) val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) @@ -1520,7 +1674,7 @@ index a7efd0aa75e..fa65bda2051 100644 // Even with local shuffle read, the query stage reuse can also work. val ex = findReusedExchange(adaptivePlan) assert(ex.isEmpty) -@@ -598,7 +618,8 @@ class AdaptiveQueryExecSuite +@@ -598,7 +623,8 @@ class AdaptiveQueryExecSuite } } @@ -1530,7 +1684,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", -@@ -693,7 +714,8 @@ class AdaptiveQueryExecSuite +@@ -693,7 +719,8 @@ class AdaptiveQueryExecSuite val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) assert(bhj.size == 1) // There is still a SMJ, and its two shuffles can't apply local read. @@ -1540,7 +1694,7 @@ index a7efd0aa75e..fa65bda2051 100644 } } -@@ -815,7 +837,8 @@ class AdaptiveQueryExecSuite +@@ -815,7 +842,8 @@ class AdaptiveQueryExecSuite } } @@ -1550,7 +1704,7 @@ index a7efd0aa75e..fa65bda2051 100644 Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint => def getJoinNode(plan: SparkPlan): Seq[ShuffledJoin] = if (joinHint == "SHUFFLE_MERGE") { findTopLevelSortMergeJoin(plan) -@@ -1123,7 +1146,8 @@ class AdaptiveQueryExecSuite +@@ -1123,7 +1151,8 @@ class AdaptiveQueryExecSuite } } @@ -1560,7 +1714,7 @@ index a7efd0aa75e..fa65bda2051 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT key FROM testData GROUP BY key") -@@ -1718,7 +1742,7 @@ class AdaptiveQueryExecSuite +@@ -1718,7 +1747,7 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id") assert(collect(adaptivePlan) { @@ -1569,7 +1723,7 @@ index a7efd0aa75e..fa65bda2051 100644 }.length == 1) } } -@@ -1798,7 +1822,8 @@ class AdaptiveQueryExecSuite +@@ -1798,7 +1827,8 @@ class AdaptiveQueryExecSuite } } @@ -1579,7 +1733,7 @@ index a7efd0aa75e..fa65bda2051 100644 def hasRepartitionShuffle(plan: SparkPlan): Boolean = { find(plan) { case s: ShuffleExchangeLike => -@@ -1983,6 +2008,9 @@ class AdaptiveQueryExecSuite +@@ -1983,6 +2013,9 @@ class AdaptiveQueryExecSuite def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = { assert(collect(ds.queryExecution.executedPlan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1589,7 +1743,7 @@ index a7efd0aa75e..fa65bda2051 100644 }.size == 1) ds.collect() val plan = ds.queryExecution.executedPlan -@@ -1991,6 +2019,9 @@ class AdaptiveQueryExecSuite +@@ -1991,6 +2024,9 @@ class AdaptiveQueryExecSuite }.isEmpty) assert(collect(plan) { case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s @@ -1599,7 +1753,7 @@ index a7efd0aa75e..fa65bda2051 100644 }.size == 1) checkAnswer(ds, testData) } -@@ -2147,7 +2178,8 @@ class AdaptiveQueryExecSuite +@@ -2147,7 +2183,8 @@ class AdaptiveQueryExecSuite } } @@ -1609,7 +1763,7 @@ index a7efd0aa75e..fa65bda2051 100644 withTempView("t1", "t2") { def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = { Seq("100", "100000").foreach { size => -@@ -2233,7 +2265,8 @@ class AdaptiveQueryExecSuite +@@ -2233,7 +2270,8 @@ class AdaptiveQueryExecSuite } } @@ -1619,7 +1773,7 @@ index a7efd0aa75e..fa65bda2051 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", -@@ -2332,7 +2365,7 @@ class AdaptiveQueryExecSuite +@@ -2332,7 +2370,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2 GROUP BY key1") val shuffles1 = collect(adaptive1) { @@ -1628,7 +1782,7 @@ index a7efd0aa75e..fa65bda2051 100644 } assert(shuffles1.size == 3) // shuffles1.head is the top-level shuffle under the Aggregate operator -@@ -2345,7 +2378,7 @@ class AdaptiveQueryExecSuite +@@ -2345,7 +2383,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " + s"JOIN skewData2 ON key1 = key2") val shuffles2 = collect(adaptive2) { @@ -1637,7 +1791,7 @@ index a7efd0aa75e..fa65bda2051 100644 } if (hasRequiredDistribution) { assert(shuffles2.size == 3) -@@ -2379,7 +2412,8 @@ class AdaptiveQueryExecSuite +@@ -2379,7 +2417,8 @@ class AdaptiveQueryExecSuite } } @@ -1647,7 +1801,17 @@ index a7efd0aa75e..fa65bda2051 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2545,6 +2579,7 @@ class AdaptiveQueryExecSuite +@@ -2510,7 +2549,8 @@ class AdaptiveQueryExecSuite + } + + test("SPARK-48037: Fix SortShuffleWriter lacks shuffle write related metrics " + +- "resulting in potentially inaccurate data") { ++ "resulting in potentially inaccurate data", ++ IgnoreComet("too many shuffle partitions causes Java heap OOM")) { + withTable("t3") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", +@@ -2545,6 +2585,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -1655,7 +1819,7 @@ index a7efd0aa75e..fa65bda2051 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2562,7 +2597,8 @@ class AdaptiveQueryExecSuite +@@ -2562,7 +2603,8 @@ class AdaptiveQueryExecSuite } } @@ -1665,7 +1829,7 @@ index a7efd0aa75e..fa65bda2051 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2674,7 +2710,7 @@ class AdaptiveQueryExecSuite +@@ -2674,7 +2716,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -1674,7 +1838,7 @@ index a7efd0aa75e..fa65bda2051 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2685,7 +2721,7 @@ class AdaptiveQueryExecSuite +@@ -2685,7 +2727,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -1683,6 +1847,22 @@ index a7efd0aa75e..fa65bda2051 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) +@@ -2911,6 +2953,7 @@ class AdaptiveQueryExecSuite + }.size == (if (firstAccess) 1 else 0)) + assert(collect(initialExecutedPlan) { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.size == (if (firstAccess) 2 else 0)) + assert(collect(initialExecutedPlan) { + case i: InMemoryTableScanLike => i +@@ -2923,6 +2966,7 @@ class AdaptiveQueryExecSuite + }.isEmpty) + assert(collect(finalExecutedPlan) { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.isEmpty) + assert(collect(initialExecutedPlan) { + case i: InMemoryTableScanLike => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala index 05872d41131..0dd83608bbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala @@ -1727,10 +1907,10 @@ index 0a0b23d1e60..5685926250f 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala -index 04a7b4834f4..3b0fd1eb5aa 100644 +index 04a7b4834f4..8cab62ce4ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala -@@ -17,7 +17,7 @@ +@@ -17,9 +17,10 @@ package org.apache.spark.sql.execution.datasources @@ -1738,8 +1918,27 @@ index 04a7b4834f4..3b0fd1eb5aa 100644 +import org.apache.spark.sql.{IgnoreComet, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} ++import org.apache.spark.sql.comet.CometSortExec import org.apache.spark.sql.execution.{QueryExecution, SortExec} -@@ -305,7 +305,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec + import org.apache.spark.sql.internal.SQLConf +@@ -225,6 +226,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + // assert the outer most sort in the executed plan + assert(plan.collectFirst { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), +@@ -272,6 +274,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write + // assert the outer most sort in the executed plan + assert(plan.collectFirst { + case s: SortExec => s ++ case s: CometSortExec => s.originalPlan.asInstanceOf[SortExec] + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), +@@ -305,7 +308,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } }