diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 62da8a72e7e..25b68e86d98 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -4325,10 +4325,6 @@ public boolean queryIndexUseGluten() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.index-use-gulten", TRUE));
     }
 
-    public int queryGlutenDataFilterMinimalPosition() {
-        return Math.max(0, Integer.parseInt(this.getOptional("kylin.query.gluten-datafilter-minimal-position", "0")));
-    }
-
     public boolean isPushdownSqlHintsErasingEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.pushdown.sql-hints-erasing.enabled", TRUE));
     }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java
index 0ba939b251d..ed383cf907f 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExtractLimitInfoTest.java
@@ -31,6 +31,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.common.GlutenTestConfig;
 import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.internal.StaticSQLConf;
 import org.junit.After;
@@ -61,6 +62,7 @@ public static void initSpark() {
         sparkConf.set("spark.sql.crossJoin.enabled", "true");
         sparkConf.set("spark.sql.adaptive.enabled", "false");
         sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1");
+        GlutenTestConfig.configGluten(sparkConf);
         ss = SparkSession.builder().config(sparkConf).getOrCreate();
         SparderEnv.setSparkSession(ss);
     }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index b4dc3e981c9..6dfd0690ec5 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.{lang, util}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
+import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.exception.code.ErrorCodeServer
 import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException}
@@ -36,15 +37,16 @@ import org.apache.kylin.metadata.state.QueryShareStateManager
 import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
 import org.apache.kylin.query.engine.exec.ExecuteResult
 import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
+import org.apache.kylin.query.relnode.ContextUtil
 import org.apache.kylin.query.util.{AsyncQueryUtil, QueryInterruptChecker, SparkJobTrace, SparkQueryJobManager}
 import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil}
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer
-import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}
 
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
@@ -279,42 +281,10 @@
     }
   }
 
-  def shouldPushDownFilterFallback(plan: LogicalPlan): Boolean = {
-    checkDataFilterInternal(plan) || plan.children.exists(shouldPushDownFilterFallback)
-  }
-
-  def checkDataFilterInternal(p: LogicalPlan): Boolean = p match {
-    case PhysicalOperation(projects, filters,
-    l@LogicalRelation(fsRelation@HadoopFsRelation(_: FilePruner, _, _, _, _, _), _, table, _)) =>
-
-      val normalizedFilters = filters.map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(l.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
-
-      val partitionColumns = l.resolve(
-        fsRelation.partitionSchema, fsRelation.sqlContext.sparkSession.sessionState.analyzer.resolver)
-      val partitionSet = AttributeSet(partitionColumns)
-      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
-      val names = fsRelation.dataSchema.map(schema => schema.name)
-        .slice(0, KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition).seq
-      // data filter should not contains minimal schema
-      dataFilters.exists(f => f.references.exists(r => names.contains(r.name)))
-    case _ => false
-  }
-
   def getResult(df: DataFrame, rowType: RelDataType): ExecuteResult = withScope(df) {
-    if (!ContextUtil.getNativeRealizations.isEmpty) {
-      if (!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
-        df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
-      } else if (KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition > 0
-        && shouldPushDownFilterFallback(df.queryExecution.optimizedPlan)) {
-        df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
-      }
+    if (!ContextUtil.getNativeRealizations.isEmpty && !KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
+      df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
     }
-
     val queryTagInfo = QueryContext.current().getQueryTagInfo
     if (queryTagInfo.isAsyncQuery) {
       saveAsyncQueryResult(df, queryTagInfo.getFileFormat, queryTagInfo.getFileEncode, rowType)
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
index 1f8614eba94..b12b99ea02d 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
@@ -119,8 +119,7 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
           aggregateAttr += aggregateAttributeList(resIdx)
           resIdx += 1
           resIdx
-        case other =>
-          throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
+        case other => throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
       }
     }
 
@@ -133,18 +132,12 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
           Some("ke_bitmap_or_cardinality")
         case BinaryType =>
           Some("ke_bitmap_or_data")
-        case _ =>
-          throw new UnsupportedOperationException(
-            s"Aggregate function ${aggregateFunc.getClass} does not support the data type " +
-              s"${countDistinct.dataType}.")
+        case _ => throw new UnsupportedOperationException("Unsupported data type in count distinct")
       }
       case _ => extensionExpressionsMapping.get(aggregateFunc.getClass)
     }
 
-    if (substraitAggFuncName.isEmpty) {
-      throw new UnsupportedOperationException(
-        s"Aggregate function ${aggregateFunc.getClass} is not supported.")
-    }
+    assert(substraitAggFuncName.isDefined, s"Aggregate function ${aggregateFunc.getClass} is not supported.")
     (substraitAggFuncName, aggregateFunc.children.map(child => child.dataType))
   }
 }
diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
index 65c5dec8795..171d14aa9b1 100644
--- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
+++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
@@ -82,6 +82,7 @@ object GlutenTestConfig extends Logging{
       "5000000000")
     conf.set("spark.gluten.sql.columnar.maxBatchSize", "32768")
     conf.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
+    conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
     conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
   }
 