diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 0238508d9699..9c1089a35bea 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -491,7 +491,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportTransformWriteFiles: Boolean = true
 
-  override def allowDecimalArithmetic: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss
+  override def allowDecimalArithmetic: Boolean = true
 
   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index ad7dacf113ec..060bbe111265 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -32,6 +32,8 @@ const std::string kCaseSensitive = "spark.sql.caseSensitive";
 
 const std::string kSessionTimezone = "spark.sql.session.timeZone";
 
+const std::string kAllowPrecisionLoss = "spark.sql.decimalOperations.allowPrecisionLoss";
+
 const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";
 
 const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 296b9415b159..8439545ca382 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -455,6 +455,10 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
   // Adjust timestamp according to the above configured session timezone.
   configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true";
 
+  // To align with Spark's behavior, allow decimal precision loss or not.
+  configs[velox::core::QueryConfig::kSparkDecimalOperationsAllowPrecisionLoss] =
+      veloxCfg_->get<std::string>(kAllowPrecisionLoss, "true");
+
   {
     // partial aggregation memory config
     auto offHeapMemory = veloxCfg_->get<uint64_t>(kSparkTaskOffHeapMemory, facebook::velox::memory::kMaxMemory);
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 07694a36102d..0d78d8946c76 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -16,6 +16,7 @@
 
 set -exu
 
+
 VELOX_REPO=https://github.com/oap-project/velox.git
 VELOX_BRANCH=2024_07_04
 VELOX_HOME=""
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/DecimalArithmeticUtil.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/DecimalArithmeticUtil.scala
index 479eb8bb5c29..d3a3373aa058 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/DecimalArithmeticUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/DecimalArithmeticUtil.scala
@@ -28,8 +28,14 @@ import org.apache.spark.sql.utils.DecimalTypeUtil
 
 object DecimalArithmeticUtil {
 
+  val MIN_ADJUSTED_SCALE = 6
+  val MAX_PRECISION = 38
+  val MAX_SCALE = 38
+
   // Returns the result decimal type of a decimal arithmetic computing.
   def getResultType(expr: BinaryArithmetic, type1: DecimalType, type2: DecimalType): DecimalType = {
+
+    val allowPrecisionLoss = SQLConf.get.decimalOperationsAllowPrecisionLoss
     var resultScale = 0
     var resultPrecision = 0
     expr match {
@@ -45,13 +51,34 @@ object DecimalArithmeticUtil {
         resultScale = type1.scale + type2.scale
         resultPrecision = type1.precision + type2.precision + 1
       case _: Divide =>
-        resultScale =
-          Math.max(DecimalType.MINIMUM_ADJUSTED_SCALE, type1.scale + type2.precision + 1)
-        resultPrecision = type1.precision - type1.scale + type2.scale + resultScale
+        if (allowPrecisionLoss) {
+          resultScale = Math.max(MIN_ADJUSTED_SCALE, type1.scale + type2.precision + 1)
+          resultPrecision = type1.precision - type1.scale + type2.scale + resultScale
+        } else {
+          var intDig = Math.min(MAX_SCALE, type1.precision - type1.scale + type2.scale)
+          var decDig = Math.min(MAX_SCALE, Math.max(6, type1.scale + type2.precision + 1))
+          val diff = (intDig + decDig) - MAX_SCALE
+          if (diff > 0) {
+            decDig -= diff / 2 + 1
+            intDig = MAX_SCALE - decDig
+          }
+          resultPrecision = intDig + decDig
+          resultScale = decDig
+        }
       case other =>
         throw new GlutenNotSupportException(s"$other is not supported.")
     }
-    DecimalTypeUtil.adjustPrecisionScale(resultPrecision, resultScale)
+
+    if (allowPrecisionLoss) {
+      DecimalTypeUtil.adjustPrecisionScale(resultPrecision, resultScale)
+    } else {
+      bounded(resultPrecision, resultScale)
+    }
+
+  }
+
+  def bounded(precision: Int, scale: Int): DecimalType = {
+    DecimalType(Math.min(precision, MAX_PRECISION), Math.min(scale, MAX_SCALE))
   }
 
   // If casting between DecimalType, unnecessary cast is skipped to avoid data loss,
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 4ef96bec27eb..ef4618fca1d1 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -636,6 +636,7 @@ object GlutenConfig {
       GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
       SQLConf.LEGACY_SIZE_OF_NULL.key,
       "spark.io.compression.codec",
+      "spark.sql.decimalOperations.allowPrecisionLoss",
       COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
       COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key,
       COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key,
@@ -717,6 +718,7 @@ object GlutenConfig {
       ("spark.hadoop.input.write.timeout", "180000"),
       ("spark.hadoop.dfs.client.log.severity", "INFO"),
       ("spark.sql.orc.compression.codec", "snappy"),
+      ("spark.sql.decimalOperations.allowPrecisionLoss", "true"),
       (
         COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key,
         COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.defaultValueString),
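Note on the typing rules in the DecimalArithmeticUtil.scala hunk: the new else branch reproduces Spark's own non-lossy division rule (the intDig/decDig split Spark applies when spark.sql.decimalOperations.allowPrecisionLoss is false), while the if branch keeps the default lossy rule backed by DecimalTypeUtil.adjustPrecisionScale. Below is a minimal, self-contained Scala sketch of both division rules; DecimalDivideTypeSketch, Dec, and the simplified adjustPrecisionScale are illustrative stand-ins, not the actual Gluten or Spark code.

// Sketch of the two decimal-division typing rules implemented in the patch.
object DecimalDivideTypeSketch {
  // Stand-in for Spark's DecimalType: just a (precision, scale) pair.
  final case class Dec(precision: Int, scale: Int)

  val MIN_ADJUSTED_SCALE = 6
  val MAX_PRECISION = 38
  val MAX_SCALE = 38

  // Mirrors DecimalType.bounded: clamp precision and scale to the maximums.
  def bounded(precision: Int, scale: Int): Dec =
    Dec(math.min(precision, MAX_PRECISION), math.min(scale, MAX_SCALE))

  // Simplified form of Spark's adjustPrecisionScale (non-negative scales only):
  // on overflow, cap precision at 38 and cut the scale, but never below
  // min(scale, MIN_ADJUSTED_SCALE).
  def adjustPrecisionScale(precision: Int, scale: Int): Dec =
    if (precision <= MAX_PRECISION) {
      Dec(precision, scale)
    } else {
      val intDigits = precision - scale
      val minScale = math.min(scale, MIN_ADJUSTED_SCALE)
      Dec(MAX_PRECISION, math.max(MAX_PRECISION - intDigits, minScale))
    }

  // Result type of t1 / t2 under both settings of allowPrecisionLoss.
  def divideResultType(t1: Dec, t2: Dec, allowPrecisionLoss: Boolean): Dec =
    if (allowPrecisionLoss) {
      val scale = math.max(MIN_ADJUSTED_SCALE, t1.scale + t2.precision + 1)
      val precision = t1.precision - t1.scale + t2.scale + scale
      adjustPrecisionScale(precision, scale)
    } else {
      // Non-lossy rule: when the desired digits exceed 38, reduce the
      // fractional digits by half the excess plus one and give the integral
      // part whatever capacity remains.
      var intDig = math.min(MAX_SCALE, t1.precision - t1.scale + t2.scale)
      var decDig = math.min(MAX_SCALE, math.max(6, t1.scale + t2.precision + 1))
      val diff = (intDig + decDig) - MAX_SCALE
      if (diff > 0) {
        decDig -= diff / 2 + 1
        intDig = MAX_SCALE - decDig
      }
      bounded(intDig + decDig, decDig)
    }

  def main(args: Array[String]): Unit = {
    val (a, b) = (Dec(38, 10), Dec(38, 10))
    println(divideResultType(a, b, allowPrecisionLoss = true)) // Dec(38,6)
    println(divideResultType(a, b, allowPrecisionLoss = false)) // Dec(38,18)
  }
}

For example, decimal(38,10) / decimal(38,10) types as decimal(38,6) with precision loss allowed but as decimal(38,18) with it disallowed; forwarding kAllowPrecisionLoss into Velox's QueryConfig (the WholeStageResultIterator.cc hunk above) keeps native execution consistent with whichever rule the Spark planner used.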