diff --git a/pom.xml b/pom.xml
index ae6964f1dd1..c04454dcc0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,9 +129,9 @@
3.3.0-kylin-4.6.26.0
- 2.2.0
+ 2.3.0
- 1.2.0-kylin-240518-SNAPSHOT
+ 1.2.0-kylin-240701-SNAPSHOT
compile
0.5.0
0.3.0-incubating
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index bb8dd9d0853..73569985ba4 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -317,7 +317,6 @@ public void addDataFetchTime(long dataFetchTime) {
@Setter
private List scanBytes;
-
@Getter
@Setter
private Boolean glutenFallback;
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java
index ce86a924868..06b7316ed69 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/transaction/TransactionProjectLockTest.java
@@ -23,7 +23,7 @@
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Disabled;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -34,7 +34,7 @@
@MetadataInfo(onlyProps = true)
class TransactionProjectLockTest {
@Test
+    @Disabled("Deprecated ProjectLock")
void testStravition() {
val threads = Lists.<Thread>newArrayList();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
index ab82d9303dc..7b638c73e4f 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
@@ -76,7 +76,7 @@ public static void checkQueryCanceledOrThreadInterrupted(String cause, String st
"Manually stop the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step));
}
- if (entry.getPlannerCancelFlag().isCancelRequested() && Thread.currentThread().isInterrupted()) {
+ if (entry.getPlannerCancelFlag().isCancelRequested() && entry.isTimeoutStop) {
QueryContext.current().getQueryTagInfo().setTimeout(true);
throw new KylinTimeoutException(String.format(Locale.ROOT,
"Run out of time of the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index 46c6ac4e33c..f82c66a66bc 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -79,7 +79,8 @@ public static void clearCanceledSlowQueriesStatus() {
public void queryStart(String stopId) {
runningQueries.put(currentThread(), new QueryEntry(System.currentTimeMillis(), currentThread(),
QueryContext.current().getQueryId(), QueryContext.current().getUserSQL(), stopId, false,
- QueryContext.current().getQueryTagInfo().isAsyncQuery(), null, CancelFlag.getContextCancelFlag()));
+ QueryContext.current().getQueryTagInfo().isAsyncQuery(), false, null,
+ CancelFlag.getContextCancelFlag()));
}
public void addJobIdForAsyncQueryJob(String jobId) {
@@ -188,6 +189,7 @@ public class QueryEntry {
final String stopId;
boolean isStopByUser;
final boolean isAsyncQuery;
+ boolean isTimeoutStop;
String jobId;
final CancelFlag plannerCancelFlag;
@@ -195,12 +197,13 @@ public long getRunningTime() {
return (System.currentTimeMillis() - startTime) / 1000;
}
- public boolean setInterruptIfTimeout() {
+ public synchronized boolean setInterruptIfTimeout() {
if (isAsyncQuery) {
return false;
}
long runningMs = System.currentTimeMillis() - startTime;
if (runningMs >= queryTimeoutMs) {
+ isTimeoutStop = true;
plannerCancelFlag.requestCancel();
thread.interrupt();
logger.error("Trying to cancel query: {}", thread.getName());
@@ -209,5 +212,9 @@ public boolean setInterruptIfTimeout() {
return false;
}
+
+ public synchronized CancelFlag getPlannerCancelFlag() {
+ return plannerCancelFlag;
+ }
}
}
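The change above adds an explicit `isTimeoutStop` flag that the watchdog sets (under synchronization) before interrupting the query thread, and QueryInterruptChecker now consults that flag instead of `Thread.currentThread().isInterrupted()`, whose status can be cleared by intervening code before the checker runs. A minimal Scala sketch of the resulting flow, with hypothetical class names standing in for the Kylin types:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-ins for SlowQueryDetector.QueryEntry and QueryInterruptChecker,
// modelling only the new control flow; names and types are illustrative.
final class QueryEntrySketch(startTime: Long, timeoutMs: Long, thread: Thread) {
  private val timeoutStop = new AtomicBoolean(false)
  private val cancelRequested = new AtomicBoolean(false)

  // Mirrors the synchronized setInterruptIfTimeout: record the reason first,
  // then request a cooperative cancel, then interrupt blocking calls.
  def setInterruptIfTimeout(now: Long): Boolean = synchronized {
    if (now - startTime < timeoutMs) {
      false
    } else {
      timeoutStop.set(true)
      cancelRequested.set(true)
      thread.interrupt()
      true
    }
  }

  def isTimeoutStop: Boolean = timeoutStop.get()
  def isCancelRequested: Boolean = cancelRequested.get()
}

object TimeoutCheckSketch {
  // Mirrors the updated check: cancel flag plus the explicit timeout flag,
  // rather than cancel flag plus Thread.currentThread().isInterrupted().
  def checkTimedOut(entry: QueryEntrySketch): Unit =
    if (entry.isCancelRequested && entry.isTimeoutStop) {
      throw new RuntimeException("query ran out of time") // stands in for KylinTimeoutException
    }
}
```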
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
index 651c8a05467..84b043f9fda 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
@@ -34,12 +34,31 @@
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.query.relnode.ContextUtil;
+import org.apache.kylin.query.util.SlowQueryDetector;
+import org.junit.After;
+import org.junit.Before;
import org.sparkproject.guava.collect.Sets;
public class NLocalWithSparkSessionTest extends NLocalWithSparkSessionTestBase {
protected IndexDataConstructor indexDataConstructor = new IndexDataConstructor(getProject());
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ indexDataConstructor = new IndexDataConstructor(getProject());
+ SlowQueryDetector.getRunningQueries().clear();
+ ContextUtil.clearThreadLocalContexts();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.cleanupTestMetadata();
+ SlowQueryDetector.getRunningQueries().clear();
+ ContextUtil.clearThreadLocalContexts();
+ }
+
protected void fullBuild(String dfName) throws Exception {
indexDataConstructor.buildDataflow(dfName);
}
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index a0c8cfa7c46..bd6abb62bf1 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -34,6 +34,7 @@
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.pushdown.SparkSqlClient;
+import org.apache.kylin.query.relnode.ContextUtil;
import org.apache.kylin.query.runtime.plan.ResultPlan;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
@@ -236,6 +237,7 @@ public void testSparderTimeoutCancelJob() throws Exception {
slowQueryDetector.queryStart("");
try {
SparderEnv.cleanCompute();
+ ContextUtil.clearThreadLocalContexts();
long t = System.currentTimeMillis();
ResultPlan.getResult(mockDf, null);
ExecAndComp.queryModel(getProject(), "select sum(price) from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME");
@@ -244,6 +246,10 @@ public void testSparderTimeoutCancelJob() throws Exception {
logger.error(error);
Assert.fail(error);
} catch (Exception e) {
+ boolean timeout = QueryContext.current().getQueryTagInfo().isTimeout();
+ if (!timeout) {
+ logger.error("Unexpected query exception", e);
+ }
Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
Assert.assertTrue(e instanceof KylinTimeoutException);
Assert.assertEquals(
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 4566916d35c..6d677066671 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -74,8 +74,6 @@ public class SQLResponse implements Serializable {
private List scanRows;
- private Boolean glutenFallback;
-
private List scanBytes;
private String appMasterURL = "";
@@ -212,7 +210,6 @@ public SQLResponse wrapResultOfQueryContext(QueryContext queryContext) {
this.setScanRows(queryContext.getMetrics().getScanRows());
this.setScanBytes(queryContext.getMetrics().getScanBytes());
this.setShufflePartitions(queryContext.getShufflePartitions());
- this.setGlutenFallback(queryContext.getMetrics().getGlutenFallback());
return this;
}
diff --git a/src/spark-project/engine-spark/pom.xml b/src/spark-project/engine-spark/pom.xml
index 823c03983a1..25a9a462a0a 100644
--- a/src/spark-project/engine-spark/pom.xml
+++ b/src/spark-project/engine-spark/pom.xml
@@ -36,11 +36,6 @@
-
-        <dependency>
-            <groupId>io.delta</groupId>
-            <artifactId>delta-core_2.12</artifactId>
-        </dependency>
        <dependency>
            <groupId>org.apache.kylin</groupId>
            <artifactId>kylin-core-metadata</artifactId>
@@ -167,6 +162,11 @@
            <artifactId>volcano-client</artifactId>
            <version>5.12.2</version>
        </dependency>
+
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-core_2.12</artifactId>
+        </dependency>
        <dependency>
            <groupId>com.github.hipjim</groupId>
            <artifactId>scala-retry_2.12</artifactId>
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/GlutenDisabled.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenDisabled.java
similarity index 100%
rename from src/kylin-it/src/test/java/org/apache/kylin/GlutenDisabled.java
rename to src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenDisabled.java
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/GlutenRunner.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenRunner.java
similarity index 100%
rename from src/kylin-it/src/test/java/org/apache/kylin/GlutenRunner.java
rename to src/spark-project/engine-spark/src/test/java/org/apache/kylin/GlutenRunner.java
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
index 8ac12727b9f..b4b0da3588b 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.kylin.GlutenDisabled;
+import org.apache.kylin.GlutenRunner;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -34,10 +36,12 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import lombok.val;
import lombok.var;
+@RunWith(GlutenRunner.class)
public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase {
private NTableMetadataManager tableMgr;
@@ -95,6 +99,7 @@ public void testSampleFullTable() {
}
@Test
+ @GlutenDisabled("max(col) with null data: gluten returns NaN, but spark returns null")
public void testSampleTableForColumnOrRowAlwaysNull() {
// case 1: this case test specified column always null, corresponding column is 'CATEG_BUSN_MGR'
TableDesc testCategoryGroupings = tableMgr.getTableDesc("DEFAULT.TEST_CATEGORY_GROUPINGS");
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index e30aa7947b1..b27934a978b 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -19,7 +19,7 @@
package org.apache.kylin.query.pushdown
import org.apache.commons.lang3.StringUtils
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
import org.apache.kylin.guava30.shaded.common.collect.ImmutableList
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 6dfd0690ec5..31b139615fd 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.{lang, util}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.commons.io.IOUtils
-import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}
+import org.apache.gluten.utils.QueryPlanSelector
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.code.ErrorCodeServer
import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException}
@@ -137,8 +137,6 @@ object ResultPlan extends LogEx {
QueryContext.current.record("collect_result")
val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
- val glutenFallback = FallbackUtil.hasFallback(df.queryExecution.executedPlan)
- QueryContext.current().getMetrics.setGlutenFallback(glutenFallback)
val (jobCount, stageCount, taskCount) = QueryMetricUtils.collectTaskRelatedMetrics(jobGroup, sparkContext)
QueryContext.current().getMetrics.setScanRows(scanRows)
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
index b12b99ea02d..bda7b4fb198 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
@@ -16,52 +16,62 @@
*/
package org.apache.spark.sql.catalyst.expressions.gluten
-import com.google.common.collect.Lists
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression._
import org.apache.gluten.extension.ExpressionExtensionTrait
-import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.KapFunctions.TRUNCATE
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.types.{BinaryType, DataType, LongType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.udaf._
import scala.collection.mutable.ListBuffer
-case class KeBitmapFunctionTransformer(
- substraitExprName: String,
- child: ExpressionTransformer,
- original: Expression) extends ExpressionTransformerWithOrigin with Logging {
+class KeBitmapFunctionTransformer(
+ val substraitExprName: String,
+ val child: ExpressionTransformer,
+ val original: Expression) extends UnaryExpressionTransformer with Logging {
+}
- override def doTransform(args: java.lang.Object): ExpressionNode = {
- val childNode = child.doTransform(args)
- val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
- val functionId = ExpressionBuilder.newScalarFunction(
- functionMap,
- ConverterUtils.makeFuncName(
- substraitExprName,
- original.children.map(_.dataType),
- ConverterUtils.FunctionConfig.OPT))
+class FloorDateTimeTransformer(
+ val left: ExpressionTransformer,
+ val right: ExpressionTransformer,
+ val original: FloorDateTime
+) extends ExpressionTransformer with Logging {
+
+ override def substraitExprName: String = "date_trunc"
- val expressionNodes = Lists.newArrayList(childNode)
- val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
- ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode)
+ override def children: Seq[ExpressionTransformer] = {
+ if (original.timeZoneId.isDefined) {
+ Seq(right, left, LiteralTransformer(original.timeZoneId.get))
+ } else {
+ Seq(right, left)
+ }
}
+
}
case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
/** Generate the extension expressions list, format: Sig[XXXExpression]("XXXExpressionName") */
def expressionSigList: Seq[Sig] = Seq(
- Sig[FloorDateTime]("floor_datetime"),
+ Sig[FloorDateTime]("date_trunc"),
+ Sig[CeilDateTime]("ceil_datetime"),
+ Sig[KapAddMonths]("kap_add_months"),
+ Sig[KapSubtractMonths]("kap_months_between"),
+ Sig[YMDintBetween]("kap_ymd_int_between"),
+ Sig[TRUNCATE]("truncate"),
+ Sig[KylinSplitPart]("kylin_split_part"),
+ Sig[KylinInstr]("kylin_instr"),
Sig[PreciseCardinality]("ke_bitmap_cardinality"),
Sig[PreciseCountDistinctDecode]("ke_bitmap_cardinality"),
Sig[ReusePreciseCountDistinct]("ke_bitmap_or_data"),
Sig[PreciseCountDistinctAndValue]("ke_bitmap_and_value"),
Sig[PreciseCountDistinctAndArray]("ke_bitmap_and_ids"),
Sig[PreciseCountDistinct]("ke_bitmap_or_cardinality"),
- Sig[KylinTimestampAdd]("kylin_timestamp_add")
+ Sig[KylinTimestampAdd]("kylin_timestamp_add"),
+ Sig[Sum0]("sum0")
)
/** Replace extension expression to transformer. */
@@ -70,21 +80,21 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
expr: Expression,
attributeSeq: Seq[Attribute]): ExpressionTransformer = expr match {
case preciseCardinality: PreciseCardinality =>
- KeBitmapFunctionTransformer(
+ new KeBitmapFunctionTransformer(
substraitExprName,
ExpressionConverter
.replaceWithExpressionTransformer(preciseCardinality.child, attributeSeq),
preciseCardinality
)
case preciseCountDistinctDecode: PreciseCountDistinctDecode =>
- KeBitmapFunctionTransformer(
+ new KeBitmapFunctionTransformer(
substraitExprName,
ExpressionConverter
.replaceWithExpressionTransformer(preciseCountDistinctDecode.child, attributeSeq),
preciseCountDistinctDecode
)
case kylinTimestampAdd: KylinTimestampAdd =>
- TimestampAddTransformer(
+ new TimestampAddTransformer(
ExpressionConverter
.replaceWithExpressionTransformer(kylinTimestampAdd.left, attributeSeq),
ExpressionConverter
@@ -93,6 +103,54 @@ case class CustomerExpressionTransformer() extends ExpressionExtensionTrait {
.replaceWithExpressionTransformer(kylinTimestampAdd.right, attributeSeq),
kylinTimestampAdd
)
+ case ceilDateTime: CeilDateTime =>
+ val floorTime = FloorDateTime(ceilDateTime.left, ceilDateTime.right, ceilDateTime.timeZoneId)
+ val floorAddUnitTime = KylinTimestampAdd(ceilDateTime.right, Literal(1L), floorTime)
+ val equalsExp = If(EqualTo(ceilDateTime.left, floorTime), floorTime, floorAddUnitTime)
+ ExpressionConverter.replaceWithExpressionTransformer(equalsExp, attributeSeq)
+ case floorDateTime: FloorDateTime =>
+ new FloorDateTimeTransformer(
+ ExpressionConverter.replaceWithExpressionTransformer(floorDateTime.left, attributeSeq),
+ ExpressionConverter.replaceWithExpressionTransformer(floorDateTime.right, attributeSeq),
+ floorDateTime
+ )
+ case kylinSplitPart: KylinSplitPart if kylinSplitPart.second.isInstanceOf[Literal] =>
+ new GenericExpressionTransformer(
+ substraitExprName,
+ Seq(
+ ExpressionConverter.replaceWithExpressionTransformer(kylinSplitPart.first, attributeSeq),
+ LiteralTransformer(kylinSplitPart.second.asInstanceOf[Literal]),
+ ExpressionConverter.replaceWithExpressionTransformer(kylinSplitPart.third, attributeSeq)
+ ),
+ kylinSplitPart
+ )
+ case kylinInstr: KylinInstr =>
+ val stringLocate = StringLocate(kylinInstr.second, kylinInstr.first, kylinInstr.third)
+ ExpressionConverter.replaceWithExpressionTransformer(stringLocate, attributeSeq)
+ case kapAddMonths: KapAddMonths =>
+ val addMonths = KylinTimestampAdd(Literal("month"), kapAddMonths.right, kapAddMonths.left)
+ val equalsExp = If(EqualTo(kapAddMonths.left, LastDay(kapAddMonths.left)), LastDay(addMonths), addMonths)
+ ExpressionConverter.replaceWithExpressionTransformer(equalsExp, attributeSeq)
+
+ case kapSubtractMonths: KapSubtractMonths =>
+ GenericExpressionTransformer("kap_months_between",
+ Seq(
+ ExpressionConverter.replaceWithExpressionTransformer(kapSubtractMonths.right, attributeSeq),
+ ExpressionConverter.replaceWithExpressionTransformer(kapSubtractMonths.left, attributeSeq)),
+ kapSubtractMonths)
+ case kapYmdIntBetween: YMDintBetween =>
+ GenericExpressionTransformer("kap_ymd_int_between",
+ Seq(
+ ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.left, attributeSeq),
+ ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.right, attributeSeq)),
+ kapYmdIntBetween)
+ case truncate: TRUNCATE =>
+ GenericExpressionTransformer(
+ "truncate",
+ Seq(
+ ExpressionConverter.replaceWithExpressionTransformer(truncate.left, attributeSeq),
+ ExpressionConverter.replaceWithExpressionTransformer(truncate.right, attributeSeq)),
+ truncate)
case _ =>
throw new UnsupportedOperationException(
s"${expr.getClass} or $expr is not currently supported.")
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala
index 22aec4f74e9..938830fcc12 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/TimestampAddTransformer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions.gluten
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
-import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer, ExpressionTransformerWithOrigin}
+import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer}
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
import org.apache.kylin.guava30.shaded.common.collect.Lists
import org.apache.spark.internal.Logging
@@ -28,12 +28,12 @@ import org.apache.spark.sql.udf.TimestampAddImpl
import java.time.ZoneId
import java.util.Locale
-case class TimestampAddTransformer(
- left: ExpressionTransformer,
- mid: ExpressionTransformer,
- right: ExpressionTransformer,
- original: KylinTimestampAdd
- ) extends ExpressionTransformerWithOrigin with Logging {
+class TimestampAddTransformer(
+ val left: ExpressionTransformer,
+ val mid: ExpressionTransformer,
+ val right: ExpressionTransformer,
+ val original: KylinTimestampAdd
+) extends ExpressionTransformer with Logging {
override def doTransform(args: Object): ExpressionNode = {
val unitNode = left.doTransform(args)
val increaseNode = mid.doTransform(args)
@@ -75,4 +75,10 @@ case class TimestampAddTransformer(
original.right.dataType
}
}
+
+ override def substraitExprName: String = "timestamp_add"
+
+ override def children: Seq[ExpressionTransformer] = {
+ Seq(left, mid, right)
+ }
}
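TimestampAddTransformer keeps its custom doTransform but now also declares `substraitExprName` and `children`, and the other transformers in this patch (KeBitmapFunctionTransformer, FloorDateTimeTransformer) rely on those two members alone instead of the removed ExpressionTransformerWithOrigin. A toy Scala model of that shape; the trait and names below are illustrative, not the real Gluten API:

```scala
// A transformer describes itself via a Substrait function name plus ordered children;
// a default rendering is derived from those, like the default doTransform.
trait TransformerSketch {
  def substraitExprName: String
  def children: Seq[TransformerSketch]
  def render: String = s"$substraitExprName(${children.map(_.render).mkString(", ")})"
}

final case class LeafSketch(name: String) extends TransformerSketch {
  def substraitExprName: String = name
  def children: Seq[TransformerSketch] = Nil
  override def render: String = name
}

// FloorDateTimeTransformer only has to say "I am date_trunc" and reorder its children:
// unit first, then the timestamp, then an optional time zone.
final class FloorDateTimeSketch(ts: TransformerSketch, unit: TransformerSketch,
                                timeZone: Option[TransformerSketch]) extends TransformerSketch {
  def substraitExprName: String = "date_trunc"
  def children: Seq[TransformerSketch] = Seq(unit, ts) ++ timeZone
}

object TransformerSketchDemo extends App {
  val floorMonth = new FloorDateTimeSketch(LeafSketch("ts_col"), LeafSketch("'month'"), None)
  println(floorMonth.render) // date_trunc('month', ts_col)
}
```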
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala
index b4a1ab06bd9..14b7de1f50b 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/udf/CeilFloorTest.scala
@@ -18,12 +18,16 @@
package org.apache.spark.sql.udf
-import org.apache.spark.sql.FunctionEntity
import org.apache.spark.sql.catalyst.expressions.ExpressionUtils.expression
import org.apache.spark.sql.catalyst.expressions.{CeilDateTime, FloorDateTime}
-import org.apache.spark.sql.common.{SharedSparkSession, SparderBaseFunSuite}
+import org.apache.spark.sql.common.{GlutenTestUtil, SharedSparkSession, SparderBaseFunSuite}
+import org.apache.spark.sql.execution.ProjectExec
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{FunctionEntity, Row}
import org.scalatest.BeforeAndAfterAll
+import java.sql.{Date, Timestamp}
+
class CeilFloorTest extends SparderBaseFunSuite with SharedSparkSession with BeforeAndAfterAll {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -114,8 +118,49 @@ class CeilFloorTest extends SparderBaseFunSuite with SharedSparkSession with Bef
query("select floor(-3.12)", "[-4]")
}
- def query(sql: String, expected: String): Unit = {
- val result = spark.sql(sql).collect().map(row => row.toString()).mkString
+ test("test temp dataset") {
+ val schema = StructType(List(
+ StructField("row_id", LongType),
+ StructField("date_col", DateType),
+ StructField("timestamp_col", TimestampType)
+ ))
+ val rdd = sc.parallelize(Seq(
+ Row(1L, Date.valueOf("2024-01-01"), Timestamp.valueOf("2024-01-01 01:01:01.001")),
+ Row(2L, Date.valueOf("2024-02-29"), Timestamp.valueOf("2024-02-29 02:00:00.000")),
+ Row(3L, Date.valueOf("2024-03-31"), Timestamp.valueOf("2024-03-31 00:00:00.000")),
+ Row(4L, Date.valueOf("2024-04-01"), Timestamp.valueOf("2024-04-01 00:00:00.001")),
+ Row(5L, Date.valueOf("2024-04-30"), Timestamp.valueOf("2024-04-30 01:01:01.001"))
+ ))
+ spark.sqlContext.createDataFrame(rdd, schema).createTempView("temp_data")
+ query("select floor_datetime(date_col, 'year') from temp_data where row_id = 1", "[2024-01-01 00:00:00.0]", true)
+ query("select ceil_datetime(date_col, 'year') from temp_data where row_id = 1", "[2024-01-01 00:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'year') from temp_data where row_id = 1", "[2025-01-01 00:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'quarter') from temp_data where row_id = 4", "[2024-04-01 00:00:00.0]", true)
+ query("select ceil_datetime(date_col, 'quarter') from temp_data where row_id = 4", "[2024-04-01 00:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'quarter') from temp_data where row_id = 4", "[2024-07-01 00:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'month') from temp_data where row_id = 5", "[2024-04-01 00:00:00.0]", true)
+ query("select ceil_datetime(date_col, 'month') from temp_data where row_id = 5", "[2024-05-01 00:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'month') from temp_data where row_id = 5", "[2024-05-01 00:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'week') from temp_data where row_id = 5", "[2024-04-29 00:00:00.0]", true)
+ query("select ceil_datetime(date_col, 'week') from temp_data where row_id = 5", "[2024-05-06 00:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'week') from temp_data where row_id = 5", "[2024-05-06 00:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true)
+ query("select ceil_datetime(date_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'day') from temp_data where row_id = 3", "[2024-03-31 00:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'hour') from temp_data where row_id = 5", "[2024-04-30 01:00:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'hour') from temp_data where row_id = 5", "[2024-04-30 02:00:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'minute') from temp_data where row_id = 5", "[2024-04-30 01:01:00.0]", true)
+ query("select ceil_datetime(timestamp_col, 'minute') from temp_data where row_id = 5", "[2024-04-30 01:02:00.0]", true)
+ query("select floor_datetime(timestamp_col, 'second') from temp_data where row_id = 5", "[2024-04-30 01:01:01.0]", true)
+ query("select ceil_datetime(timestamp_col, 'second') from temp_data where row_id = 5", "[2024-04-30 01:01:02.0]", true)
+ }
+
+ def query(sql: String, expected: String, fallBackCheck: Boolean = false): Unit = {
+ val df = spark.sql(sql)
+ if (fallBackCheck && GlutenTestUtil.glutenEnabled(spark)) {
+ assert(!GlutenTestUtil.hasFallbackOnStep(df.queryExecution.executedPlan, classOf[ProjectExec]))
+ }
+ val result = df.collect().map(row => row.toString()).mkString
if (!result.equals(expected)) {
print(sql)
print(result)
diff --git a/src/spark-project/spark-common/pom.xml b/src/spark-project/spark-common/pom.xml
index 16a39cafe8c..c7ebb301be6 100644
--- a/src/spark-project/spark-common/pom.xml
+++ b/src/spark-project/spark-common/pom.xml
@@ -36,11 +36,6 @@
-
-        <dependency>
-            <groupId>io.delta</groupId>
-            <artifactId>delta-core_2.12</artifactId>
-        </dependency>
        <dependency>
            <groupId>org.apache.kylin</groupId>
            <artifactId>kylin-core-metadata</artifactId>
@@ -95,6 +90,10 @@
            <groupId>it.unimi.dsi</groupId>
            <artifactId>fastutil</artifactId>
        </dependency>
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-core_2.12</artifactId>
+        </dependency>
        <dependency>
            <groupId>com.github.hipjim</groupId>
@@ -146,6 +145,31 @@
+        <dependency>
+            <groupId>org.apache.gluten</groupId>
+            <artifactId>gluten-core</artifactId>
+            <version>${gluten.version}</version>
+            <type>test-jar</type>
+            <scope>${gluten.deps.scope}</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.gluten</groupId>
+                    <artifactId>spark-sql-columnar-shims-spark32</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.gluten</groupId>
+                    <artifactId>spark-sql-columnar-shims-spark33</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.gluten</groupId>
+                    <artifactId>spark-sql-columnar-shims-spark34</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.gluten</groupId>
+                    <artifactId>spark-sql-columnar-shims-spark35</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
        <dependency>
            <groupId>org.apache.gluten</groupId>
            <artifactId>gluten-ui</artifactId>
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
index 7b8bb5ff796..84e69f6fa1e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
@@ -19,15 +19,27 @@
package org.apache.spark.sql.execution.gluten
import org.apache.gluten.execution.FileSourceScanExecTransformer
-
+import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.utils.PushDownUtil
import org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSourceScanExec, SparkPlan}
-case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rule[SparkPlan] {
+class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends Rule[SparkPlan] {
+
+ private def tryReturnGlutenPlan(glutenPlan: GlutenPlan, originPlan: SparkPlan): SparkPlan = {
+ if (glutenPlan.doValidate().isValid) {
+ logDebug(s"Columnar Processing for ${originPlan.getClass} is currently supported.")
+ glutenPlan
+ } else {
+ logDebug(s"Columnar Processing for ${originPlan.getClass} is currently unsupported.")
+ originPlan
+ }
+ }
override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
case f: KylinFileSourceScanExec =>
+
// convert to Gluten transformer
val transformer = new KylinFileSourceScanExecTransformer(
f.relation,
@@ -37,19 +49,13 @@ case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rul
None,
f.optionalShardSpec,
f.optionalNumCoalescedBuckets,
- f.dataFilters,
+ PushDownUtil.removeNotSupportPushDownFilters(f.conf, f.output, f.dataFilters),
f.tableIdentifier,
f.disableBucketedScan,
f.sourceScanRows
)
// Transformer validate
- if (transformer.doValidate().isValid) {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- transformer
- } else {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
- f
- }
+ tryReturnGlutenPlan(transformer, f)
case l: LayoutFileSourceScanExec =>
// convert to Gluten transformer
@@ -60,17 +66,11 @@ case class ConvertKylinFileSourceToGlutenRule(session: SparkSession) extends Rul
l.partitionFilters,
l.optionalBucketSet,
l.optionalNumCoalescedBuckets,
- l.dataFilters,
+ PushDownUtil.removeNotSupportPushDownFilters(l.conf, l.output, l.dataFilters),
l.tableIdentifier,
l.disableBucketedScan
)
// Transformer validate
- if (transformer.doValidate().isValid) {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- transformer
- } else {
- logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.")
- l
- }
+ tryReturnGlutenPlan(transformer, l)
}
}
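The rule now runs dataFilters through PushDownUtil.removeNotSupportPushDownFilters before handing them to the transformer, so filters the native backend cannot evaluate are not pushed into the scan, and the two duplicated validate-and-log blocks collapse into the tryReturnGlutenPlan helper. A minimal sketch of that helper's shape, with stand-in types rather than Spark/Gluten classes:

```scala
// Compact model of the factored-out validate-or-fallback step.
trait PlanSketch { def name: String }
final case class RowBasedScan(name: String) extends PlanSketch
final case class ColumnarCandidate(name: String, backendAccepts: Boolean) extends PlanSketch {
  def doValidate(): Boolean = backendAccepts
}

object ColumnarConversionSketch {
  // Validate the transformer once; keep the original operator when the native
  // backend rejects it, the same shape as tryReturnGlutenPlan above.
  def tryColumnar(candidate: ColumnarCandidate, origin: PlanSketch): PlanSketch =
    if (candidate.doValidate()) candidate else origin
}
```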
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala
index e0211e26607..6de7384312d 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/KylinFileSourceScanExecTransformer.scala
@@ -61,6 +61,7 @@ class KylinFileSourceScanExecTransformer(@transient relation: HadoopFsRelation,
override def getPartitions: Seq[InputPartition] =
BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
relation,
+ requiredSchema,
selectedPartitions,
output,
bucketedScan,
diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
index 171d14aa9b1..e748876cc56 100644
--- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
+++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestConfig.scala
@@ -83,7 +83,6 @@ object GlutenTestConfig extends Logging{
conf.set("spark.gluten.sql.columnar.maxBatchSize", "32768")
conf.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
conf.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
- conf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
}
}
diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
new file mode 100644
index 00000000000..6774c60f511
--- /dev/null
+++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.common
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.test.FallbackUtil
+import org.apache.gluten.test.FallbackUtil.collectWithSubqueries
+import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+object GlutenTestUtil {
+
+ def hasFallback(plan: SparkPlan): Boolean = FallbackUtil.hasFallback(plan)
+
+  def hasFallbackOnStep[T <: SparkPlan](plan: SparkPlan, clazz: Class[T]): Boolean = {
+    val fallbackOperators = plan match {
+      case adaptive: AdaptiveSparkPlanExec =>
+        collectWithSubqueries(adaptive) {
+          case p if !p.isInstanceOf[GlutenPlan] && !FallbackUtil.skip(p) => p
+        }
+      case other =>
+        other.collectWithSubqueries {
+          case p if !p.isInstanceOf[GlutenPlan] && !FallbackUtil.skip(p) => p
+        }
+    }
+    // a fallback happened on the given step if a non-Gluten operator of that class remains
+    fallbackOperators.exists(_.getClass == clazz)
+  }
+
+ def glutenEnabled(spark: SparkSession): Boolean = {
+ spark.conf.get("spark.plugins", "").contains("GlutenPlugin") &&
+ spark.conf.get(
+ GlutenConfig.GLUTEN_ENABLE_KEY,
+ GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString).toBoolean && glutenEnabledForCurrentThread(spark)
+ }
+
+ private def glutenEnabledForCurrentThread(spark: SparkSession): Boolean = {
+ val enabled =
+ spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+ if (enabled != null) {
+ enabled.toBoolean
+ } else {
+ true
+ }
+ }
+}
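For reference, a usage sketch (hypothetical helper name) mirroring how the updated query helper in CeilFloorTest consumes this utility:

```scala
// When Gluten is active, assert that the projection stayed on the native path
// (no ProjectExec fallback) before collecting and comparing results.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.common.GlutenTestUtil
import org.apache.spark.sql.execution.ProjectExec

object GlutenFallbackCheckExample {
  def assertNoProjectFallback(spark: SparkSession, sql: String): Unit = {
    val df = spark.sql(sql)
    if (GlutenTestUtil.glutenEnabled(spark)) {
      assert(!GlutenTestUtil.hasFallbackOnStep(df.queryExecution.executedPlan, classOf[ProjectExec]))
    }
    df.collect() // force execution after the plan-level check
  }
}
```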