Skip to content

Commit

Permalink
Merge branch 'main' into fix-input-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ivoson authored Sep 2, 2024
2 parents e4a004c + 376167e commit ffe058a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
} else if (SparkShimLoader.getSparkShims.withAnsiEvalMode(original)) {
throw new GlutenNotSupportException(s"$substraitExprName with ansi mode is not supported")
} else {
GenericExpressionTransformer(substraitExprName, Seq(left, right), original)
if (
left.dataType.isInstanceOf[DecimalType] && right.dataType
.isInstanceOf[DecimalType] && !SQLConf.get.decimalOperationsAllowPrecisionLoss
) {
val newName = "not_allow_precision_loss_"
GenericExpressionTransformer(newName, Seq(left, right), original)
} else {
GenericExpressionTransformer(substraitExprName, Seq(left, right), original)
}
}
}

Expand Down
6 changes: 1 addition & 5 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ WholeStageResultIterator::WholeStageResultIterator(
0,
std::move(queryCtx),
velox::exec::Task::ExecutionMode::kSerial);
if (!task_->supportsSingleThreadedExecution()) {
if (!task_->supportSerialExecutionMode()) {
throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString());
}
auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr);
Expand Down Expand Up @@ -445,10 +445,6 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Adjust timestamp according to the above configured session timezone.
configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true";

// To align with Spark's behavior, allow decimal precision loss or not.
configs[velox::core::QueryConfig::kSparkDecimalOperationsAllowPrecisionLoss] =
veloxCfg_->get<std::string>(kAllowPrecisionLoss, "true");

{
// partial aggregation memory config
auto offHeapMemory = veloxCfg_->get<int64_t>(kSparkTaskOffHeapMemory, facebook::velox::memory::kMaxMemory);
Expand Down
11 changes: 8 additions & 3 deletions cpp/velox/memory/VeloxMemoryManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,22 +316,27 @@ bool VeloxMemoryManager::tryDestructSafe() {

// Velox memory manager considered safe to destruct when no alive pools.
if (veloxMemoryManager_) {
if (veloxMemoryManager_->numPools() > 1) {
if (veloxMemoryManager_->numPools() > 2) {
return false;
}
if (veloxMemoryManager_->numPools() == 1) {
if (veloxMemoryManager_->numPools() == 2) {
// Assert the pool is spill pool
// See https://github.com/facebookincubator/velox/commit/e6f84e8ac9ef6721f527a2d552a13f7e79bdf72e
int32_t spillPoolCount = 0;
int32_t tracePoolCount = 0;
veloxMemoryManager_->testingDefaultRoot().visitChildren([&](velox::memory::MemoryPool* child) -> bool {
if (child == veloxMemoryManager_->spillPool()) {
spillPoolCount++;
}
if (child == veloxMemoryManager_->tracePool()) {
tracePoolCount++;
}
return true;
});
GLUTEN_CHECK(spillPoolCount == 1, "Illegal pool count state: spillPoolCount: " + std::to_string(spillPoolCount));
GLUTEN_CHECK(tracePoolCount == 1, "Illegal pool count state: tracePoolCount: " + std::to_string(tracePoolCount));
}
if (veloxMemoryManager_->numPools() < 1) {
if (veloxMemoryManager_->numPools() < 2) {
GLUTEN_CHECK(false, "Unreachable code");
}
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/velox/operators/functions/RegistrationAllFunctions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/functions/sparksql/DecimalArithmetic.h"
#include "velox/functions/sparksql/Hash.h"
#include "velox/functions/sparksql/Rand.h"
#include "velox/functions/sparksql/Register.h"
Expand Down Expand Up @@ -74,6 +75,14 @@ void registerFunctionOverwrite() {

velox::functions::registerPrestoVectorFunctions();
}

void registerFunctionForConfig() {
const std::string prefix = "not_allow_precision_loss_";
velox::functions::sparksql::registerDecimalAdd(prefix, false);
velox::functions::sparksql::registerDecimalSubtract(prefix, false);
velox::functions::sparksql::registerDecimalMultiply(prefix, false);
velox::functions::sparksql::registerDecimalDivide(prefix, false);
}
} // namespace

void registerAllFunctions() {
Expand All @@ -87,6 +96,7 @@ void registerAllFunctions() {
// Using function overwrite to handle function names mismatch between Spark
// and Velox.
registerFunctionOverwrite();
registerFunctionForConfig();
}

} // namespace gluten
2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_08_27
VELOX_BRANCH=2024_09_01
VELOX_HOME=""

OS=`uname -s`
Expand Down

0 comments on commit ffe058a

Please sign in to comment.