From 4abc92abbafaeda5f09a6e493222ba9ec850a612 Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Wed, 24 Jul 2024 14:37:53 +0800 Subject: [PATCH] cache mergetree data --- backends-clickhouse/pom.xml | 20 ++ .../sql/parser/GlutenClickhouseSqlBase.g4 | 232 +++++++++++++ .../execution/CHNativeCacheManager.java | 24 +- .../clickhouse/CHSparkPlanExecApi.scala | 7 + .../parser/GlutenClickhouseSqlParser.scala | 316 ++++++++++++++++++ .../spark/rpc/GlutenDriverEndpoint.scala | 4 +- .../spark/rpc/GlutenExecutorEndpoint.scala | 10 +- .../apache/spark/rpc/GlutenRpcMessages.scala | 6 +- .../commands/GlutenCHCacheDataCommand.scala | 268 +++++++++++++++ .../v2/clickhouse/metadata/AddFileTags.scala | 11 +- ...enClickHouseMergeTreeCacheDataSSuite.scala | 172 ++++++++++ ...ClickHouseWholeStageTransformerSuite.scala | 6 +- .../Storages/StorageMergeTreeFactory.cpp | 9 +- cpp-ch/local-engine/local_engine_jni.cpp | 2 +- gluten-celeborn/clickhouse/pom.xml | 11 +- .../gluten/backendsapi/SparkPlanExecApi.scala | 4 + .../extension/OthersExtensionOverrides.scala | 3 + pom.xml | 28 ++ 18 files changed, 1107 insertions(+), 26 deletions(-) create mode 100644 backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4 create mode 100644 backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala create mode 100644 backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 5672056b41607..0e05efd2b771a 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -247,6 +247,10 @@ + + org.antlr + antlr4-runtime + @@ -365,6 +369,22 @@ + + org.antlr + antlr4-maven-plugin + + + + antlr4 + + + + + true + ../backends-clickhouse/src/main/antlr4 + true + + diff --git a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4 b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4 new file mode 100644 index 0000000000000..a4ff5112366e7 --- /dev/null +++ b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4 @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +grammar GlutenClickhouseSqlBase; + +@members { + /** + * Verify whether current token is a valid decimal token (which contains dot). + * Returns true if the character that follows the token is not a digit or letter or underscore. + * + * For example: + * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'. 
+ * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'. + * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'. + * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is folllowed + * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+' + * which is not a digit or letter or underscore. + */ + public boolean isValidDecimal() { + int nextChar = _input.LA(1); + if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' || + nextChar == '_') { + return false; + } else { + return true; + } + } +} + +tokens { + DELIMITER +} + +singleStatement + : statement ';'* EOF + ; + +statement + : CACHE META? DATA ASYN? SELECT selectedColumns=selectedColumnNames + FROM (path=STRING | table=qualifiedName) (AFTER filter=filterClause)? + (CACHEPROPERTIES cacheProps=propertyList)? #cacheData + | .*? #passThrough + ; + +qualifiedName + : identifier (DOT identifier)* + ; + +selectedColumnNames + : ASTERISK + | identifier (COMMA identifier)* + ; + +filterClause + : TIMESTAMP AS OF timestamp=STRING + | datepartition=identifier AS OF datetime=STRING + ; + +propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + +property + : key=propertyKey (EQ? value=propertyValue)? + ; + +propertyKey + : identifier (DOT identifier)* + | stringLit + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN + | value=stringLit + ; + +stringLit + : STRING + | DOUBLEQUOTED_STRING + ; + +booleanValue + : TRUE | FALSE + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | quotedIdentifier #quotedIdentifierAlternative + | nonReserved #unquotedIdentifier + ; + +quotedIdentifier + : BACKQUOTED_IDENTIFIER + ; + +// Add keywords here so that people's queries don't break if they have a column name as one of +// these tokens +nonReserved + : CACHE | META | ASYN | DATA + | SELECT | FOR | AFTER | CACHEPROPERTIES + | TIMESTAMP | AS | OF | DATE_PARTITION + ; + +// Define how the keywords above should appear in a user's SQL statement. +CACHE: 'CACHE'; +META: 'META'; +ASYN: 'ASYN'; +DATA: 'DATA'; +SELECT: 'SELECT'; +COMMA: ','; +FOR: 'FOR'; +FROM: 'FROM'; +AFTER: 'AFTER'; +CACHEPROPERTIES: 'CACHEPROPERTIES'; +DOT: '.'; +ASTERISK: '*'; +TIMESTAMP: 'TIMESTAMP'; +AS: 'AS'; +OF: 'OF'; +DATE_PARTITION: 'DATE_PARTITION'; +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +TRUE: 'TRUE'; +FALSE: 'FALSE'; + +EQ : '=' | '=='; +NSEQ: '<=>'; +NEQ : '<>'; +NEQJ: '!='; +LTE : '<=' | '!>'; +GTE : '>=' | '!<'; +CONCAT_PIPE: '||'; + +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +DOUBLEQUOTED_STRING + :'"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : DIGIT+ 'L' + ; + +SMALLINT_LITERAL + : DIGIT+ 'S' + ; + +TINYINT_LITERAL + : DIGIT+ 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}? + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? 
DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +SIMPLE_COMMENT + : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN) + ; + +BRACKETED_COMMENT + : '/*' .*? '*/' -> channel(HIDDEN) + ; + +WS : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java index 2e9507f647b1f..f5f75dc1dca6d 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java @@ -1,11 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.gluten.execution; import java.util.Set; public class CHNativeCacheManager { - public static void cacheParts(String table, Set columns, boolean async) { - nativeCacheParts(table, String.join(",", columns), async); - } + public static void cacheParts(String table, Set columns, boolean async) { + nativeCacheParts(table, String.join(",", columns), async); + } - private static native void nativeCacheParts(String table, String columns, boolean async); + private static native void nativeCacheParts(String table, String columns, boolean async); } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index b8a76b4210c3f..9e214c738731f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -25,6 +25,7 @@ import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcas import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.parser.GlutenClickhouseSqlParser import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy} @@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRew import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
CollectList, CollectSet} import org.apache.spark.sql.catalyst.optimizer.BuildSide +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning} @@ -611,6 +613,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { override def genExtendedStrategies(): List[SparkSession => Strategy] = List() + override def genInjectExtendedParser() + : List[(SparkSession, ParserInterface) => ParserInterface] = { + List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface)) + } + /** Define backend specfic expression mappings. */ override def extraExpressionMappings: Seq[Sig] = { List( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala new file mode 100644 index 0000000000000..d4184890ddd3d --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.parser + +import org.apache.gluten.sql.parser.{GlutenClickhouseSqlBaseBaseListener, GlutenClickhouseSqlBaseBaseVisitor, GlutenClickhouseSqlBaseLexer, GlutenClickhouseSqlBaseParser} +import org.apache.gluten.sql.parser.GlutenClickhouseSqlBaseParser._ + +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand +import org.apache.spark.sql.internal.VariableSubstitution +import org.apache.spark.sql.types.{DataType, StructType} + +import org.antlr.v4.runtime._ +import org.antlr.v4.runtime.atn.PredictionMode +import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} +import org.antlr.v4.runtime.tree.TerminalNodeImpl + +import java.util.Locale + +import scala.collection.JavaConverters._ + +class GlutenClickhouseSqlParser(spark: SparkSession, delegate: ParserInterface) + extends ParserInterface { + + private val astBuilder = new GlutenClickhouseSqlAstBuilder + private val substitution = new VariableSubstitution + + override def parsePlan(sqlText: String): LogicalPlan = + parse(sqlText) { + parser => + astBuilder.visit(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => delegate.parsePlan(sqlText) + } + } + + protected def parse[T](command: String)(toResult: GlutenClickhouseSqlBaseParser => T): T = { + val lexer = new GlutenClickhouseSqlBaseLexer( + new UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command)))) + lexer.removeErrorListeners() + lexer.addErrorListener(ParseErrorListener) + + val tokenStream = new CommonTokenStream(lexer) + val parser = new GlutenClickhouseSqlBaseParser(tokenStream) + parser.addParseListener(PostProcessor) + parser.removeErrorListeners() + parser.addErrorListener(ParseErrorListener) + + try { + try { + // first, try parsing with potentially faster SLL mode + parser.getInterpreter.setPredictionMode(PredictionMode.SLL) + toResult(parser) + } catch { + case e: ParseCancellationException => + // if we fail, parse with LL mode + tokenStream.seek(0) // rewind input stream + parser.reset() + + // Try Again. 
+ parser.getInterpreter.setPredictionMode(PredictionMode.LL) + toResult(parser) + } + } catch { + case e: ParseException if e.command.isDefined => + throw e + case e: ParseException => + throw e.withCommand(command) + case e: AnalysisException => + val position = Origin(e.line, e.startPosition) + throw new ParseException( + command = Option(command), + message = e.message, + start = position, + stop = position, + errorClass = Some("GLUTEN_CH_PARSING_ANALYSIS_ERROR")) + } + } + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + override def parseQuery(sqlText: String): LogicalPlan = { + delegate.parseQuery(sqlText) + } +} + +class GlutenClickhouseSqlAstBuilder extends GlutenClickhouseSqlBaseBaseVisitor[AnyRef] { + + import org.apache.spark.sql.catalyst.parser.ParserUtils._ + + /** Convert a property list into a key-value map. */ + override def visitPropertyList(ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { + property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * A property key can either be String or a collection of dot separated elements. This function + * extracts the property key based on whether its a string literal or a property identifier. + */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.stringLit() != null) { + string(visitStringLit(key.stringLit())) + } else { + key.getText + } + } + + /** + * A property value can be String, Integer, Boolean or Decimal. This function extracts the + * property value based on whether its a string, integer, boolean or decimal literal. 
+ */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.identifier != null) { + value.identifier.getText + } else if (value.value != null) { + string(visitStringLit(value.value)) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } + + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", + ctx) + } + props + } + + override def visitStringLit(ctx: StringLitContext): Token = { + if (ctx != null) { + if (ctx.STRING != null) { + ctx.STRING.getSymbol + } else { + ctx.DOUBLEQUOTED_STRING.getSymbol + } + } else { + null + } + } + + override def visitSingleStatement( + ctx: GlutenClickhouseSqlBaseParser.SingleStatementContext): AnyRef = withOrigin(ctx) { + visit(ctx.statement).asInstanceOf[LogicalPlan] + } + + override def visitCacheData(ctx: GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef = + withOrigin(ctx) { + val onlyMetaCache = ctx.META != null + val asynExecute = ctx.ASYN != null + val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null) { + if (ctx.filter.TIMESTAMP != null) { + (Some(string(ctx.filter.timestamp)), None, None) + } else if (ctx.filter.datepartition != null && ctx.filter.datetime != null) { + (None, Some(ctx.filter.datepartition.getText), Some(string(ctx.filter.datetime))) + } else { + throw new ParseException(s"Illegal filter value ${ctx.getText}", ctx) + } + } else { + (None, None, None) + } + val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns) + val tablePropertyOverrides = Option(ctx.cacheProps) + .map(visitPropertyKeyValues) + .getOrElse(Map.empty[String, String]) + + GlutenCHCacheDataCommand( + onlyMetaCache, + asynExecute, + selectedColuman, + Option(ctx.path).map(string), + Option(ctx.table).map(visitTableIdentifier), + tsfilter, + partitionColumn, + partitionValue, + tablePropertyOverrides + ) + } + + override def visitPassThrough(ctx: GlutenClickhouseSqlBaseParser.PassThroughContext): AnyRef = + null + + protected def visitTableIdentifier(ctx: QualifiedNameContext): TableIdentifier = withOrigin(ctx) { + ctx.identifier.asScala.toSeq match { + case Seq(tbl) => TableIdentifier(tbl.getText) + case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText)) + // TODO: Spark 3.5 supports catalog parameter + // case Seq(catalog, db, tbl) => + // TableIdentifier(tbl.getText, Some(db.getText), Some(catalog.getText)) + case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx) + } + } + + override def visitSelectedColumnNames(ctx: SelectedColumnNamesContext): Option[Seq[String]] = + withOrigin(ctx) { + if (ctx != null) { + if (ctx.ASTERISK != null) { + // It means select all columns + None + } else if (ctx.identifier != null && !(ctx.identifier).isEmpty) { + Some(ctx.identifier.asScala.map(_.getText).toSeq) + } else { + throw new ParseException(s"Illegal selected column.", ctx) + } + } else { + throw new ParseException(s"Illegal selected column.", ctx) + } + } +} + +case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener { + + /** Remove the back ticks from an Identifier. 
*/ + override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = { + replaceTokenByIdentifier(ctx, 1) { + token => + // Remove the double back ticks in the string. + token.setText(token.getText.replace("``", "`")) + token + } + } + + /** Treat non-reserved keywords as Identifiers. */ + override def exitNonReserved(ctx: NonReservedContext): Unit = { + replaceTokenByIdentifier(ctx, 0)(identity) + } + + private def replaceTokenByIdentifier(ctx: ParserRuleContext, stripMargins: Int)( + f: CommonToken => CommonToken = identity): Unit = { + val parent = ctx.getParent + parent.removeLastChild() + val token = ctx.getChild(0).getPayload.asInstanceOf[Token] + val newToken = new CommonToken( + new org.antlr.v4.runtime.misc.Pair(token.getTokenSource, token.getInputStream), + GlutenClickhouseSqlBaseParser.IDENTIFIER, + token.getChannel, + token.getStartIndex + stripMargins, + token.getStopIndex - stripMargins + ) + parent.addChild(new TerminalNodeImpl(f(newToken))) + } +} + +class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { + override def consume(): Unit = wrapped.consume + override def getSourceName(): String = wrapped.getSourceName + override def index(): Int = wrapped.index + override def mark(): Int = wrapped.mark + override def release(marker: Int): Unit = wrapped.release(marker) + override def seek(where: Int): Unit = wrapped.seek(where) + override def size(): Int = wrapped.size + + override def getText(interval: Interval): String = wrapped.getText(interval) + + override def LA(i: Int): Int = { + val la = wrapped.LA(i) + if (la == 0 || la == IntStream.EOF) la + else Character.toUpperCase(la) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala index 319381f894b85..a061a620d209d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala @@ -103,7 +103,7 @@ object GlutenDriverEndpoint extends Logging with RemovalListener[String, util.Se var glutenDriverEndpointRef: RpcEndpointRef = _ // keep executorRef on memory - private val executorDataMap = new ConcurrentHashMap[String, ExecutorData] + val executorDataMap = new ConcurrentHashMap[String, ExecutorData] // If spark.scheduler.listenerbus.eventqueue.capacity is set too small, // the listener may lose messages. 
@@ -131,4 +131,4 @@ object GlutenDriverEndpoint extends Logging with RemovalListener[String, util.Se } } -private class ExecutorData(val executorEndpointRef: RpcEndpointRef) {} +class ExecutorData(val executorEndpointRef: RpcEndpointRef) {} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala index 60bda00579d51..4d90ab6533ba7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala @@ -17,6 +17,7 @@ package org.apache.spark.rpc import org.apache.gluten.execution.{CHBroadcastBuildSideCache, CHNativeCacheManager} + import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.GlutenRpcMessages._ @@ -72,8 +73,13 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) => - CHNativeCacheManager.cacheParts(mergeTreeTable, columns, false) - context.reply(CacheLoadSuccess()) + try { + CHNativeCacheManager.cacheParts(mergeTreeTable, columns, false) + context.reply(CacheLoadResult(true)) + } catch { + case _: Exception => + context.reply(CacheLoadResult(false, s"executor: $executorId cache data failed.")) + } case e => logError(s"Received unexpected message. $e") } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala index c61f84770df6b..d675d705f10a2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala @@ -35,8 +35,8 @@ object GlutenRpcMessages { case class GlutenCleanExecutionResource(executionId: String, broadcastHashIds: util.Set[String]) extends GlutenRpcMessage - case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String]) extends GlutenRpcMessage + case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String]) + extends GlutenRpcMessage - case class CacheLoadSuccess() extends GlutenRpcMessage + case class CacheLoadResult(success: Boolean, reason: String = "") extends GlutenRpcMessage } - diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala new file mode 100644 index 0000000000000..a3661ebbc81b7 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.commands + +import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.substrait.rel.ExtensionTableBuilder + +import org.apache.spark.affinity.CHAffinity +import org.apache.spark.rpc.GlutenDriverEndpoint +import org.apache.spark.rpc.GlutenRpcMessages.{CacheLoadResult, GlutenMergeTreeCacheLoad} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal} +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts +import org.apache.spark.sql.types.{BooleanType, StringType} +import org.apache.spark.util.ThreadUtils + +import org.apache.hadoop.fs.Path + +import java.net.URI +import java.util.{ArrayList => JList} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +case class GlutenCHCacheDataCommand( + onlyMetaCache: Boolean, + asynExecute: Boolean, + selectedColuman: Option[Seq[String]], + path: Option[String], + table: Option[TableIdentifier], + tsfilter: Option[String], + partitionColumn: Option[String], + partitionValue: Option[String], + tablePropertyOverrides: Map[String, String] +) extends LeafRunnableCommand { + + override def output: Seq[Attribute] = Seq( + AttributeReference("result", BooleanType, nullable = false)(), + AttributeReference("reason", StringType, nullable = false)()) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val pathToCache = + if (path.nonEmpty) { + new Path(path.get) + } else if (table.nonEmpty) { + DeltaTableIdentifier(sparkSession, table.get) match { + case Some(id) if id.path.nonEmpty => + new Path(id.path.get) + case _ => + new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location) + } + } else { + throw DeltaErrors.missingTableIdentifierException("CACHE DATA") + } + + val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToCache) + if (baseDeltaPath.isDefined) { + if (baseDeltaPath.get != pathToCache) { + throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get) + } + } + + val deltaLog = DeltaLog.forTable(sparkSession, pathToCache) + if (!deltaLog.tableExists) { + throw DeltaErrors.notADeltaTableException( + "CACHE DATA", + DeltaTableIdentifier(path = Some(pathToCache.toString))) + } + + val snapshot = deltaLog.update() + + require( + snapshot.version >= 0, + "No state defined for this table. Is this really " + + "a Delta table? 
Refusing to garbage collect.") + + val allColumns = snapshot.dataSchema.fieldNames.toSeq + val selectedColumns = if (selectedColuman.nonEmpty) { + selectedColuman.get + .filter(allColumns.contains(_)) + .map(ConverterUtils.normalizeColName) + .toSeq + } else { + allColumns.map(ConverterUtils.normalizeColName) + } + + val selectedAddFiles = if (tsfilter.isDefined) { + val allParts = snapshot.filesForScan(Seq.empty, false) + allParts.files.filter(_.modificationTime >= tsfilter.get.toLong).toSeq + } else if (partitionColumn.isDefined && partitionValue.isDefined) { + val partitionColumns = snapshot.metadata.partitionSchema.fieldNames + require( + partitionColumns.contains(partitionColumn.get), + s"the partition column ${partitionColumn.get} is invalid.") + val partitionColumnField = snapshot.metadata.partitionSchema(partitionColumn.get) + + val partitionColumnAttr = AttributeReference( + ConverterUtils.normalizeColName(partitionColumn.get), + partitionColumnField.dataType, + partitionColumnField.nullable)() + val isNotNullExpr = IsNotNull(partitionColumnAttr) + val greaterThanOrEqual = GreaterThanOrEqual(partitionColumnAttr, Literal(partitionValue.get)) + snapshot.filesForScan(Seq(isNotNullExpr, greaterThanOrEqual), false).files + } else { + snapshot.filesForScan(Seq.empty, false).files + } + + val executorIdsToAddFiles = + scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]() + val executorIdsToParts = scala.collection.mutable.Map[String, String]() + executorIdsToAddFiles.put( + GlutenCHCacheDataCommand.ALL_EXECUTORS, + new ArrayBuffer[AddMergeTreeParts]()) + selectedAddFiles.foreach( + addFile => { + val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts] + val partName = mergeTreePart.name + val tableUri = URI.create(mergeTreePart.tablePath) + val relativeTablePath = if (tableUri.getPath.startsWith("/")) { + tableUri.getPath.substring(1) + } else tableUri.getPath + + val locations = CHAffinity.getNativeMergeTreePartLocations(partName, relativeTablePath) + + if (locations.isEmpty) { + // non soft affinity + executorIdsToAddFiles + .get(GlutenCHCacheDataCommand.ALL_EXECUTORS) + .get + .append(mergeTreePart) + } else { + locations.foreach( + executor => { + if (!executorIdsToAddFiles.contains(executor)) { + executorIdsToAddFiles.put(executor, new ArrayBuffer[AddMergeTreeParts]()) + } + executorIdsToAddFiles.get(executor).get.append(mergeTreePart) + }) + } + }) + + executorIdsToAddFiles.foreach( + value => { + val parts = value._2 + val executorId = value._1 + if (parts.nonEmpty) { + val onePart = parts(0) + val partNameList = parts.map(_.name).toSeq + // starts and lengths is useless for write + val partRanges = Seq.range(0L, partNameList.length).map(_ => long2Long(0L)).asJava + + val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( + -1, + -1, + onePart.database, + onePart.table, + ClickhouseSnapshot.genSnapshotId(snapshot), + onePart.tablePath, + "", + snapshot.metadata.configuration.getOrElse("orderByKey", ""), + snapshot.metadata.configuration.getOrElse("lowCardKey", ""), + snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""), + snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""), + snapshot.metadata.configuration.getOrElse("setIndexKey", ""), + snapshot.metadata.configuration.getOrElse("primaryKey", ""), + partNameList.asJava, + partRanges, + partRanges, + ConverterUtils.convertNamedStructJson(snapshot.metadata.schema), + snapshot.metadata.configuration.asJava, + new JList[String]() + ) + + executorIdsToParts.put(executorId, 
extensionTableNode.getExtensionTableStr) + } + }) + + // send rpc call + if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) { + // send all parts to all executors + val tableMessage = executorIdsToParts.get(GlutenCHCacheDataCommand.ALL_EXECUTORS).get + if (asynExecute) { + GlutenDriverEndpoint.executorDataMap.forEach( + (executorId, executor) => { + executor.executorEndpointRef.send( + GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava)) + }) + Seq(Row(true, "")) + } else { + val futureList = ArrayBuffer[Future[CacheLoadResult]]() + val resultList = ArrayBuffer[CacheLoadResult]() + GlutenDriverEndpoint.executorDataMap.forEach( + (executorId, executor) => { + futureList.append( + executor.executorEndpointRef.ask[CacheLoadResult]( + GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava) + )) + }) + futureList.foreach( + f => { + resultList.append(ThreadUtils.awaitResult(f, Duration.Inf)) + }) + if (resultList.exists(!_.success)) { + Seq(Row(false, resultList.filter(!_.success).map(_.reason).mkString(";"))) + } else { + Seq(Row(true, "")) + } + } + } else { + if (asynExecute) { + executorIdsToParts.foreach( + value => { + val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1) + if (executorData != null) { + executorData.executorEndpointRef.send( + GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)) + } + }) + Seq(Row(true, "")) + } else { + val futureList = ArrayBuffer[Future[CacheLoadResult]]() + val resultList = ArrayBuffer[CacheLoadResult]() + executorIdsToParts.foreach( + value => { + val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1) + if (executorData != null) { + futureList.append( + executorData.executorEndpointRef.ask[CacheLoadResult]( + GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava) + )) + } + }) + futureList.foreach( + f => { + resultList.append(ThreadUtils.awaitResult(f, Duration.Inf)) + }) + if (resultList.exists(!_.success)) { + Seq(Row(false, resultList.filter(!_.success).map(_.reason).mkString(";"))) + } else { + Seq(Row(true, "")) + } + } + } + } +} + +object GlutenCHCacheDataCommand { + val ALL_EXECUTORS = "allExecutors" +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala index 8acc23aec2070..71d5c54318348 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/metadata/AddFileTags.scala @@ -33,7 +33,7 @@ class AddMergeTreeParts( val database: String, val table: String, val engine: String, // default is "MergeTree" - override val path: String, // table path + val tablePath: String, // table path val targetNode: String, // the node which the current part is generated val name: String, // part name val uuid: String, @@ -98,7 +98,7 @@ object AddFileTags { database: String, table: String, engine: String, - path: String, + tablePath: String, targetNode: String, name: String, uuid: String, @@ -125,7 +125,7 @@ object AddFileTags { "database" -> database, "table" -> table, "engine" -> engine, - "path" -> path, + "path" -> tablePath, "targetNode" -> targetNode, "partition" -> partition, "uuid" -> uuid, @@ -161,7 +161,7 @@ object AddFileTags { addFile.tags.get("database").get, 
addFile.tags.get("table").get, addFile.tags.get("engine").get, - addFile.path, + addFile.tags.get("path").get, addFile.tags.get("targetNode").get, addFile.path, addFile.tags.get("uuid").get, @@ -199,6 +199,7 @@ object AddFileTags { mapper.readValue(returnedMetrics, new TypeReference[JList[WriteReturnedMetric]]() {}) var addFiles = new ArrayBuffer[AddFile]() val path = new Path(originPathStr) + val modificationTime = System.currentTimeMillis() addFiles.appendAll(values.asScala.map { value => AddFileTags.partsInfoToAddFile( @@ -213,7 +214,7 @@ object AddFileTags { value.getDiskSize, -1L, -1L, - -1L, + modificationTime, "", -1L, -1L, diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala new file mode 100644 index 0000000000000..59abe4c01fe92 --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.spark.SparkConf +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem + +import java.io.File + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class GlutenClickHouseMergeTreeCacheDataSSuite + extends GlutenClickHouseTPCHAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "mergetree-queries-output" + + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.io.compression.codec", "LZ4") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", + "false") + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp/ch_path") + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + val conf = new Configuration + conf.set("fs.defaultFS", HDFS_URL) + val fs = FileSystem.get(conf) + fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) + FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH)) + FileUtils.forceMkdir(new File(HDFS_CACHE_PATH)) + } + + test("test mergetree table write") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_shipdate) + |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main', + | orderByKey='l_linenumber,l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_hdfs + | select * from lineitem a + | where a.l_shipdate between date'1995-01-01' and date'1995-01-31' + |""".stripMargin) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) + + val res = spark + .sql(s""" + |cache data + | select * from lineitem_mergetree_hdfs + | after l_shipdate AS OF '1995-01-10' + | CACHEPROPERTIES(storage_policy='__hdfs_main', + | aaa='ccc')""".stripMargin) + .collect() + assertResult(true)(res(0).getBoolean(0)) + val metaPath = new File(HDFS_METADATA_PATH + 
s"$sparkVersion/test/lineitem_mergetree_hdfs") + assertResult(true)(metaPath.exists() && metaPath.isDirectory) + assertResult(22)(metaPath.list().length) + + val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect() + assertResult(true)(res1(0).getBoolean(0)) + assertResult(31)(metaPath.list().length) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_hdfs + |WHERE + | l_shipdate >= date'1995-01-10' + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runSql(sqlStr)( + df => { + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assertResult(1)(scanExec.size) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assertResult(7898)(addFiles.map(_.rows).sum) + }) + spark.sql("drop table lineitem_mergetree_hdfs purge") + } +} +// scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index 9412326ae342c..d66f386de7888 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -49,7 +49,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" - val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020" + val HDFS_URL_ENDPOINT = s"hdfs://192.168.0.158:9000" val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" @@ -82,7 +82,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu "/tmp/user_defined") if (UTSystemParameters.testMergeTreeOnObjectStorage) { conf - .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) + /* .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) @@ -120,7 +120,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu "main") .set( "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk", - "s3_cache") + "s3_cache") */ .set( "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type", "hdfs_gluten") diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp index d2ffc19fb16f0..b638d9bc27eaf 100644 --- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp 
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp @@ -67,7 +67,14 @@ StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot auto new_storage = creator(); if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table)) { - freeStorage(id); + // freeStorage(id); + if (storage_map->has(table_name)) + storage_map->remove(table_name); + { + std::lock_guard lock(datapart_mutex); + if (datapart_map->has(table_name)) + datapart_map->remove(table_name); + } } if (!storage_map->has(table_name)) diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 914a08a616a98..2e9c2c0fc3528 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1297,7 +1297,7 @@ JNIEXPORT jlong Java_org_apache_gluten_memory_alloc_CHNativeMemoryAllocator_byte LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jstring table_, jstring columns_, jboolean async_) +JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean async_) { LOCAL_ENGINE_JNI_METHOD_START auto table_def = jstring2string(env, table_); diff --git a/gluten-celeborn/clickhouse/pom.xml b/gluten-celeborn/clickhouse/pom.xml index f17f5968d3511..4c7902c927379 100755 --- a/gluten-celeborn/clickhouse/pom.xml +++ b/gluten-celeborn/clickhouse/pom.xml @@ -70,14 +70,15 @@ org.apache.spark spark-hive_${scala.binary.version} ${spark.version} + + + org.antlr + * + + test-jar test - - org.apache.spark - spark-hive_${scala.binary.version} - test - org.apache.hive.hcatalog hive-hcatalog-core diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 273443f647ab6..8f24afae1da48 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BuildSide +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -468,6 +469,9 @@ trait SparkPlanExecApi { def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] + def genInjectExtendedParser(): List[(SparkSession, ParserInterface) => ParserInterface] = + List.empty + def genGetStructFieldTransformer( substraitExprName: String, childTransformer: ExpressionTransformer, diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala index 0897f411fce51..f2ccf6e81ca17 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/OthersExtensionOverrides.scala @@ -23,6 +23,9 @@ import org.apache.spark.sql.SparkSessionExtensions object 
OthersExtensionOverrides extends GlutenSparkExtensionsInjector { override def inject(extensions: SparkSessionExtensions): Unit = { + BackendsApiManager.getSparkPlanExecApiInstance + .genInjectExtendedParser() + .foreach(extensions.injectParser) BackendsApiManager.getSparkPlanExecApiInstance .genExtendedAnalyzers() .foreach(extensions.injectResolutionRule) diff --git a/pom.xml b/pom.xml index 4f8bd3e14f61f..125ccd4286e28 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ 2.7.4 2.0.7 2.20.0 + 4.9.3 UTF-8 UTF-8 spark-sql-columnar @@ -261,6 +262,7 @@ delta-core 2.0.1 20 + 4.8 @@ -275,6 +277,7 @@ delta-core 2.3.0 23 + 4.8 @@ -288,6 +291,7 @@ delta-core 2.4.0 24 + 4.9.3 @@ -303,6 +307,7 @@ 32 2.15.1 3.3.4 + 4.9.3 @@ -531,6 +536,10 @@ org.apache.hadoop hadoop-client-runtime + + org.antlr + * + @@ -542,6 +551,10 @@ org.apache.arrow * + + org.antlr + * + provided @@ -624,6 +637,10 @@ org.apache.arrow * + + org.antlr + * + test-jar test @@ -731,6 +748,12 @@ maven-source-plugin 3.2.1 + + org.antlr + antlr4-runtime + ${antlr4.version} + provided + @@ -983,6 +1006,11 @@ protobuf-maven-plugin 0.5.1 + + org.antlr + antlr4-maven-plugin + ${antlr4.version} +
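
Usage sketch (informational, not part of the diff above). The statement accepted by the new GlutenClickhouseSqlParser follows the grammar rule CACHE META? DATA ASYN? SELECT ... FROM ... (AFTER ...)? (CACHEPROPERTIES ...)?; the snippet below reuses the table, partition column and storage policy from GlutenClickHouseMergeTreeCacheDataSSuite and is only an illustration of the syntax, not part of the change set:

    // Warm the executor-local MergeTree cache for two columns of a partitioned table,
    // keeping only the parts whose partition value is >= '1995-01-10'
    // (GlutenCHCacheDataCommand turns the AFTER clause into a GreaterThanOrEqual filter).
    val result = spark.sql(
      """
        |CACHE DATA
        |  SELECT l_orderkey, l_shipdate
        |  FROM lineitem_mergetree_hdfs
        |  AFTER l_shipdate AS OF '1995-01-10'
        |  CACHEPROPERTIES(storage_policy='__hdfs_main')
        |""".stripMargin).collect()
    // The command returns a single Row(result: Boolean, reason: String). Without ASYN the
    // driver waits for each executor's CacheLoadResult; with ASYN the RPC is fire-and-forget
    // and the returned row is always (true, "").
    assert(result.head.getBoolean(0), result.head.getString(1))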