Skip to content

Commit

Permalink
chore: Remove COMET_SHUFFLE_ENFORCE_MODE_ENABLED
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jul 10, 2024
1 parent fdd2c4f commit bce36c5
Show file tree
Hide file tree
Showing 6 changed files with 1 addition and 27 deletions.
11 changes: 0 additions & 11 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("jvm")

/**
 * Internal, test-only boolean switch registered under the key
 * `spark.comet.shuffle.enforceMode.enabled`; defaults to `false`.
 *
 * Callers consult it together with Spark's AQE coalesce-partitions setting: Comet shuffle is
 * treated as enabled when AQE coalesce partitions is off, or when this flag is `true`.
 */
val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
.doc(
"Comet shuffle doesn't support Spark AQE coalesce partitions. If AQE coalesce " +
"partitions is enabled, Comet shuffle won't be triggered even enabled. This config " +
"is used to enforce Comet to trigger shuffle even if AQE coalesce partitions is " +
"enabled. This is for testing purpose only.")
// Marked internal: this entry is for testing only, per the doc text above.
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
Expand Down
1 change: 0 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.comet.batchSize=8192 \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.comet.shuffle.enforceMode.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
tpcbench.py \
--benchmark tpch \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,21 +1058,13 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
// We should disable Comet shuffle when AQE coalesce partitions is enabled.
(!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
} else if (!isCometShuffleManagerEnabled(conf)) {
Some(s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}")
} else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED
.get()) {
Some(
s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") {
val table = "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withSQLConf(
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
withParquetTable(
(0 until numValues).map(i => (i, Random.nextInt() % numGroups)),
Expand All @@ -578,7 +577,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withSQLConf(
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
withTempPath { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
Expand Down Expand Up @@ -619,7 +617,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Seq(true, false).foreach { dictionaryEnabled =>
withSQLConf(
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
withTempPath { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
Expand Down Expand Up @@ -969,7 +966,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ class CometTPCDSQuerySuite
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
Expand Down

0 comments on commit bce36c5

Please sign in to comment.