From b8fb686fe51258f974e9f38ff7126e4cf8396edb Mon Sep 17 00:00:00 2001 From: lwz9103 Date: Wed, 29 May 2024 17:35:46 +0800 Subject: [PATCH] Fix ResultPlan test coverage 1. Remove datafilter position due to page index not support 2. Add gluten ke config test 3. Enable native page index by default 4. Fix KapExpressionTransformer test coverage --- .../apache/kylin/common/KylinConfigBase.java | 4 -- .../kylin/newten/ExtractLimitInfoTest.java | 2 + .../kylin/query/runtime/plan/ResultPlan.scala | 40 +++---------------- .../gluten/KapExpressionTransformer.scala | 13 ++---- .../spark/sql/common/GlutenTestConfig.scala | 1 + 5 files changed, 11 insertions(+), 49 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 62da8a72e7e..25b68e86d98 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -4325,10 +4325,6 @@ public boolean queryIndexUseGluten() { return Boolean.parseBoolean(this.getOptional("kylin.query.index-use-gulten", TRUE)); } - public int queryGlutenDataFilterMinimalPosition() { - return Math.max(0, Integer.parseInt(this.getOptional("kylin.query.gluten-datafilter-minimal-position", "0"))); - } - public boolean isPushdownSqlHintsErasingEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.pushdown.sql-hints-erasing.enabled", TRUE)); } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java index 0ba939b251d..ed383cf907f 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java @@ -31,6 +31,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.SparderEnv; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.common.GlutenTestConfig; import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.internal.StaticSQLConf; import org.junit.After; @@ -61,6 +62,7 @@ public static void initSpark() { sparkConf.set("spark.sql.crossJoin.enabled", "true"); sparkConf.set("spark.sql.adaptive.enabled", "false"); sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1"); + GlutenTestConfig.configGluten(sparkConf); ss = SparkSession.builder().config(sparkConf).getOrCreate(); SparderEnv.setSparkSession(ss); } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index b4dc3e981c9..6dfd0690ec5 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong import java.{lang, util} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils +import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector} import org.apache.hadoop.fs.Path import org.apache.kylin.common.exception.code.ErrorCodeServer import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException} @@ -36,15 +37,16 @@ import org.apache.kylin.metadata.state.QueryShareStateManager import org.apache.kylin.query.engine.RelColumnMetaDataExtractor import org.apache.kylin.query.engine.exec.ExecuteResult import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow +import org.apache.kylin.query.relnode.ContextUtil import org.apache.kylin.query.util.{AsyncQueryUtil, QueryInterruptChecker, SparkJobTrace, SparkQueryJobManager} import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook} import org.apache.spark.SparkConf import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer import org.apache.spark.sql.hive.QueryMetricUtils import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil} import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv} import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer -import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector} import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`iterator asScala` @@ -279,42 +281,10 @@ object ResultPlan extends LogEx { } } - def shouldPushDownFilterFallback(plan: LogicalPlan): Boolean = { - checkDataFilterInternal(plan) || plan.children.exists(shouldPushDownFilterFallback) - } - - def checkDataFilterInternal(p: LogicalPlan): Boolean = p match { - case PhysicalOperation(projects, filters, - l@LogicalRelation(fsRelation@HadoopFsRelation(_: FilePruner, _, _, _, _, _), _, table, _)) => - - val normalizedFilters = filters.map { e => - e transform { - case a: AttributeReference => - a.withName(l.output.find(_.semanticEquals(a)).get.name) - } - } - - val partitionColumns = l.resolve( - fsRelation.partitionSchema, fsRelation.sqlContext.sparkSession.sessionState.analyzer.resolver) - val partitionSet = AttributeSet(partitionColumns) - val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty) - val names = fsRelation.dataSchema.map(schema => schema.name) - .slice(0, KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition).seq - // data filter should not contains minimal schema - dataFilters.exists(f => f.references.exists(r => names.contains(r.name))) - case _ => false - } - def getResult(df: DataFrame, rowType: RelDataType): ExecuteResult = withScope(df) { - if (!ContextUtil.getNativeRealizations.isEmpty) { - if (!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) { - df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false") - } else if (KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition > 0 - && shouldPushDownFilterFallback(df.queryExecution.optimizedPlan)) { - df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false") - } + if (!ContextUtil.getNativeRealizations.isEmpty && !KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) { + df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false") } - val queryTagInfo = QueryContext.current().getQueryTagInfo if (queryTagInfo.isAsyncQuery) { saveAsyncQueryResult(df, queryTagInfo.getFileFormat, queryTagInfo.getFileEncode, rowType) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala index 1f8614eba94..b12b99ea02d 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala @@ -119,8 +119,7 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait { aggregateAttr += aggregateAttributeList(resIdx) resIdx += 1 resIdx - case other => - throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.") + case other => throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.") } } @@ -133,18 +132,12 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait { Some("ke_bitmap_or_cardinality") case BinaryType => Some("ke_bitmap_or_data") - case _ => - throw new UnsupportedOperationException( - s"Aggregate function ${aggregateFunc.getClass} does not support the data type " + - s"${countDistinct.dataType}.") + case _ => throw new UnsupportedOperationException("Unsupported data type in count distinct") } case _ => extensionExpressionsMapping.get(aggregateFunc.getClass) } - if (substraitAggFuncName.isEmpty) { - throw new UnsupportedOperationException( - s"Aggregate function ${aggregateFunc.getClass} is not supported.") - } + assert(substraitAggFuncName.isDefined, s"Aggregate function ${aggregateFunc.getClass} is not supported.") (substraitAggFuncName, aggregateFunc.children.map(child => child.dataType)) } } diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala index 65c5dec8795..171d14aa9b1 100644 --- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala +++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala @@ -82,6 +82,7 @@ object GlutenTestConfig extends Logging{ "5000000000") conf.set("spark.gluten.sql.columnar.maxBatchSize", "32768") conf.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32") + conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false") }