diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index f454cf00c656f..6c9eacb9c72b2 100644 --- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -16,6 +16,7 @@ */ package org.apache.spark.shuffle.gluten.celeborn; +import org.apache.gluten.GlutenConfig; import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.exception.GlutenException; @@ -194,9 +195,14 @@ public ShuffleHandle registerShuffle( if (dependency instanceof ColumnarShuffleDependency) { if (fallbackPolicyRunner.applyAllFallbackPolicy( lifecycleManager, dependency.partitioner().numPartitions())) { - logger.warn("Fallback to ColumnarShuffleManager!"); - columnarShuffleIds.add(shuffleId); - return columnarShuffleManager().registerShuffle(shuffleId, dependency); + if (GlutenConfig.getConf().enableCelebornFallback()) { + logger.warn("Fallback to ColumnarShuffleManager!"); + columnarShuffleIds.add(shuffleId); + return columnarShuffleManager().registerShuffle(shuffleId, dependency); + } else { + throw new GlutenException( + "The Celeborn service(Master: " + celebornConf.masterHost() + ") is unavailable"); + } } else { return registerCelebornShuffleHandle(shuffleId, dependency); } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index cc2d6ac5fdefb..a79534606c321 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -448,6 +448,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED) + + def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED) } object GlutenConfig { @@ -2043,4 +2045,11 @@ object GlutenConfig { .doubleConf .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]") .createWithDefault(0.6) + + val CELEBORN_FALLBACK_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled") + .internal() + .doc("Enable celeborn fallback. Throw an exception immediately if the celeborn service is unavailable when disabled.") + .booleanConf + .createWithDefault(true) } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala index 50766f3a91d1b..e680ce9d5dda3 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala @@ -44,6 +44,7 @@ object Constants { val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false) .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true") + .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false") .set("spark.sql.parquet.enableVectorizedReader", "true") .set("spark.plugins", "org.apache.gluten.GlutenPlugin") .set(