[GLUTEN-2874][VL] support allowDecimalPrecisionLoss #2895

Merged
merged 8 commits on Jul 5, 2024
Changes from all commits
@@ -491,7 +491,7 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def supportTransformWriteFiles: Boolean = true

- override def allowDecimalArithmetic: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss
+ override def allowDecimalArithmetic: Boolean = true

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(
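Note: the removed line gated decimal arithmetic offload on Spark's session setting, while the new line always reports support; the setting itself is now forwarded to Velox (see the WholeStageResultIterator.cc change below). For reference, a minimal sketch of reading that Spark switch on the JVM side, assuming an active SparkSession/SQLConf:

```scala
import org.apache.spark.sql.internal.SQLConf

// Equivalent to reading "spark.sql.decimalOperations.allowPrecisionLoss" (Spark default: true),
// via the same accessor used on the removed line above.
val allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss
```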
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
@@ -32,6 +32,8 @@ const std::string kCaseSensitive = "spark.sql.caseSensitive";

const std::string kSessionTimezone = "spark.sql.session.timeZone";

+const std::string kAllowPrecisionLoss = "spark.sql.decimalOperations.allowPrecisionLoss";

const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";

const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";
4 changes: 4 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -455,6 +455,10 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Adjust timestamp according to the above configured session timezone.
configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true";

+ // To align with Spark's behavior, allow decimal precision loss or not.
+ configs[velox::core::QueryConfig::kSparkDecimalOperationsAllowPrecisionLoss] =
+     veloxCfg_->get<std::string>(kAllowPrecisionLoss, "true");

{
// partial aggregation memory config
auto offHeapMemory = veloxCfg_->get<int64_t>(kSparkTaskOffHeapMemory, facebook::velox::memory::kMaxMemory);
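A usage sketch (not part of this PR), assuming a SparkSession named `spark` running with the Gluten Velox backend: the session value set here is what the code above copies into Velox's QueryConfig, falling back to "true" when the key is unset.

```scala
// Toggle Spark's decimal precision-loss behavior for the current session.
spark.conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")

// decimal(38,18) / decimal(38,18):
//   allowPrecisionLoss = true  -> result type decimal(38,6)  (scale trimmed so precision fits in 38)
//   allowPrecisionLoss = false -> result type decimal(38,18) (no adjustment)
val df = spark.sql("SELECT CAST(1 AS DECIMAL(38,18)) / CAST(3 AS DECIMAL(38,18)) AS q")
df.printSchema()
```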
1 change: 1 addition & 0 deletions ep/build-velox/src/get_velox.sh
@@ -16,6 +16,7 @@

set -exu


VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_07_04
VELOX_HOME=""
@@ -28,8 +28,14 @@ import org.apache.spark.sql.utils.DecimalTypeUtil

object DecimalArithmeticUtil {

+ val MIN_ADJUSTED_SCALE = 6
+ val MAX_PRECISION = 38
+ val MAX_SCALE = 38

// Returns the result decimal type of a decimal arithmetic computing.
def getResultType(expr: BinaryArithmetic, type1: DecimalType, type2: DecimalType): DecimalType = {

+ val allowPrecisionLoss = SQLConf.get.decimalOperationsAllowPrecisionLoss
var resultScale = 0
var resultPrecision = 0
expr match {
@@ -45,13 +51,34 @@ object DecimalArithmeticUtil {
resultScale = type1.scale + type2.scale
resultPrecision = type1.precision + type2.precision + 1
case _: Divide =>
- resultScale =
-   Math.max(DecimalType.MINIMUM_ADJUSTED_SCALE, type1.scale + type2.precision + 1)
- resultPrecision = type1.precision - type1.scale + type2.scale + resultScale
+ if (allowPrecisionLoss) {
+   resultScale = Math.max(MIN_ADJUSTED_SCALE, type1.scale + type2.precision + 1)
+   resultPrecision = type1.precision - type1.scale + type2.scale + resultScale
+ } else {
+   var intDig = Math.min(MAX_SCALE, type1.precision - type1.scale + type2.scale)
+   var decDig = Math.min(MAX_SCALE, Math.max(6, type1.scale + type2.precision + 1))
+   val diff = (intDig + decDig) - MAX_SCALE
+   if (diff > 0) {
+     decDig -= diff / 2 + 1
+     intDig = MAX_SCALE - decDig
+   }
+   resultPrecision = intDig + decDig
+   resultScale = decDig
+ }
case other =>
throw new GlutenNotSupportException(s"$other is not supported.")
}
- DecimalTypeUtil.adjustPrecisionScale(resultPrecision, resultScale)

+ if (allowPrecisionLoss) {
+   DecimalTypeUtil.adjustPrecisionScale(resultPrecision, resultScale)
+ } else {
+   bounded(resultPrecision, resultScale)
+ }

}

+ def bounded(precision: Int, scale: Int): DecimalType = {
+   DecimalType(Math.min(precision, MAX_PRECISION), Math.min(scale, MAX_SCALE))
+ }

// If casting between DecimalType, unnecessary cast is skipped to avoid data loss,
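To make the divide branch concrete, here is a standalone sketch (not part of the PR) that mirrors the two code paths above on plain Ints; the `adjustPrecisionScale` helper is a simplified restatement of Spark's scale adjustment, and the names are illustrative only. The worked example is decimal(38,18) / decimal(38,18).

```scala
object DecimalDivideTypeSketch {
  val MIN_ADJUSTED_SCALE = 6
  val MAX_PRECISION = 38
  val MAX_SCALE = 38

  // Precision-loss path: trim scale (down to at most 6 digits) until precision fits in 38.
  private def adjustPrecisionScale(precision: Int, scale: Int): (Int, Int) =
    if (precision <= MAX_PRECISION) (precision, scale)
    else {
      val intDigits = precision - scale
      val minScale = math.min(scale, MIN_ADJUSTED_SCALE)
      val adjustedScale = math.max(MAX_PRECISION - intDigits, minScale)
      (MAX_PRECISION, adjustedScale)
    }

  // Result (precision, scale) of Decimal(p1, s1) / Decimal(p2, s2) under either setting.
  def divideType(p1: Int, s1: Int, p2: Int, s2: Int, allowPrecisionLoss: Boolean): (Int, Int) =
    if (allowPrecisionLoss) {
      val scale = math.max(MIN_ADJUSTED_SCALE, s1 + p2 + 1)
      val precision = p1 - s1 + s2 + scale
      adjustPrecisionScale(precision, scale)
    } else {
      var intDig = math.min(MAX_SCALE, p1 - s1 + s2)
      var decDig = math.min(MAX_SCALE, math.max(6, s1 + p2 + 1))
      val diff = (intDig + decDig) - MAX_SCALE
      if (diff > 0) {
        decDig -= diff / 2 + 1
        intDig = MAX_SCALE - decDig
      }
      // bounded(): cap at precision 38 / scale 38.
      (math.min(intDig + decDig, MAX_PRECISION), math.min(decDig, MAX_SCALE))
    }

  def main(args: Array[String]): Unit = {
    // Precision loss allowed: scale = max(6, 18+38+1) = 57, precision = 38-18+18+57 = 95, adjusted to (38, 6).
    println(divideType(38, 18, 38, 18, allowPrecisionLoss = true))  // (38,6)
    // Precision loss disallowed: intDig = 38, decDig = 38, diff = 38, so decDig = 18, intDig = 20 -> (38, 18).
    println(divideType(38, 18, 38, 18, allowPrecisionLoss = false)) // (38,18)
  }
}
```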
@@ -636,6 +636,7 @@ object GlutenConfig {
GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
SQLConf.LEGACY_SIZE_OF_NULL.key,
"spark.io.compression.codec",
"spark.sql.decimalOperations.allowPrecisionLoss",
COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key,
COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key,
@@ -717,6 +718,7 @@
("spark.hadoop.input.write.timeout", "180000"),
("spark.hadoop.dfs.client.log.severity", "INFO"),
("spark.sql.orc.compression.codec", "snappy"),
("spark.sql.decimalOperations.allowPrecisionLoss", "true"),
(
COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key,
COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.defaultValueString),
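The first hunk whitelists the key so that a session value is carried to the native side, and the second provides a "true" fallback. A hypothetical sketch of that pass-through (the helper below is illustrative only, not Gluten's API):

```scala
// Hypothetical helper: whitelisted session entries override the backend defaults;
// everything else in the session conf is dropped before it is sent to the native layer.
def nativeConfFor(
    sessionConf: Map[String, String],
    whitelist: Set[String],
    backendDefaults: Map[String, String]): Map[String, String] =
  backendDefaults ++ sessionConf.filter { case (k, _) => whitelist.contains(k) }

// Example: no session value set, so the default "true" is sent.
val sent = nativeConfFor(
  Map.empty,
  Set("spark.sql.decimalOperations.allowPrecisionLoss"),
  Map("spark.sql.decimalOperations.allowPrecisionLoss" -> "true"))
// sent("spark.sql.decimalOperations.allowPrecisionLoss") == "true"
```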