From bce36c519c26391e70e9ebcfa7213d2cd7a08b5a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 10 Jul 2024 12:40:20 -0700 Subject: [PATCH] chore: Remove COMET_SHUFFLE_ENFORCE_MODE_ENABLED --- .../src/main/scala/org/apache/comet/CometConf.scala | 11 ----------- docs/source/contributor-guide/benchmarking.md | 1 - .../apache/comet/CometSparkSessionExtensions.scala | 10 +--------- .../scala/org/apache/comet/CometExpressionSuite.scala | 1 - .../org/apache/comet/exec/CometAggregateSuite.scala | 4 ---- .../org/apache/spark/sql/CometTPCDSQuerySuite.scala | 1 - 6 files changed, 1 insertion(+), 27 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 787e8b4e93..4daa0f18de 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -146,17 +146,6 @@ object CometConf extends ShimCometConf { .checkValues(Set("native", "jvm", "auto")) .createWithDefault("jvm") - val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.shuffle.enforceMode.enabled") - .doc( - "Comet shuffle doesn't support Spark AQE coalesce partitions. If AQE coalesce " + - "partitions is enabled, Comet shuffle won't be triggered even enabled. This config " + - "is used to enforce Comet to trigger shuffle even if AQE coalesce partitions is " + - "enabled. This is for testing purpose only.") - .internal() - .booleanConf - .createWithDefault(false) - val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled") .doc( diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index d315c559eb..db9adae454 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -69,7 +69,6 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.comet.batchSize=8192 \ --conf spark.comet.exec.shuffle.enabled=true \ --conf spark.comet.exec.shuffle.mode=auto \ - --conf spark.comet.shuffle.enforceMode.enabled=true \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ tpcbench.py \ --benchmark tpch \ diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index c30d6b0f7c..0fef22f07b 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -1058,21 +1058,13 @@ object CometSparkSessionExtensions extends Logging { } private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean = - COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) && - // TODO: AQE coalesce partitions feature causes Comet shuffle memory leak. - // We should disable Comet shuffle when AQE coalesce partitions is enabled. - (!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get()) + COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = { if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) { Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled") } else if (!isCometShuffleManagerEnabled(conf)) { Some(s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}") - } else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED - .get()) { - Some( - s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " + - s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled") } else { None } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index cb2069ed22..7c992055a5 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1519,7 +1519,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") { val table = "test" diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 2acc562b5a..58cc9be8bb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -551,7 +551,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionaryEnabled => withSQLConf( SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { withParquetTable( (0 until numValues).map(i => (i, Random.nextInt() % numGroups)), @@ -578,7 +577,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionaryEnabled => withSQLConf( SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { withTempPath { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -619,7 +617,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionaryEnabled => withSQLConf( SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { withTempPath { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -969,7 +966,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 6eeb7e334e..ab5b9d3d9c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -192,7 +192,6 @@ class CometTPCDSQuerySuite conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g") - conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true") conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")