Skip to content

Commit

Permalink
feat: Enable remaining Spark 3.5.1 tests (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Jul 18, 2024
1 parent 896096a commit b558063
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 82 deletions.
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ object CometConf extends ShimCometConf {
"why a query stage cannot be executed natively. Set this to false to " +
"reduce the amount of logging.")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
Expand Down
83 changes: 4 additions & 79 deletions dev/diffs/3.5.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2003,7 +2003,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 746f289c393..bc01ffd52ea 100644
index 746f289c393..1a2f1f7e3fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions
Expand Down Expand Up @@ -2120,29 +2120,15 @@ index 746f289c393..bc01ffd52ea 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
@@ -1013,7 +1039,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}

- test("bucket coalescing is applied when join expressions match with partitioning expressions") {
+ // https://github.com/apache/datafusion-comet/issues/617
+ test("bucket coalescing is applied when join expressions match with partitioning expressions",
+ IgnoreComet("TODO: fix Comet for this test")) {
withTable("t1", "t2", "t3") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
@@ -1029,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -1029,15 +1055,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
- val shuffles = collect(plan) { case s: ShuffleExchangeExec => s }
+ val shuffles = collect(plan) {
+ case s: ShuffleExchangeLike => s
+ }
+ val shuffles = collect(plan) { case s: ShuffleExchangeLike => s }
assert(shuffles.length == expectedNumShuffles)

- val scans = collect(plan) {
+ val scans = plan.collect {
val scans = collect(plan) {
case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
}
Expand Down Expand Up @@ -2379,67 +2365,6 @@ index b4c4ec7acbf..20579284856 100644
}

val aggregateExecsWithoutPartialAgg = allAggregateExecs.filter {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3e1bc57dfa2..662640af934 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -28,10 +28,7 @@ import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter

import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper}
import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore, StateStoreProviderId}
import org.apache.spark.sql.functions._
@@ -594,40 +591,10 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
CheckNewAnswer((5, 10, 5, 15, 5, 25)))
}

- test("streaming join should require StatefulOpClusteredDistribution from children") {
- val input1 = MemoryStream[Int]
- val input2 = MemoryStream[Int]
-
- val df1 = input1.toDF
- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b"))
- val df2 = input2.toDF
- .select($"value" as Symbol("a"), $"value" * 2 as Symbol("b"))
- .repartition($"b")
- val joined = df1.join(df2, Seq("a", "b")).select($"a")
-
- testStream(joined)(
- AddData(input1, 1.to(1000): _*),
- AddData(input2, 1.to(1000): _*),
- CheckAnswer(1.to(1000): _*),
- Execute { query =>
- // Verify the query plan
- def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
- expressions.flatMap {
- case ref: AttributeReference => Some(ref.name)
- }
- }
-
- val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
-
- assert(query.lastExecution.executedPlan.collect {
- case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
- ShuffleExchangeExec(opA: HashPartitioning, _, _, _),
- ShuffleExchangeExec(opB: HashPartitioning, _, _, _))
- if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
- && partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
- && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
- }.size == 1)
- })
+ // https://github.com/apache/datafusion-comet/issues/617
+ test("streaming join should require StatefulOpClusteredDistribution from children",
+ IgnoreComet("TODO: fix Comet for this test")) {
+ fail("TODO fix diff")
}

test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..2d930b64cca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | true |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ Comet currently supports the following versions of Apache Spark:

- 3.3.x
- 3.4.x
- 3.5.x

Experimental support is provided for the following versions of Apache Spark and is intended for development/testing
use only and should not be used in production yet.

- 3.5.x
- 4.0.0-preview1

Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by
Expand Down

0 comments on commit b558063

Please sign in to comment.