Merge branch 'apache:main' into gayangya/log_even_reason_is_none
gaoyangxiaozhu authored Jun 27, 2024
2 parents 2ec64c8 + b65ecce commit d2edd0f
Showing 6 changed files with 32 additions and 22 deletions.
7 changes: 0 additions & 7 deletions cpp/velox/operators/functions/RegistrationAllFunctions.cc
@@ -52,9 +52,6 @@ void registerFunctionOverwrite() {
   velox::registerFunction<RoundFunction, int64_t, int64_t, int32_t>({"round"});
   velox::registerFunction<RoundFunction, double, double, int32_t>({"round"});
   velox::registerFunction<RoundFunction, float, float, int32_t>({"round"});
-  // TODO: the below rand function registry can be removed after presto function registry is removed.
-  velox::registerFunction<velox::functions::sparksql::RandFunction, double, velox::Constant<int32_t>>({"spark_rand"});
-  velox::registerFunction<velox::functions::sparksql::RandFunction, double, velox::Constant<int64_t>>({"spark_rand"});
 
   auto kRowConstructorWithNull = RowConstructorWithNullCallToSpecialForm::kRowConstructorWithNull;
   velox::exec::registerVectorFunction(
@@ -74,10 +71,6 @@ void registerFunctionOverwrite() {
   velox::exec::registerFunctionCallToSpecialForm(
       kRowConstructorWithAllNull,
       std::make_unique<RowConstructorWithNullCallToSpecialForm>(kRowConstructorWithAllNull));
-  velox::functions::registerBinaryIntegral<velox::functions::CheckedPlusFunction>({"check_add"});
-  velox::functions::registerBinaryIntegral<velox::functions::CheckedMinusFunction>({"check_subtract"});
-  velox::functions::registerBinaryIntegral<velox::functions::CheckedMultiplyFunction>({"check_multiply"});
-  velox::functions::registerBinaryIntegral<velox::functions::CheckedDivideFunction>({"check_divide"});
 
   velox::functions::registerPrestoVectorFunctions();
 }
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.shuffle.gluten.celeborn;
 
+import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.backendsapi.BackendsApiManager;
 import org.apache.gluten.exception.GlutenException;
 
@@ -194,9 +195,14 @@ public <K, V, C> ShuffleHandle registerShuffle(
     if (dependency instanceof ColumnarShuffleDependency) {
       if (fallbackPolicyRunner.applyAllFallbackPolicy(
           lifecycleManager, dependency.partitioner().numPartitions())) {
-        logger.warn("Fallback to ColumnarShuffleManager!");
-        columnarShuffleIds.add(shuffleId);
-        return columnarShuffleManager().registerShuffle(shuffleId, dependency);
+        if (GlutenConfig.getConf().enableCelebornFallback()) {
+          logger.warn("Fallback to ColumnarShuffleManager!");
+          columnarShuffleIds.add(shuffleId);
+          return columnarShuffleManager().registerShuffle(shuffleId, dependency);
+        } else {
+          throw new GlutenException(
+              "The Celeborn service(Master: " + celebornConf.masterHost() + ") is unavailable");
+        }
       } else {
         return registerCelebornShuffleHandle(shuffleId, dependency);
       }
@@ -564,63 +564,63 @@ object ExpressionConverter extends SQLConfHelper with Logging {
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           tryEval,
-          ExpressionNames.CHECK_ADD
+          ExpressionNames.CHECKED_ADD
         )
       case tryEval @ TryEval(a: Subtract) =>
         BackendsApiManager.getSparkPlanExecApiInstance.genTryArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           tryEval,
-          ExpressionNames.CHECK_SUBTRACT
+          ExpressionNames.CHECKED_SUBTRACT
         )
       case tryEval @ TryEval(a: Divide) =>
         BackendsApiManager.getSparkPlanExecApiInstance.genTryArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           tryEval,
-          ExpressionNames.CHECK_DIVIDE
+          ExpressionNames.CHECKED_DIVIDE
         )
       case tryEval @ TryEval(a: Multiply) =>
         BackendsApiManager.getSparkPlanExecApiInstance.genTryArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           tryEval,
-          ExpressionNames.CHECK_MULTIPLY
+          ExpressionNames.CHECKED_MULTIPLY
         )
       case a: Add =>
         BackendsApiManager.getSparkPlanExecApiInstance.genArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           a,
-          ExpressionNames.CHECK_ADD
+          ExpressionNames.CHECKED_ADD
         )
       case a: Subtract =>
         BackendsApiManager.getSparkPlanExecApiInstance.genArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           a,
-          ExpressionNames.CHECK_SUBTRACT
+          ExpressionNames.CHECKED_SUBTRACT
         )
       case a: Multiply =>
         BackendsApiManager.getSparkPlanExecApiInstance.genArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           a,
-          ExpressionNames.CHECK_MULTIPLY
+          ExpressionNames.CHECKED_MULTIPLY
         )
       case a: Divide =>
         BackendsApiManager.getSparkPlanExecApiInstance.genArithmeticTransformer(
           substraitExprName,
           replaceWithExpressionTransformerInternal(a.left, attributeSeq, expressionsMap),
           replaceWithExpressionTransformerInternal(a.right, attributeSeq, expressionsMap),
           a,
-          ExpressionNames.CHECK_DIVIDE
+          ExpressionNames.CHECKED_DIVIDE
        )
       case tryEval: TryEval =>
         // This is a placeholder to handle try_eval(other expressions).
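For orientation, a minimal Scala sketch (not Gluten code) of the Catalyst expression shape that the `TryEval` branches above match; the `Literal` operands and the printed messages are illustrative placeholders:

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal, TryEval}

// A try-arithmetic expression is TryEval wrapping a plain arithmetic node,
// e.g. TryEval(Add(left, right)). The converter above matches this shape and
// passes the renamed Substrait name ExpressionNames.CHECKED_ADD ("checked_add").
val expr: Expression = TryEval(Add(Literal(1), Literal(2)))

expr match {
  case TryEval(_: Add) => println("handled by the TryEval(Add) branch -> checked_add")
  case _: Add          => println("handled by the plain Add branch -> checked_add")
  case _               => println("falls through to the generic TryEval placeholder")
}
```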
10 changes: 10 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -447,6 +447,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
 
   def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
+
+  def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
 }
 
 object GlutenConfig {
@@ -2049,4 +2051,12 @@ object GlutenConfig {
     .doubleConf
     .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]")
     .createWithDefault(0.6)
+
+  val CELEBORN_FALLBACK_ENABLED =
+    buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
+      .internal()
+      .doc("If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable. " +
+        "Otherwise, throw an exception.")
+      .booleanConf
+      .createWithDefault(true)
 }
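For context, a hedged sketch of how this new knob might be supplied by an application; since it is declared with `buildStaticConf`, it is presumably meant to be set at startup (e.g. via `spark-submit --conf` or the session builder below) rather than changed at runtime. The builder and app name are illustrative, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: disable the Celeborn fallback so that an unavailable
// Celeborn master fails fast with a GlutenException instead of silently
// falling back to ColumnarShuffleManager. The default added here is "true",
// i.e. keep the previous fallback behaviour.
val spark = SparkSession
  .builder()
  .appName("celeborn-fallback-demo") // hypothetical app name
  .config("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
  .getOrCreate()
```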
@@ -83,10 +83,10 @@ object ExpressionNames {
   final val IS_NAN = "isnan"
   final val NANVL = "nanvl"
   final val TRY_EVAL = "try"
-  final val CHECK_ADD = "check_add"
-  final val CHECK_SUBTRACT = "check_subtract"
-  final val CHECK_DIVIDE = "check_divide"
-  final val CHECK_MULTIPLY = "check_multiply"
+  final val CHECKED_ADD = "checked_add"
+  final val CHECKED_SUBTRACT = "checked_subtract"
+  final val CHECKED_DIVIDE = "checked_divide"
+  final val CHECKED_MULTIPLY = "checked_multiply"
 
   // SparkSQL String functions
   final val ASCII = "ascii"
@@ -44,6 +44,7 @@ object Constants {
 
   val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
     .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
+    .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
     .set("spark.sql.parquet.enableVectorizedReader", "true")
     .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     .set(