Fix ResultPlan test coverage
1. Remove the datafilter position fallback, which was needed because page index was not supported
2. Add the Gluten KE config to tests
3. Enable native page index by default
4. Fix KapExpressionTransformer test coverage
lwz9103 authored and loneylee committed Jul 31, 2024
1 parent 05ba863 commit b8fb686
Showing 5 changed files with 11 additions and 49 deletions.
@@ -4325,10 +4325,6 @@ public boolean queryIndexUseGluten() {
return Boolean.parseBoolean(this.getOptional("kylin.query.index-use-gulten", TRUE));
}

public int queryGlutenDataFilterMinimalPosition() {
return Math.max(0, Integer.parseInt(this.getOptional("kylin.query.gluten-datafilter-minimal-position", "0")));
}

public boolean isPushdownSqlHintsErasingEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.pushdown.sql-hints-erasing.enabled", TRUE));
}
@@ -31,6 +31,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.common.GlutenTestConfig;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.junit.After;
@@ -61,6 +62,7 @@ public static void initSpark() {
sparkConf.set("spark.sql.crossJoin.enabled", "true");
sparkConf.set("spark.sql.adaptive.enabled", "false");
sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1");
GlutenTestConfig.configGluten(sparkConf);
ss = SparkSession.builder().config(sparkConf).getOrCreate();
SparderEnv.setSparkSession(ss);
}
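For reference, a minimal Scala sketch of the same wiring in a standalone test setup, assuming GlutenTestConfig.configGluten only mutates the SparkConf passed to it:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.common.GlutenTestConfig

// Build a local test session with the shared Gluten settings applied,
// mirroring the initSpark() change above.
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("gluten-test")
sparkConf.set("spark.sql.adaptive.enabled", "false")
GlutenTestConfig.configGluten(sparkConf) // applies the shared Gluten/ClickHouse backend configs
val ss = SparkSession.builder().config(sparkConf).getOrCreate()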
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.{lang, util}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.commons.io.IOUtils
import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.code.ErrorCodeServer
import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException}
@@ -36,15 +37,16 @@ import org.apache.kylin.metadata.state.QueryShareStateManager
import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
import org.apache.kylin.query.engine.exec.ExecuteResult
import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
import org.apache.kylin.query.relnode.ContextUtil
import org.apache.kylin.query.util.{AsyncQueryUtil, QueryInterruptChecker, SparkJobTrace, SparkQueryJobManager}
import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer
import org.apache.spark.sql.hive.QueryMetricUtils
import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer
import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}

import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`iterator asScala`
@@ -279,42 +281,10 @@ object ResultPlan extends LogEx {
}
}

def shouldPushDownFilterFallback(plan: LogicalPlan): Boolean = {
checkDataFilterInternal(plan) || plan.children.exists(shouldPushDownFilterFallback)
}

def checkDataFilterInternal(p: LogicalPlan): Boolean = p match {
case PhysicalOperation(projects, filters,
l@LogicalRelation(fsRelation@HadoopFsRelation(_: FilePruner, _, _, _, _, _), _, table, _)) =>

val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}

val partitionColumns = l.resolve(
fsRelation.partitionSchema, fsRelation.sqlContext.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
val names = fsRelation.dataSchema.map(schema => schema.name)
.slice(0, KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition).seq
// data filters should not reference the leading (minimal-position) schema columns
dataFilters.exists(f => f.references.exists(r => names.contains(r.name)))
case _ => false
}

def getResult(df: DataFrame, rowType: RelDataType): ExecuteResult = withScope(df) {
if (!ContextUtil.getNativeRealizations.isEmpty) {
if (!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
} else if (KylinConfig.getInstanceFromEnv.queryGlutenDataFilterMinimalPosition > 0
&& shouldPushDownFilterFallback(df.queryExecution.optimizedPlan)) {
df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
}
if (!ContextUtil.getNativeRealizations.isEmpty && !KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
}

val queryTagInfo = QueryContext.current().getQueryTagInfo
if (queryTagInfo.isAsyncQuery) {
saveAsyncQueryResult(df, queryTagInfo.getFileFormat, queryTagInfo.getFileEncode, rowType)
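With the minimal-position fallback removed, the check reduces to a per-thread Gluten toggle. A minimal sketch of that pattern, assuming a live SparkSession named ss and the KylinConfig class from kylin-common:

import org.apache.gluten.utils.QueryPlanSelector
import org.apache.kylin.common.KylinConfig

// Disable Gluten for the current query thread when index queries are not
// configured to use it; otherwise Gluten stays enabled.
// `ss` is an existing SparkSession (assumption).
if (!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
  ss.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
}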
@@ -119,8 +119,7 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
aggregateAttr += aggregateAttributeList(resIdx)
resIdx += 1
resIdx
case other =>
throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
case other => throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
}
}

@@ -133,18 +132,12 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
Some("ke_bitmap_or_cardinality")
case BinaryType =>
Some("ke_bitmap_or_data")
case _ =>
throw new UnsupportedOperationException(
s"Aggregate function ${aggregateFunc.getClass} does not support the data type " +
s"${countDistinct.dataType}.")
case _ => throw new UnsupportedOperationException("Unsupported data type in count distinct")
}
case _ =>
extensionExpressionsMapping.get(aggregateFunc.getClass)
}
if (substraitAggFuncName.isEmpty) {
throw new UnsupportedOperationException(
s"Aggregate function ${aggregateFunc.getClass} is not supported.")
}
assert(substraitAggFuncName.isDefined, s"Aggregate function ${aggregateFunc.getClass} is not supported.")
(substraitAggFuncName, aggregateFunc.children.map(child => child.dataType))
}
}
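The count-distinct branch above maps KE bitmap aggregates to Substrait function names by result type; a minimal sketch with a hypothetical helper distilling that mapping:

import org.apache.spark.sql.types.{BinaryType, DataType, LongType}

// Hypothetical helper distilling the mapping above: pick the Substrait function
// name for a KE bitmap count-distinct by its result data type.
def bitmapAggFuncName(dataType: DataType): Option[String] = dataType match {
  case LongType   => Some("ke_bitmap_or_cardinality") // cardinality as a long
  case BinaryType => Some("ke_bitmap_or_data")        // serialized bitmap bytes
  case _          => None                             // unsupported data type
}

assert(bitmapAggFuncName(LongType).isDefined, "Aggregate function is not supported.")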
@@ -82,6 +82,7 @@ object GlutenTestConfig extends Logging{
"5000000000")
conf.set("spark.gluten.sql.columnar.maxBatchSize", "32768")
conf.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
}
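A small sketch verifying that the newly added runtime_config key reaches the built session; the key name is taken verbatim from the line above, while what the ClickHouse backend does with use_local_format is not shown here:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// "spark.*" keys set on the SparkConf are copied into the session's runtime
// conf, so the Gluten/CH backend can read the flag set by configGluten.
val conf = new SparkConf().setMaster("local[1]").setAppName("gluten-conf-check")
conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
val spark = SparkSession.builder().config(conf).getOrCreate()
assert(spark.conf.get("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format") == "true")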

