KE-43825 Upgrade gluten (240701) (#31819)
1. Disable gluten in analyze table
2. Gluten support for Delta 2.3.0
- apache/incubator-gluten#5902
- apache/incubator-gluten#5945
3. Support sum0
4. Fix build
- apache/incubator-gluten#5796
- apache/incubator-gluten#5767
5. Fix SlowQueryDetectorTest.testSparderTimeoutCancelJob failing due to dirty thread-local context
6. Clean up thread-local contexts
7. Fix GMT+8 time zone not being supported
8. Fix case class test coverage
9. Add one gluten-disabled case in analyze table
10. Remove unsupported pushdown filters:
    10.1. When creating KylinFileSourceScanExec, we did not remove the subquery filter.
    10.2. KylinFileSourceScanExec does not inherit from FileSourceScanExec, so we miss the chance to correct the pushed-down filters.
11. Native support for floor_datetime and ceil_datetime
12. Native support for kap_add_months and kap_months_between
13. Native support for _ymdint_between
14. Native support for truncate
15. Native support for kylin_split_part
16. Native support for kylin instr
baibaichen authored and loneylee committed Jul 31, 2024
1 parent b8fb686 commit 6682bee
Showing 22 changed files with 312 additions and 79 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -129,9 +129,9 @@

<!-- Spark versions -->
<spark.version>3.3.0-kylin-4.6.26.0</spark.version>
-<delta.version>2.2.0</delta.version>
+<delta.version>2.3.0</delta.version>

-<gluten.version>1.2.0-kylin-240518-SNAPSHOT</gluten.version>
+<gluten.version>1.2.0-kylin-240701-SNAPSHOT</gluten.version>
<gluten.deps.scope>compile</gluten.deps.scope>
<substrait.version>0.5.0</substrait.version>
<celeborn.version>0.3.0-incubating</celeborn.version>
@@ -317,7 +317,6 @@ public void addDataFetchTime(long dataFetchTime) {
@Setter
private List<Long> scanBytes;

-
@Getter
@Setter
private Boolean glutenFallback;
@@ -23,7 +23,7 @@
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.Assert;
-import org.junit.jupiter.api.Test;
+import org.junit.Ignore;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -34,7 +34,7 @@
@MetadataInfo(onlyProps = true)
class TransactionProjectLockTest {

-@Test
+@Ignore("Deprecated ProjectLock")
void testStravition() {
val threads = Lists.<LogicThread> newArrayList();

@@ -76,7 +76,7 @@ public static void checkQueryCanceledOrThreadInterrupted(String cause, String st
"Manually stop the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step));
}

-if (entry.getPlannerCancelFlag().isCancelRequested() && Thread.currentThread().isInterrupted()) {
+if (entry.getPlannerCancelFlag().isCancelRequested() && entry.isTimeoutStop) {
QueryContext.current().getQueryTagInfo().setTimeout(true);
throw new KylinTimeoutException(String.format(Locale.ROOT,
"Run out of time of the query %s. Caused: %s. Step: %s", entry.getQueryId(), cause, step));
@@ -79,7 +79,8 @@ public static void clearCanceledSlowQueriesStatus() {
public void queryStart(String stopId) {
runningQueries.put(currentThread(), new QueryEntry(System.currentTimeMillis(), currentThread(),
QueryContext.current().getQueryId(), QueryContext.current().getUserSQL(), stopId, false,
-QueryContext.current().getQueryTagInfo().isAsyncQuery(), null, CancelFlag.getContextCancelFlag()));
+QueryContext.current().getQueryTagInfo().isAsyncQuery(), false, null,
+CancelFlag.getContextCancelFlag()));
}

public void addJobIdForAsyncQueryJob(String jobId) {
@@ -188,19 +189,21 @@ public class QueryEntry {
final String stopId;
boolean isStopByUser;
final boolean isAsyncQuery;
+boolean isTimeoutStop;
String jobId;
final CancelFlag plannerCancelFlag;

public long getRunningTime() {
return (System.currentTimeMillis() - startTime) / 1000;
}

-public boolean setInterruptIfTimeout() {
+public synchronized boolean setInterruptIfTimeout() {
if (isAsyncQuery) {
return false;
}
long runningMs = System.currentTimeMillis() - startTime;
if (runningMs >= queryTimeoutMs) {
+isTimeoutStop = true;
plannerCancelFlag.requestCancel();
thread.interrupt();
logger.error("Trying to cancel query: {}", thread.getName());
@@ -209,5 +212,9 @@ public boolean setInterruptIfTimeout() {

return false;
}
+
+public synchronized CancelFlag getPlannerCancelFlag() {
+return plannerCancelFlag;
+}
}
}
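
The two hunks above work together: the watchdog records why it interrupted the query (isTimeoutStop) under the entry's monitor before calling thread.interrupt(), and the cancellation check on the query thread reads that flag instead of Thread.currentThread().isInterrupted(), which downstream code may already have cleared. A minimal self-contained sketch of the idea, using simplified stand-in types rather than the real Kylin classes:

// Hedged sketch of the timeout-classification pattern above. QueryEntry, the
// flag names, and the exception type are simplified stand-ins, not the real
// Kylin classes.
public class TimeoutFlagSketch {

    static class QueryTimeoutException extends RuntimeException {
        QueryTimeoutException(String message) { super(message); }
    }

    static class QueryEntry {
        final long startTimeMs = System.currentTimeMillis();
        final Thread thread;
        private boolean cancelRequested; // guarded by this
        private boolean isTimeoutStop;   // guarded by this

        QueryEntry(Thread thread) { this.thread = thread; }

        // Watchdog side: record the reason before interrupting, all under the
        // monitor, so the query thread never sees a cancel without a reason.
        synchronized boolean setInterruptIfTimeout(long timeoutMs) {
            if (System.currentTimeMillis() - startTimeMs < timeoutMs) {
                return false;
            }
            isTimeoutStop = true;
            cancelRequested = true;
            thread.interrupt();
            return true;
        }

        // Query-thread side: classify by the recorded reason instead of
        // Thread.currentThread().isInterrupted(), which other code (for
        // example the Spark runtime) may already have consumed or cleared.
        synchronized void checkCanceled() {
            if (cancelRequested && isTimeoutStop) {
                throw new QueryTimeoutException("query ran out of time");
            }
        }
    }

    public static void main(String[] args) {
        QueryEntry entry = new QueryEntry(Thread.currentThread());
        entry.setInterruptIfTimeout(0); // force the timeout path
        Thread.interrupted();           // simulate a consumed interrupt flag
        try {
            entry.checkCanceled();
        } catch (QueryTimeoutException e) {
            System.out.println("classified as timeout: " + e.getMessage());
        }
    }
}
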
@@ -34,12 +34,31 @@
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.query.relnode.ContextUtil;
+import org.apache.kylin.query.util.SlowQueryDetector;
+import org.junit.After;
+import org.junit.Before;
import org.sparkproject.guava.collect.Sets;

public class NLocalWithSparkSessionTest extends NLocalWithSparkSessionTestBase {

protected IndexDataConstructor indexDataConstructor = new IndexDataConstructor(getProject());

+@Before
+public void setUp() throws Exception {
+super.setUp();
+indexDataConstructor = new IndexDataConstructor(getProject());
+SlowQueryDetector.getRunningQueries().clear();
+ContextUtil.clearThreadLocalContexts();
+}
+
+@After
+public void tearDown() throws Exception {
+this.cleanupTestMetadata();
+SlowQueryDetector.getRunningQueries().clear();
+ContextUtil.clearThreadLocalContexts();
+}

protected void fullBuild(String dfName) throws Exception {
indexDataConstructor.buildDataflow(dfName);
}
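
Resetting the running-query registry and the thread-local contexts in both setUp and tearDown keeps state leaked by one test from poisoning the next one that runs on the same JVM, which is what the commit message calls "context dirty". A hedged sketch of the pattern, with a hypothetical stand-in for Kylin's ContextUtil:

// Illustrative sketch only: ContextRegistry is a hypothetical stand-in for
// Kylin's ContextUtil; the cleanup shape mirrors the setUp/tearDown above.
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ThreadLocalCleanupTest {

    static final class ContextRegistry {
        private static final ThreadLocal<StringBuilder> CONTEXTS =
                ThreadLocal.withInitial(StringBuilder::new);

        static StringBuilder current() { return CONTEXTS.get(); }

        // remove() drops this thread's value, so the next test that runs on
        // the same reused JUnit thread starts from the initial value again.
        static void clearThreadLocalContexts() { CONTEXTS.remove(); }
    }

    @Before
    public void setUp() {
        ContextRegistry.clearThreadLocalContexts(); // defend against earlier leaks
    }

    @After
    public void tearDown() {
        ContextRegistry.clearThreadLocalContexts(); // do not leak into later tests
    }

    @Test
    public void startsWithCleanContext() {
        Assert.assertEquals(0, ContextRegistry.current().length());
        ContextRegistry.current().append("dirty"); // cleaned up by tearDown
    }
}
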
@@ -34,6 +34,7 @@
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.pushdown.SparkSqlClient;
+import org.apache.kylin.query.relnode.ContextUtil;
import org.apache.kylin.query.runtime.plan.ResultPlan;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
@@ -236,6 +237,7 @@ public void testSparderTimeoutCancelJob() throws Exception {
slowQueryDetector.queryStart("");
try {
SparderEnv.cleanCompute();
+ContextUtil.clearThreadLocalContexts();
long t = System.currentTimeMillis();
ResultPlan.getResult(mockDf, null);
ExecAndComp.queryModel(getProject(), "select sum(price) from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME");
@@ -244,6 +246,10 @@ public void testSparderTimeoutCancelJob() throws Exception {
logger.error(error);
Assert.fail(error);
} catch (Exception e) {
+boolean timeout = QueryContext.current().getQueryTagInfo().isTimeout();
+if (!timeout) {
+logger.error("Unexpected query exception", e);
+}
Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
Assert.assertTrue(e instanceof KylinTimeoutException);
Assert.assertEquals(
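
The added lines follow a diagnose-then-assert pattern: when a catch block is about to assert on derived state, log the caught exception first if that state is wrong, so a CI failure reports the root cause rather than a bare AssertionError. An illustrative, self-contained sketch (the probe below is hypothetical):

// Illustrative diagnose-then-assert pattern; probe() is a hypothetical
// operation that simulates the expected timeout failure.
import java.util.concurrent.TimeoutException;

import org.junit.Assert;
import org.junit.Test;

public class DiagnoseThenAssertTest {

    static void probe() throws Exception {
        throw new TimeoutException("simulated timeout");
    }

    @Test
    public void logsRootCauseBeforeAsserting() {
        try {
            probe();
            Assert.fail("expected the probe to fail");
        } catch (Exception e) {
            boolean timeout = e instanceof TimeoutException;
            if (!timeout) {
                // Log first: the bare assertion below would hide the cause.
                e.printStackTrace();
            }
            Assert.assertTrue(timeout);
        }
    }
}
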
@@ -74,8 +74,6 @@ public class SQLResponse implements Serializable {

private List<Long> scanRows;

-private Boolean glutenFallback;
-
private List<Long> scanBytes;

private String appMasterURL = "";
@@ -212,7 +210,6 @@ public SQLResponse wrapResultOfQueryContext(QueryContext queryContext) {
this.setScanRows(queryContext.getMetrics().getScanRows());
this.setScanBytes(queryContext.getMetrics().getScanBytes());
this.setShufflePartitions(queryContext.getShufflePartitions());
-this.setGlutenFallback(queryContext.getMetrics().getGlutenFallback());
return this;
}

10 changes: 5 additions & 5 deletions src/spark-project/engine-spark/pom.xml
@@ -36,11 +36,6 @@
</properties>

<dependencies>
-<!-- FIXME: gluten backends-clickhouse overwrite delta-core, use vanilla delta-core first temporally-->
-<dependency>
-<groupId>io.delta</groupId>
-<artifactId>delta-core_2.12</artifactId>
-</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-metadata</artifactId>
@@ -167,6 +162,11 @@
<artifactId>volcano-client</artifactId>
<version>5.12.2</version>
</dependency>
+
+<dependency>
+<groupId>io.delta</groupId>
+<artifactId>delta-core_2.12</artifactId>
+</dependency>
<dependency>
<groupId>com.github.hipjim</groupId>
<artifactId>scala-retry_2.12</artifactId>
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.stream.Collectors;

+import org.apache.kylin.GlutenDisabled;
+import org.apache.kylin.GlutenRunner;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -34,10 +36,12 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;

import lombok.val;
import lombok.var;

+@RunWith(GlutenRunner.class)
public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase {

private NTableMetadataManager tableMgr;
@@ -95,6 +99,7 @@ public void testSampleFullTable() {
}

@Test
+@GlutenDisabled("max(col) with null data gluten returns NaN, but spark return null")
public void testSampleTableForColumnOrRowAlwaysNull() {
// case 1: this case test specified column always null, corresponding column is 'CATEG_BUSN_MGR'
TableDesc testCategoryGroupings = tableMgr.getTableDesc("DEFAULT.TEST_CATEGORY_GROUPINGS");
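
The test class is now run through GlutenRunner so individual cases can opt out of Gluten, here because Gluten's max over all-null input yields NaN where vanilla Spark yields null. The sketch below shows one way such a runner could work; it is a guess at the mechanism built on standard JUnit 4 APIs, not Kylin's actual GlutenRunner:

// Hedged sketch of a runner honoring a @GlutenDisabled annotation. This is a
// guess at the mechanism, not Kylin's actual GlutenRunner implementation.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.runner.notification.RunNotifier;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface GlutenDisabled {
    String value() default ""; // reason the case cannot run on Gluten
}

public class GlutenRunnerSketch extends BlockJUnit4ClassRunner {

    public GlutenRunnerSketch(Class<?> testClass) throws InitializationError {
        super(testClass);
    }

    @Override
    protected void runChild(FrameworkMethod method, RunNotifier notifier) {
        GlutenDisabled disabled = method.getAnnotation(GlutenDisabled.class);
        if (disabled != null) {
            // Hypothetical toggle: flip whatever flag routes execution away
            // from Gluten before the case, and restore it afterwards.
            System.setProperty("example.gluten.enabled", "false");
            try {
                super.runChild(method, notifier);
            } finally {
                System.setProperty("example.gluten.enabled", "true");
            }
        } else {
            super.runChild(method, notifier);
        }
    }
}
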
@@ -19,7 +19,7 @@
package org.apache.kylin.query.pushdown

import org.apache.commons.lang3.StringUtils
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
import org.apache.kylin.guava30.shaded.common.collect.ImmutableList
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.{lang, util}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.commons.io.IOUtils
-import org.apache.gluten.utils.{FallbackUtil, QueryPlanSelector}
+import org.apache.gluten.utils.QueryPlanSelector
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.code.ErrorCodeServer
import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException}
@@ -137,8 +137,6 @@ object ResultPlan extends LogEx {
QueryContext.current.record("collect_result")

val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
-val glutenFallback = FallbackUtil.hasFallback(df.queryExecution.executedPlan)
-QueryContext.current().getMetrics.setGlutenFallback(glutenFallback)
val (jobCount, stageCount, taskCount) = QueryMetricUtils.collectTaskRelatedMetrics(jobGroup, sparkContext)
QueryContext.current().getMetrics.setScanRows(scanRows)
