diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bcc8d25a7..5c3ebf6fb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -289,7 +289,7 @@ object CometConf extends ShimCometConf { "why a query stage cannot be executed natively. Set this to false to " + "reduce the amount of logging.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff index 6892e8686..19eda8749 100644 --- a/dev/diffs/3.5.1.diff +++ b/dev/diffs/3.5.1.diff @@ -2003,7 +2003,7 @@ index d083cac48ff..3c11bcde807 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala -index 746f289c393..bc01ffd52ea 100644 +index 746f289c393..1a2f1f7e3fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions @@ -2120,29 +2120,15 @@ index 746f289c393..bc01ffd52ea 100644 checkAnswer(aggDF, df1.groupBy("j").agg(max("k"))) } } -@@ -1013,7 +1039,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti - } - } - -- test("bucket coalescing is applied when join expressions match with partitioning expressions") { -+ // https://github.com/apache/datafusion-comet/issues/617 -+ test("bucket coalescing is applied when join expressions match with partitioning expressions", -+ IgnoreComet("TODO: fix Comet for this test")) { - withTable("t1", "t2", "t3") { - df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") - df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") -@@ -1029,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -1029,15 +1055,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti Seq(true, false).foreach { aqeEnabled => withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) { val plan = sql(query).queryExecution.executedPlan - val shuffles = collect(plan) { case s: ShuffleExchangeExec => s } -+ val shuffles = collect(plan) { -+ case s: ShuffleExchangeLike => s -+ } ++ val shuffles = collect(plan) { case s: ShuffleExchangeLike => s } assert(shuffles.length == expectedNumShuffles) -- val scans = collect(plan) { -+ val scans = plan.collect { + val scans = collect(plan) { case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f + case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b } @@ -2379,67 +2365,6 @@ index b4c4ec7acbf..20579284856 100644 } val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala -index 3e1bc57dfa2..662640af934 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala -@@ -28,10 +28,7 @@ import org.apache.commons.io.FileUtils - import org.scalatest.BeforeAndAfter - - import org.apache.spark.scheduler.ExecutorCacheTaskLocation --import org.apache.spark.sql.{DataFrame, Row, SparkSession} --import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} --import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning --import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row, SparkSession} - import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper} - import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore, StateStoreProviderId} - import org.apache.spark.sql.functions._ -@@ -594,40 +591,10 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { - CheckNewAnswer((5, 10, 5, 15, 5, 25))) - } - -- test("streaming join should require StatefulOpClusteredDistribution from children") { -- val input1 = MemoryStream[Int] -- val input2 = MemoryStream[Int] -- -- val df1 = input1.toDF -- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b")) -- val df2 = input2.toDF -- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b")) -- .repartition($"b") -- val joined = df1.join(df2, Seq("a", "b")).select($"a") -- -- testStream(joined)( -- AddData(input1, 1.to(1000): _*), -- AddData(input2, 1.to(1000): _*), -- CheckAnswer(1.to(1000): _*), -- Execute { query => -- // Verify the query plan -- def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { -- expressions.flatMap { -- case ref: AttributeReference => Some(ref.name) -- } -- } -- -- val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) -- -- assert(query.lastExecution.executedPlan.collect { -- case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _, -- ShuffleExchangeExec(opA: HashPartitioning, _, _, _), -- ShuffleExchangeExec(opB: HashPartitioning, _, _, _)) -- if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") -- && partitionExpressionsColumns(opB.expressions) === Seq("a", "b") -- && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j -- }.size == 1) -- }) -+ // https://github.com/apache/datafusion-comet/issues/617 -+ test("streaming join should require StatefulOpClusteredDistribution from children", -+ IgnoreComet("TODO: fix Comet for this test")) { -+ fail("TODO fix diff") - } - - test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index abe606ad9c1..2d930b64cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 501a2ba3c..8a0f2440a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -39,7 +39,7 @@ Comet provides the following configuration settings. | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm | | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | -| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | true | +| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | diff --git a/docs/source/user-guide/overview.md b/docs/source/user-guide/overview.md index d7ecf04d3..fcfbd5f85 100644 --- a/docs/source/user-guide/overview.md +++ b/docs/source/user-guide/overview.md @@ -44,11 +44,11 @@ Comet currently supports the following versions of Apache Spark: - 3.3.x - 3.4.x +- 3.5.x Experimental support is provided for the following versions of Apache Spark and is intended for development/testing use only and should not be used in production yet. -- 3.5.x - 4.0.0-preview1 Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by