From 6682bee22320e9ed29cf539951b1064e83e6ebe4 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Wed, 5 Jun 2024 10:15:05 +0800 Subject: [PATCH] KE-43825 Upgrade gluten (240701) (#31819) 1. Disable gluten in analyze table 2. Gluten support delta 2.3.0 - https://github.com/apache/incubator-gluten/pull/5902 - https://github.com/apache/incubator-gluten/pull/5945 3. Support sum0 4. Fix build - https://github.com/apache/incubator-gluten/pull/5796 - https://github.com/apache/incubator-gluten/pull/5767 5. Fix SlowQueryDetectorTest.testSparderTimeoutCancelJob due to context dirty 6. Cleanup threadlocal contexts 7. Fix GMT+8 not support 8. Fix case class test coverage 9. Add 1 gluten disabled case in analyze table 10.Remove unsupported pushdown filter: 10.1. when we create KylinFileSourceScanExec, we didn't remove subquery filter. 10.2. KylinFileSourceScanExec doesn't inherit from FileSourceScanExec, we miss chance to correct push down filter. 11. native support floor_datetime and ceil_datetime 12. native support kap_add_months and kap_months_between 13. native support _ymdint_between 14. native support truncate 15. native support kylin_split_part 16. native support kylin instr --- pom.xml | 4 +- .../org/apache/kylin/common/QueryContext.java | 1 - .../TransactionProjectLockTest.java | 4 +- .../query/util/QueryInterruptChecker.java | 2 +- .../kylin/query/util/SlowQueryDetector.java | 11 +- .../spark/NLocalWithSparkSessionTest.java | 19 ++++ .../kylin/newten/SlowQueryDetectorTest.java | 6 + .../kylin/rest/response/SQLResponse.java | 3 - src/spark-project/engine-spark/pom.xml | 10 +- .../java/org/apache/kylin/GlutenDisabled.java | 0 .../java/org/apache/kylin/GlutenRunner.java | 0 .../stats/analyzer/TableAnalyzerTest.java | 5 + .../kylin/query/pushdown/SparkSqlClient.scala | 2 +- .../kylin/query/runtime/plan/ResultPlan.scala | 4 +- .../gluten/KapExpressionTransformer.scala | 106 ++++++++++++++---- .../gluten/TimestampAddTransformer.scala | 20 ++-- .../apache/spark/sql/udf/CeilFloorTest.scala | 53 ++++++++- src/spark-project/spark-common/pom.xml | 34 +++++- .../ConvertKylinFileSourceToGlutenRule.scala | 36 +++--- .../KylinFileSourceScanExecTransformer.scala | 1 + .../spark/sql/common/GlutenTestConfig.scala | 1 - .../spark/sql/common/GlutenTestUtil.scala | 69 ++++++++++++ 22 files changed, 312 insertions(+), 79 deletions(-) rename src/{kylin-it => spark-project/engine-spark}/src/test/java/org/apache/kylin/GlutenDisabled.java (100%) rename src/{kylin-it => spark-project/engine-spark}/src/test/java/org/apache/kylin/GlutenRunner.java (100%) create mode 100644 src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala diff --git a/pom.xml b/pom.xml index ae6964f1dd1..c04454dcc0a 100644 --- a/pom.xml +++ b/pom.xml @@ -129,9 +129,9 @@ 3.3.0-kylin-4.6.26.0 - 2.2.0 + 2.3.0 - 1.2.0-kylin-240518-SNAPSHOT + 1.2.0-kylin-240701-SNAPSHOT compile 0.5.0 0.3.0-incubating diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index bb8dd9d0853..73569985ba4 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -317,7 +317,6 @@ public void addDataFetchTime(long dataFetchTime) { @Setter private List scanBytes; - @Getter @Setter private Boolean glutenFallback; diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java index ce86a924868..06b7316ed69 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java @@ -23,7 +23,7 @@ import org.assertj.core.util.Lists; import org.awaitility.Awaitility; import org.junit.Assert; -import org.junit.jupiter.api.Test; +import org.junit.Ignore; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -34,7 +34,7 @@ @MetadataInfo(onlyProps = true) class TransactionProjectLockTest { - @Test + @Ignore("Deprecated ProjectLock") void testStravition() { val threads = Lists. newArrayList(); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java index ab82d9303dc..7b638c73e4f 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java @@ -76,7 +76,7 @@ public static void checkQueryCanceledOrThreadInterrupted(String cause, String st "Manually stop the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step)); } - if (entry.getPlannerCancelFlag().isCancelRequested() && Thread.currentThread().isInterrupted()) { + if (entry.getPlannerCancelFlag().isCancelRequested() && entry.isTimeoutStop) { QueryContext.current().getQueryTagInfo().setTimeout(true); throw new KylinTimeoutException(String.format(Locale.ROOT, "Run out of time of the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step)); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java index 46c6ac4e33c..f82c66a66bc 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java @@ -79,7 +79,8 @@ public static void clearCanceledSlowQueriesStatus() { public void queryStart(String stopId) { runningQueries.put(currentThread(), new QueryEntry(System.currentTimeMillis(), currentThread(), QueryContext.current().getQueryId(), QueryContext.current().getUserSQL(), stopId, false, - QueryContext.current().getQueryTagInfo().isAsyncQuery(), null, CancelFlag.getContextCancelFlag())); + QueryContext.current().getQueryTagInfo().isAsyncQuery(), false, null, + CancelFlag.getContextCancelFlag())); } public void addJobIdForAsyncQueryJob(String jobId) { @@ -188,6 +189,7 @@ public class QueryEntry { final String stopId; boolean isStopByUser; final boolean isAsyncQuery; + boolean isTimeoutStop; String jobId; final CancelFlag plannerCancelFlag; @@ -195,12 +197,13 @@ public long getRunningTime() { return (System.currentTimeMillis() - startTime) / 1000; } - public boolean setInterruptIfTimeout() { + public synchronized boolean setInterruptIfTimeout() { if (isAsyncQuery) { return false; } long runningMs = System.currentTimeMillis() - startTime; if (runningMs >= queryTimeoutMs) { + isTimeoutStop = true; plannerCancelFlag.requestCancel(); thread.interrupt(); logger.error("Trying to cancel query: {}", thread.getName()); @@ -209,5 +212,9 @@ public boolean setInterruptIfTimeout() { return false; } + + public synchronized CancelFlag getPlannerCancelFlag() { + return plannerCancelFlag; + } } } diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java index 651c8a05467..84b043f9fda 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java @@ -34,12 +34,31 @@ import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.query.relnode.ContextUtil; +import org.apache.kylin.query.util.SlowQueryDetector; +import org.junit.After; +import org.junit.Before; import org.sparkproject.guava.collect.Sets; public class NLocalWithSparkSessionTest extends NLocalWithSparkSessionTestBase { protected IndexDataConstructor indexDataConstructor = new IndexDataConstructor(getProject()); + @Before + public void setUp() throws Exception { + super.setUp(); + indexDataConstructor = new IndexDataConstructor(getProject()); + SlowQueryDetector.getRunningQueries().clear(); + ContextUtil.clearThreadLocalContexts(); + } + + @After + public void tearDown() throws Exception { + this.cleanupTestMetadata(); + SlowQueryDetector.getRunningQueries().clear(); + ContextUtil.clearThreadLocalContexts(); + } + protected void fullBuild(String dfName) throws Exception { indexDataConstructor.buildDataflow(dfName); } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java index a0c8cfa7c46..bd6abb62bf1 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java @@ -34,6 +34,7 @@ import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.pushdown.SparkSqlClient; +import org.apache.kylin.query.relnode.ContextUtil; import org.apache.kylin.query.runtime.plan.ResultPlan; import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.query.util.QueryUtil; @@ -236,6 +237,7 @@ public void testSparderTimeoutCancelJob() throws Exception { slowQueryDetector.queryStart(""); try { SparderEnv.cleanCompute(); + ContextUtil.clearThreadLocalContexts(); long t = System.currentTimeMillis(); ResultPlan.getResult(mockDf, null); ExecAndComp.queryModel(getProject(), "select sum(price) from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME"); @@ -244,6 +246,10 @@ public void testSparderTimeoutCancelJob() throws Exception { logger.error(error); Assert.fail(error); } catch (Exception e) { + boolean timeout = QueryContext.current().getQueryTagInfo().isTimeout(); + if (!timeout) { + logger.error("Unexpected query exception", e); + } Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout()); Assert.assertTrue(e instanceof KylinTimeoutException); Assert.assertEquals( diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 4566916d35c..6d677066671 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -74,8 +74,6 @@ public class SQLResponse implements Serializable { private List scanRows; - private Boolean glutenFallback; - private List scanBytes; private String appMasterURL = ""; @@ -212,7 +210,6 @@ public SQLResponse wrapResultOfQueryContext(QueryContext queryContext) { this.setScanRows(queryContext.getMetrics().getScanRows()); this.setScanBytes(queryContext.getMetrics().getScanBytes()); this.setShufflePartitions(queryContext.getShufflePartitions()); - this.setGlutenFallback(queryContext.getMetrics().getGlutenFallback()); return this; } diff --git a/src/spark-project/engine-spark/pom.xml b/src/spark-project/engine-spark/pom.xml index 823c03983a1..25a9a462a0a 100644 --- a/src/spark-project/engine-spark/pom.xml +++ b/src/spark-project/engine-spark/pom.xml @@ -36,11 +36,6 @@ - - - io.delta - delta-core_2.12 - org.apache.kylin kylin-core-metadata @@ -167,6 +162,11 @@ volcano-client 5.12.2 + + + io.delta + delta-core_2.12 + com.github.hipjim scala-retry_2.12 diff --git a/src/kylin-it/src/test/java/org/apache/kylin/GlutenDisabled.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenDisabled.java similarity index 100% rename from src/kylin-it/src/test/java/org/apache/kylin/GlutenDisabled.java rename to src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenDisabled.java diff --git a/src/kylin-it/src/test/java/org/apache/kylin/GlutenRunner.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenRunner.java similarity index 100% rename from src/kylin-it/src/test/java/org/apache/kylin/GlutenRunner.java rename to src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenRunner.java diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java index 8ac12727b9f..b4b0da3588b 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.stream.Collectors; +import org.apache.kylin.GlutenDisabled; +import org.apache.kylin.GlutenRunner; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase; import org.apache.kylin.engine.spark.utils.SparkConfHelper; import org.apache.kylin.metadata.model.ColumnDesc; @@ -34,10 +36,12 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import lombok.val; import lombok.var; +@RunWith(GlutenRunner.class) public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase { private NTableMetadataManager tableMgr; @@ -95,6 +99,7 @@ public void testSampleFullTable() { } @Test + @GlutenDisabled("max(col) with null data gluten returns NaN, but spark return null") public void testSampleTableForColumnOrRowAlwaysNull() { // case 1: this case test specified column always null, corresponding column is 'CATEG_BUSN_MGR' TableDesc testCategoryGroupings = tableMgr.getTableDesc("DEFAULT.TEST_CATEGORY_GROUPINGS"); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala index e30aa7947b1..b27934a978b 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala @@ -19,7 +19,7 @@ package org.apache.kylin.query.pushdown import org.apache.commons.lang3.StringUtils -import org.apache.gluten.utils.FallbackUtil +import org.apache.gluten.test.FallbackUtil import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} import org.apache.kylin.guava30.shaded.common.collect.ImmutableList diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index 6dfd0690ec5..31b139615fd 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong import java.{lang, util} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils -import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector} +import org.apache.gluten.utils.QueryPlanSelector import org.apache.hadoop.fs.Path import org.apache.kylin.common.exception.code.ErrorCodeServer import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException} @@ -137,8 +137,6 @@ object ResultPlan extends LogEx { QueryContext.current.record("collect_result") val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan) - val glutenFallback = FallbackUtil.hasFallback(df.queryExecution.executedPlan) - QueryContext.current().getMetrics.setGlutenFallback(glutenFallback) val (jobCount, stageCount, taskCount) = QueryMetricUtils.collectTaskRelatedMetrics(jobGroup, sparkContext) QueryContext.current().getMetrics.setScanRows(scanRows) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala index b12b99ea02d..bda7b4fb198 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala @@ -16,52 +16,62 @@ */ package org.apache.spark.sql.catalyst.expressions.gluten -import com.google.common.collect.Lists import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.expression._ import org.apache.gluten.extension.ExpressionExtensionTrait -import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.internal.Logging +import org.apache.spark.sql.KapFunctions.TRUNCATE import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.types.{BinaryType, DataType, LongType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.udaf._ import scala.collection.mutable.ListBuffer -case class KeBitmapFunctionTransformer( - substraitExprName: String, - child: ExpressionTransformer, - original: Expression) extends ExpressionTransformerWithOrigin with Logging { +class KeBitmapFunctionTransformer( + val substraitExprName: String, + val child: ExpressionTransformer, + val original: Expression) extends UnaryExpressionTransformer with Logging { +} - override def doTransform(args: java.lang.Object): ExpressionNode = { - val childNode = child.doTransform(args) - val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]] - val functionId = ExpressionBuilder.newScalarFunction( - functionMap, - ConverterUtils.makeFuncName( - substraitExprName, - original.children.map(_.dataType), - ConverterUtils.FunctionConfig.OPT)) +class FloorDateTimeTransformer( + val left: ExpressionTransformer, + val right: ExpressionTransformer, + val original: FloorDateTime +) extends ExpressionTransformer with Logging { + + override def substraitExprName: String = "date_trunc" - val expressionNodes = Lists.newArrayList(childNode) - val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable) - ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode) + override def children: Seq[ExpressionTransformer] = { + if (original.timeZoneId.isDefined) { + Seq(right, left, LiteralTransformer(original.timeZoneId.get)) + } else { + Seq(right, left) + } } + } case class CustomerExpressionTransformer() extends ExpressionExtensionTrait { /** Generate the extension expressions list, format: Sig[XXXExpression]("XXXExpressionName") */ def expressionSigList: Seq[Sig] = Seq( - Sig[FloorDateTime]("floor_datetime"), + Sig[FloorDateTime]("date_trunc"), + Sig[CeilDateTime]("ceil_datetime"), + Sig[KapAddMonths]("kap_add_months"), + Sig[KapSubtractMonths]("kap_months_between"), + Sig[YMDintBetween]("kap_ymd_int_between"), + Sig[TRUNCATE]("truncate"), + Sig[KylinSplitPart]("kylin_split_part"), + Sig[KylinInstr]("kylin_instr"), Sig[PreciseCardinality]("ke_bitmap_cardinality"), Sig[PreciseCountDistinctDecode]("ke_bitmap_cardinality"), Sig[ReusePreciseCountDistinct]("ke_bitmap_or_data"), Sig[PreciseCountDistinctAndValue]("ke_bitmap_and_value"), Sig[PreciseCountDistinctAndArray]("ke_bitmap_and_ids"), Sig[PreciseCountDistinct]("ke_bitmap_or_cardinality"), - Sig[KylinTimestampAdd]("kylin_timestamp_add") + Sig[KylinTimestampAdd]("kylin_timestamp_add"), + Sig[Sum0]("sum0") ) /** Replace extension expression to transformer. */ @@ -70,21 +80,21 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait { expr: Expression, attributeSeq: Seq[Attribute]): ExpressionTransformer = expr match { case preciseCardinality: PreciseCardinality => - KeBitmapFunctionTransformer( + new KeBitmapFunctionTransformer( substraitExprName, ExpressionConverter .replaceWithExpressionTransformer(preciseCardinality.child, attributeSeq), preciseCardinality ) case preciseCountDistinctDecode: PreciseCountDistinctDecode => - KeBitmapFunctionTransformer( + new KeBitmapFunctionTransformer( substraitExprName, ExpressionConverter .replaceWithExpressionTransformer(preciseCountDistinctDecode.child, attributeSeq), preciseCountDistinctDecode ) case kylinTimestampAdd: KylinTimestampAdd => - TimestampAddTransformer( + new TimestampAddTransformer( ExpressionConverter .replaceWithExpressionTransformer(kylinTimestampAdd.left, attributeSeq), ExpressionConverter @@ -93,6 +103,54 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait { .replaceWithExpressionTransformer(kylinTimestampAdd.right, attributeSeq), kylinTimestampAdd ) + case ceilDateTime: CeilDateTime => + val floorTime = FloorDateTime(ceilDateTime.left, ceilDateTime.right, ceilDateTime.timeZoneId) + val floorAddUnitTime = KylinTimestampAdd(ceilDateTime.right, Literal(1L), floorTime) + val equalsExp = If(EqualTo(ceilDateTime.left, floorTime), floorTime, floorAddUnitTime) + ExpressionConverter.replaceWithExpressionTransformer(equalsExp, attributeSeq) + case floorDateTime: FloorDateTime => + new FloorDateTimeTransformer( + ExpressionConverter.replaceWithExpressionTransformer(floorDateTime.left, attributeSeq), + ExpressionConverter.replaceWithExpressionTransformer(floorDateTime.right, attributeSeq), + floorDateTime + ) + case kylinSplitPart: KylinSplitPart if kylinSplitPart.second.isInstanceOf[Literal] => + new GenericExpressionTransformer( + substraitExprName, + Seq( + ExpressionConverter.replaceWithExpressionTransformer(kylinSplitPart.first, attributeSeq), + LiteralTransformer(kylinSplitPart.second.asInstanceOf[Literal]), + ExpressionConverter.replaceWithExpressionTransformer(kylinSplitPart.third, attributeSeq) + ), + kylinSplitPart + ) + case kylinInstr: KylinInstr => + val stringLocate = StringLocate(kylinInstr.second, kylinInstr.first, kylinInstr.third) + ExpressionConverter.replaceWithExpressionTransformer(stringLocate, attributeSeq) + case kapAddMonths: KapAddMonths => + val addMonths = KylinTimestampAdd(Literal("month"), kapAddMonths.right, kapAddMonths.left) + val equalsExp = If(EqualTo(kapAddMonths.left, LastDay(kapAddMonths.left)), LastDay(addMonths), addMonths) + ExpressionConverter.replaceWithExpressionTransformer(equalsExp, attributeSeq) + + case kapSubtractMonths: KapSubtractMonths => + GenericExpressionTransformer("kap_months_between", + Seq( + ExpressionConverter.replaceWithExpressionTransformer(kapSubtractMonths.right, attributeSeq), + ExpressionConverter.replaceWithExpressionTransformer(kapSubtractMonths.left, attributeSeq)), + kapSubtractMonths) + case kapYmdIntBetween: YMDintBetween => + GenericExpressionTransformer("kap_ymd_int_between", + Seq( + ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.left, attributeSeq), + ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.right, attributeSeq)), + kapYmdIntBetween) + case truncate: TRUNCATE => + GenericExpressionTransformer( + "truncate", + Seq( + ExpressionConverter.replaceWithExpressionTransformer(truncate.left, attributeSeq), + ExpressionConverter.replaceWithExpressionTransformer(truncate.right, attributeSeq)), + truncate) case _ => throw new UnsupportedOperationException( s"${expr.getClass} or $expr is not currently supported.") diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala index 22aec4f74e9..938830fcc12 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.gluten import org.apache.gluten.expression.ConverterUtils.FunctionConfig -import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer, ExpressionTransformerWithOrigin} +import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer} import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.kylin.guava30.shaded.common.collect.Lists import org.apache.spark.internal.Logging @@ -28,12 +28,12 @@ import org.apache.spark.sql.udf.TimestampAddImpl import java.time.ZoneId import java.util.Locale -case class TimestampAddTransformer( - left: ExpressionTransformer, - mid: ExpressionTransformer, - right: ExpressionTransformer, - original: KylinTimestampAdd - ) extends ExpressionTransformerWithOrigin with Logging { +class TimestampAddTransformer( + val left: ExpressionTransformer, + val mid: ExpressionTransformer, + val right: ExpressionTransformer, + val original: KylinTimestampAdd +) extends ExpressionTransformer with Logging { override def doTransform(args: Object): ExpressionNode = { val unitNode = left.doTransform(args) val increaseNode = mid.doTransform(args) @@ -75,4 +75,10 @@ case class TimestampAddTransformer( original.right.dataType } } + + override def substraitExprName: String = "timestamp_add" + + override def children: Seq[ExpressionTransformer] = { + Seq(left, mid, right) + } } diff --git a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala index b4a1ab06bd9..14b7de1f50b 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala @@ -18,12 +18,16 @@ package org.apache.spark.sql.udf -import org.apache.spark.sql.FunctionEntity import org.apache.spark.sql.catalyst.expressions.ExpressionUtils.expression import org.apache.spark.sql.catalyst.expressions.{CeilDateTime, FloorDateTime} -import org.apache.spark.sql.common.{SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.common.{GlutenTestUtil, SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.execution.ProjectExec +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{FunctionEntity, Row} import org.scalatest.BeforeAndAfterAll +import java.sql.{Date, Timestamp} + class CeilFloorTest extends SparderBaseFunSuite with SharedSparkSession with BeforeAndAfterAll { override def beforeAll(): Unit = { super.beforeAll() @@ -114,8 +118,49 @@ class CeilFloorTest extends SparderBaseFunSuite with SharedSparkSession with Bef query("select floor(-3.12)", "[-4]") } - def query(sql: String, expected: String): Unit = { - val result = spark.sql(sql).collect().map(row => row.toString()).mkString + test("test temp dataset") { + val schema = StructType(List( + StructField("row_id", LongType), + StructField("date_col", DateType), + StructField("timestamp_col", TimestampType) + )) + val rdd = sc.parallelize(Seq( + Row(1L, Date.valueOf("2024-01-01"), Timestamp.valueOf("2024-01-01 01:01:01.001")), + Row(2L, Date.valueOf("2024-02-29"), Timestamp.valueOf("2024-02-29 02:00:00.000")), + Row(3L, Date.valueOf("2024-03-31"), Timestamp.valueOf("2024-03-31 00:00:00.000")), + Row(4L, Date.valueOf("2024-04-01"), Timestamp.valueOf("2024-04-01 00:00:00.001")), + Row(5L, Date.valueOf("2024-04-30"), Timestamp.valueOf("2024-04-30 01:01:01.001")) + )) + spark.sqlContext.createDataFrame(rdd, schema).createTempView("temp_data") + query("select floor_datetime(date_col, 'year') from temp_data where row_id = 1", "[2024-01-01 00:00:00.0]", true) + query("select ceil_datetime(date_col, 'year') from temp_data where row_id = 1", "[2024-01-01 00:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'year') from temp_data where row_id = 1", "[2025-01-01 00:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'quarter') from temp_data where row_id = 4", "[2024-04-01 00:00:00.0]", true) + query("select ceil_datetime(date_col, 'quarter') from temp_data where row_id = 4", "[2024-04-01 00:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'quarter') from temp_data where row_id = 4", "[2024-07-01 00:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'month') from temp_data where row_id = 5", "[2024-04-01 00:00:00.0]", true) + query("select ceil_datetime(date_col, 'month') from temp_data where row_id = 5", "[2024-05-01 00:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'month') from temp_data where row_id = 5", "[2024-05-01 00:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'week') from temp_data where row_id = 5", "[2024-04-29 00:00:00.0]", true) + query("select ceil_datetime(date_col, 'week') from temp_data where row_id = 5", "[2024-05-06 00:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'week') from temp_data where row_id = 5", "[2024-05-06 00:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true) + query("select ceil_datetime(date_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'hour') from temp_data where row_id = 5", "[2024-04-30 01:00:00.0]", true) + query("select ceil_datetime(timestamp_col, 'hour') from temp_data where row_id = 5", "[2024-04-30 02:00:00.0]", true) + query("select floor_datetime(timestamp_col, 'minute') from temp_data where row_id = 5", "[2024-04-30 01:01:00.0]", true) + query("select ceil_datetime(timestamp_col, 'minute') from temp_data where row_id = 5", "[2024-04-30 01:02:00.0]", true) + query("select floor_datetime(timestamp_col, 'second') from temp_data where row_id = 5", "[2024-04-30 01:01:01.0]", true) + query("select ceil_datetime(timestamp_col, 'second') from temp_data where row_id = 5", "[2024-04-30 01:01:02.0]", true) + } + + def query(sql: String, expected: String, fallBackCheck: Boolean = false): Unit = { + val df = spark.sql(sql) + if (fallBackCheck && GlutenTestUtil.glutenEnabled(spark)) { + assert(!GlutenTestUtil.hasFallbackOnStep(df.queryExecution.executedPlan, classOf[ProjectExec])) + } + val result = df.collect().map(row => row.toString()).mkString if (!result.equals(expected)) { print(sql) print(result) diff --git a/src/spark-project/spark-common/pom.xml b/src/spark-project/spark-common/pom.xml index 16a39cafe8c..c7ebb301be6 100644 --- a/src/spark-project/spark-common/pom.xml +++ b/src/spark-project/spark-common/pom.xml @@ -36,11 +36,6 @@ - - - io.delta - delta-core_2.12 - org.apache.kylin kylin-core-metadata @@ -95,6 +90,10 @@ it.unimi.dsi fastutil + + io.delta + delta-core_2.12 + com.github.hipjim @@ -146,6 +145,31 @@ + + org.apache.gluten + gluten-core + ${gluten.version} + test-jar + ${gluten.deps.scope} + + + org.apache.gluten + spark-sql-columnar-shims-spark32 + + + org.apache.gluten + spark-sql-columnar-shims-spark33 + + + org.apache.gluten + spark-sql-columnar-shims-spark34 + + + org.apache.gluten + spark-sql-columnar-shims-spark35 + + + org.apache.gluten gluten-ui diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala index 7b8bb5ff796..84e69f6fa1e 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala @@ -19,15 +19,27 @@ package org.apache.spark.sql.execution.gluten import org.apache.gluten.execution.FileSourceScanExecTransformer - +import org.apache.gluten.extension.GlutenPlan import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.utils.PushDownUtil import org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSourceScanExec, SparkPlan} -case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rule[SparkPlan] { +class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends Rule[SparkPlan] { + + private def tryReturnGlutenPlan(glutenPlan: GlutenPlan, originPlan: SparkPlan): SparkPlan = { + if (glutenPlan.doValidate().isValid) { + logDebug(s"Columnar Processing for ${originPlan.getClass} is currently supported.") + glutenPlan + } else { + logDebug(s"Columnar Processing for ${originPlan.getClass} is currently unsupported.") + originPlan + } + } override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { case f: KylinFileSourceScanExec => + // convert to Gluten transformer val transformer = new KylinFileSourceScanExecTransformer( f.relation, @@ -37,19 +49,13 @@ case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rul None, f.optionalShardSpec, f.optionalNumCoalescedBuckets, - f.dataFilters, + PushDownUtil.removeNotSupportPushDownFilters(f.conf, f.output, f.dataFilters), f.tableIdentifier, f.disableBucketedScan, f.sourceScanRows ) // Transformer validate - if (transformer.doValidate().isValid) { - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - transformer - } else { - logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - f - } + tryReturnGlutenPlan(transformer, f) case l: LayoutFileSourceScanExec => // convert to Gluten transformer @@ -60,17 +66,11 @@ case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rul l.partitionFilters, l.optionalBucketSet, l.optionalNumCoalescedBuckets, - l.dataFilters, + PushDownUtil.removeNotSupportPushDownFilters(l.conf, l.output, l.dataFilters), l.tableIdentifier, l.disableBucketedScan ) // Transformer validate - if (transformer.doValidate().isValid) { - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - transformer - } else { - logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - l - } + tryReturnGlutenPlan(transformer, l) } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala index e0211e26607..6de7384312d 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala @@ -61,6 +61,7 @@ class KylinFileSourceScanExecTransformer(@transient relation: HadoopFsRelation, override def getPartitions: Seq[InputPartition] = BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( relation, + requiredSchema, selectedPartitions, output, bucketedScan, diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala index 171d14aa9b1..e748876cc56 100644 --- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala +++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala @@ -83,7 +83,6 @@ object GlutenTestConfig extends Logging{ conf.set("spark.gluten.sql.columnar.maxBatchSize", "32768") conf.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32") conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") - conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false") } } diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala new file mode 100644 index 00000000000..6774c60f511 --- /dev/null +++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.common + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.extension.GlutenPlan +import org.apache.gluten.test.FallbackUtil +import org.apache.gluten.test.FallbackUtil.collectWithSubqueries +import org.apache.gluten.utils.QueryPlanSelector +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec + +object GlutenTestUtil { + + def hasFallback(plan: SparkPlan): Boolean = FallbackUtil.hasFallback(plan) + + def hasFallbackOnStep[T <: SparkPlan](plan: SparkPlan, clazz: Class[T]): Boolean = { + var fallbackOperator: Seq[SparkPlan] = null + if (plan.isInstanceOf[AdaptiveSparkPlanExec]) { + fallbackOperator = collectWithSubqueries(plan) { + case plan if !plan.isInstanceOf[GlutenPlan] && !FallbackUtil.skip(plan) => + plan + } + } else { + fallbackOperator = plan.collectWithSubqueries { + case plan if !plan.isInstanceOf[GlutenPlan] && !FallbackUtil.skip(plan) => + plan + } + } + if (fallbackOperator.nonEmpty && fallbackOperator.map(_.getClass).contains(clazz)) { + true + } else { + false + } + } + + def glutenEnabled(spark: SparkSession): Boolean = { + spark.conf.get("spark.plugins", "").contains("GlutenPlugin") && + spark.conf.get( + GlutenConfig.GLUTEN_ENABLE_KEY, + GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString).toBoolean && glutenEnabledForCurrentThread(spark) + } + + private def glutenEnabledForCurrentThread(spark: SparkSession): Boolean = { + val enabled = + spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY) + if (enabled != null) { + enabled.toBoolean + } else { + true + } + } +}