diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index d196691d1b14..63fb0cc1b9bd 100644 --- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -16,6 +16,7 @@ */ package org.apache.spark.shuffle.gluten.celeborn; +import org.apache.gluten.GlutenConfig; import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.exception.GlutenException; @@ -194,9 +195,14 @@ public ShuffleHandle registerShuffle( if (dependency instanceof ColumnarShuffleDependency) { if (fallbackPolicyRunner.applyAllFallbackPolicy( lifecycleManager, dependency.partitioner().numPartitions())) { - logger.warn("Fallback to ColumnarShuffleManager!"); - columnarShuffleIds.add(shuffleId); - return columnarShuffleManager().registerShuffle(shuffleId, dependency); + if (GlutenConfig.getConf().enableCelebornFallback()) { + logger.warn("Fallback to ColumnarShuffleManager!"); + columnarShuffleIds.add(shuffleId); + return columnarShuffleManager().registerShuffle(shuffleId, dependency); + } else { + throw new GlutenException( + "The Celeborn service(Master: " + celebornConf.masterHost() + ") is unavailable"); + } } else { return registerCelebornShuffleHandle(shuffleId, dependency); } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 89933cc58a4d..58b99a7f3064 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -447,6 +447,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED) + + def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED) } object GlutenConfig { @@ -2049,4 +2051,12 @@ object GlutenConfig { .doubleConf .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]") .createWithDefault(0.6) + + val CELEBORN_FALLBACK_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled") + .internal() + .doc("If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable." + + "Otherwise, throw an exception.") + .booleanConf + .createWithDefault(true) } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala index 50766f3a91d1..e680ce9d5dda 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala @@ -44,6 +44,7 @@ object Constants { val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false) .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true") + .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false") .set("spark.sql.parquet.enableVectorizedReader", "true") .set("spark.plugins", "org.apache.gluten.GlutenPlugin") .set(