From 0c9f79a6299ceb1efeec6fc90c68e11c3a156094 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 10 Jun 2024 13:15:24 -0600 Subject: [PATCH] chore: disable xxhash64 by default (#548) * disable xxhash64 by default * fix regressions --- .../scala/org/apache/comet/CometConf.scala | 6 ++++ docs/source/user-guide/configs.md | 1 + .../apache/comet/serde/QueryPlanSerde.scala | 32 ++++++++++++------- .../apache/comet/CometExpressionSuite.scala | 2 ++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 42fb5fb4c..1b40c7cd0 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -401,6 +401,12 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_XXHASH64_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.xxhash64.enabled") + .doc("The xxhash64 implementation is not optimized yet and may cause performance issues.") + .booleanConf + .createWithDefault(false) + } object ConfigHelpers { diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 104f29ce8..f232dc8b8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -48,3 +48,4 @@ Comet provides the following configuration settings. | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when 'spark.comet.columnar.shuffle.enabled' is true. | 10.0 | +| spark.comet.xxhash64.enabled | The xxhash64 implementation is not optimized yet and may cause performance issues. | false | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 7d9bef48c..5a0ad38d7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2101,19 +2101,27 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim scalarExprToProtoWithReturnType("murmur3_hash", IntegerType, exprs :+ seedExpr: _*) case XxHash64(children, seed) => - val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType)) - if (firstUnSupportedInput.isDefined) { - withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}") - return None + if (CometConf.COMET_XXHASH64_ENABLED.get()) { + val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType)) + if (firstUnSupportedInput.isDefined) { + withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}") + return None + } + val exprs = children.map(exprToProtoInternal(_, inputs)) + val seedBuilder = ExprOuterClass.Literal + .newBuilder() + .setDatatype(serializeDataType(LongType).get) + .setLongVal(seed) + val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build()) + // the seed is put at the end of the arguments + scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*) + } else { + withInfo( + expr, + "xxhash64 is disabled by default. " + + s"Set ${CometConf.COMET_XXHASH64_ENABLED.key}=true to enable it.") + None } - val exprs = children.map(exprToProtoInternal(_, inputs)) - val seedBuilder = ExprOuterClass.Literal - .newBuilder() - .setDatatype(serializeDataType(LongType).get) - .setLongVal(seed) - val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build()) - // the seed is put at the end of the arguments - scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*) case Sha2(left, numBits) => if (!numBits.foldable) { diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 7e6d2d127..10fbc468f 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1506,6 +1506,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, + CometConf.COMET_XXHASH64_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { @@ -1538,6 +1539,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, + CometConf.COMET_XXHASH64_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) {