diff --git a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4 b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4 new file mode 100644 index 000000000000..abdb0cdbf81e --- /dev/null +++ b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4 @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +grammar GlutenCacheFileSqlBase; + +@members { + /** + * Verify whether current token is a valid decimal token (which contains dot). + * Returns true if the character that follows the token is not a digit or letter or underscore. + * + * For example: + * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'. + * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'. + * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'. + * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is folllowed + * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+' + * which is not a digit or letter or underscore. + */ + public boolean isValidDecimal() { + int nextChar = _input.LA(1); + if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' || + nextChar == '_') { + return false; + } else { + return true; + } + } +} + +tokens { + DELIMITER +} + +singleStatement + : statement ';'* EOF + ; + +statement + : CACHE FILES ASYNC? SELECT selectedColumns=selectedColumnNames + FROM (path=STRING) + (CACHEPROPERTIES cacheProps=propertyList)? #cacheFiles + | .*? #passThrough + ; + +selectedColumnNames + : ASTERISK + | identifier (COMMA identifier)* + ; + +propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + +property + : key=propertyKey (EQ? value=propertyValue)? + ; + +propertyKey + : identifier (DOT identifier)* + | stringLit + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN + | value=stringLit + ; + +stringLit + : STRING + | DOUBLEQUOTED_STRING + ; + +booleanValue + : TRUE | FALSE + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | quotedIdentifier #quotedIdentifierAlternative + | nonReserved #unquotedIdentifier + ; + +quotedIdentifier + : BACKQUOTED_IDENTIFIER + ; + +// Add keywords here so that people's queries don't break if they have a column name as one of +// these tokens +nonReserved + : CACHE | FILES | ASYNC + | SELECT | FOR | AFTER | CACHEPROPERTIES + | TIMESTAMP | AS | OF | DATE_PARTITION + | + ; + +// Define how the keywords above should appear in a user's SQL statement. 
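// For reference, statements the `cacheFiles` rule above is intended to accept
// (the path, column names and property value below are illustrative only):
//   CACHE FILES SELECT * FROM 'hdfs://host:8020/tpch-data/lineitem'
//   CACHE FILES ASYNC SELECT l_orderkey, l_shipdate FROM 'hdfs://host:8020/tpch-data/lineitem'
//       CACHEPROPERTIES (recursive = 'true')
// Any other input matches the `passThrough` alternative and is left to the
// delegating Spark parser.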
+CACHE: 'CACHE'; +META: 'META'; +ASYNC: 'ASYNC'; +SELECT: 'SELECT'; +COMMA: ','; +FOR: 'FOR'; +FROM: 'FROM'; +AFTER: 'AFTER'; +CACHEPROPERTIES: 'CACHEPROPERTIES'; +DOT: '.'; +ASTERISK: '*'; +TIMESTAMP: 'TIMESTAMP'; +AS: 'AS'; +OF: 'OF'; +DATE_PARTITION: 'DATE_PARTITION'; +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +TRUE: 'TRUE'; +FALSE: 'FALSE'; +FILES: 'FILES'; + +EQ : '=' | '=='; +NSEQ: '<=>'; +NEQ : '<>'; +NEQJ: '!='; +LTE : '<=' | '!>'; +GTE : '>=' | '!<'; +CONCAT_PIPE: '||'; + +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +DOUBLEQUOTED_STRING + :'"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : DIGIT+ 'L' + ; + +SMALLINT_LITERAL + : DIGIT+ 'S' + ; + +TINYINT_LITERAL + : DIGIT+ 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}? + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +SIMPLE_COMMENT + : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN) + ; + +BRACKETED_COMMENT + : '/*' .*? '*/' -> channel(HIDDEN) + ; + +WS : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; diff --git a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala new file mode 100644 index 000000000000..48d26498f121 --- /dev/null +++ b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.parser + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface) + extends GlutenCacheFileSqlParserBase { + + override def parsePlan(sqlText: String): LogicalPlan = + parse(sqlText) { + parser => + astBuilder.visit(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => delegate.parsePlan(sqlText) + } + } + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } +} diff --git a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala new file mode 100644 index 000000000000..9a0cde772843 --- /dev/null +++ b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.parser + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface) + extends GlutenCacheFileSqlParserBase { + + override def parsePlan(sqlText: String): LogicalPlan = + parse(sqlText) { + parser => + astBuilder.visit(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => delegate.parsePlan(sqlText) + } + } + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + override def parseQuery(sqlText: String): LogicalPlan = { + delegate.parseQuery(sqlText) + } +} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala new file mode 100644 index 000000000000..9a0cde772843 --- /dev/null +++ b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.parser + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface) + extends GlutenCacheFileSqlParserBase { + + override def parsePlan(sqlText: String): LogicalPlan = + parse(sqlText) { + parser => + astBuilder.visit(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => delegate.parsePlan(sqlText) + } + } + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + override def parseQuery(sqlText: String): LogicalPlan = { + delegate.parseQuery(sqlText) + } +} diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java index 7b765924fa0d..4033d8c6b1cc 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java @@ -30,4 +30,9 @@ public static CacheResult getCacheStatus(String jobId) { } private static native CacheResult nativeGetCacheStatus(String jobId); + + public static native String nativeCacheFiles(byte[] files); + + // only for ut + public static native void removeFiles(String file, String cacheName); } diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java index 0fa69e0d0b1f..b6d538039ec4 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java @@ -16,7 +16,9 @@ */ package org.apache.gluten.execution; -public class CacheResult { +import java.io.Serializable; + +public class CacheResult implements Serializable { public enum Status { RUNNING(0), SUCCESS(1), diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java index f95aaa323c10..39dd94965835 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java @@ -35,6 +35,24 @@ public class MetricsStep { @JsonProperty("selected_marks") protected long selectedMarks; + @JsonProperty("read_cache_hits") + protected long readCacheHits; + + @JsonProperty("miss_cache_hits") + protected long missCacheHits; + + @JsonProperty("read_cache_bytes") + protected long readCacheBytes; + 
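  // The read_cache_* / miss_cache_* / read_miss_* counters added to this class track
  // ClickHouse filesystem-cache usage for a read step: cache hits vs. misses, bytes
  // served from the cache vs. fetched from the backing source, and the time spent on
  // each path. They are summed into the matching Spark SQL metrics by
  // FileSourceScanMetricsUpdater.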
+ @JsonProperty("read_miss_bytes") + protected long readMissBytes; + + @JsonProperty("read_cache_millisecond") + protected long readCacheMillisecond; + + @JsonProperty("miss_cache_millisecond") + protected long missCacheMillisecond; + public String getName() { return name; } @@ -82,4 +100,52 @@ public long getTotalMarksPk() { public long getSelectedMarksPk() { return selectedMarksPk; } + + public long getReadCacheHits() { + return readCacheHits; + } + + public void setReadCacheHits(long readCacheHits) { + this.readCacheHits = readCacheHits; + } + + public long getMissCacheHits() { + return missCacheHits; + } + + public void setMissCacheHits(long missCacheHits) { + this.missCacheHits = missCacheHits; + } + + public long getReadCacheBytes() { + return readCacheBytes; + } + + public void setReadCacheBytes(long readCacheBytes) { + this.readCacheBytes = readCacheBytes; + } + + public long getReadMissBytes() { + return readMissBytes; + } + + public void setReadMissBytes(long readMissBytes) { + this.readMissBytes = readMissBytes; + } + + public long getReadCacheMillisecond() { + return readCacheMillisecond; + } + + public void setReadCacheMillisecond(long readCacheMillisecond) { + this.readCacheMillisecond = readCacheMillisecond; + } + + public long getMissCacheMillisecond() { + return missCacheMillisecond; + } + + public void setMissCacheMillisecond(long missCacheMillisecond) { + this.missCacheMillisecond = missCacheMillisecond; + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 7519580b9cb7..c77d5726222c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -22,6 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.memory.CHThreadGroup import org.apache.gluten.metrics.{IMetrics, NativeMetrics} +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel._ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat @@ -164,6 +165,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val paths = new JArrayList[String]() val starts = new JArrayList[JLong]() val lengths = new JArrayList[JLong]() + val fileSizes = new JArrayList[JLong]() + val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] f.files.foreach { file => @@ -173,6 +176,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { // TODO: Support custom partition location val partitionColumn = new JHashMap[String, String]() partitionColumns.add(partitionColumn) + val (fileSize, modificationTime) = + SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file) + (fileSize, modificationTime) match { + case (Some(size), Some(time)) => + fileSizes.add(JLong.valueOf(size)) + modificationTimes.add(JLong.valueOf(time)) + case _ => + fileSizes.add(0) + modificationTimes.add(0) + } } val preferredLocations = CHAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations()) @@ -181,8 +194,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { paths, starts, lengths, - new JArrayList[JLong](), - new JArrayList[JLong](), + 
fileSizes, + modificationTimes, partitionColumns, new JArrayList[JMap[String, String]](), fileFormat, diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala index 9058bffd8d57..c53448cdd858 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala @@ -128,7 +128,25 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"), "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"), - "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary") + "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary"), + "readCacheHits" -> SQLMetrics.createMetric( + sparkContext, + "Number of times the read from filesystem cache hit the cache"), + "missCacheHits" -> SQLMetrics.createMetric( + sparkContext, + "Number of times the read from filesystem cache miss the cache"), + "readCacheBytes" -> SQLMetrics.createSizeMetric( + sparkContext, + "Bytes read from filesystem cache"), + "readMissBytes" -> SQLMetrics.createSizeMetric( + sparkContext, + "Bytes read from filesystem cache source (from remote fs, etc)"), + "readCacheMillisecond" -> SQLMetrics.createTimingMetric( + sparkContext, + "Time reading from filesystem cache"), + "missCacheMillisecond" -> SQLMetrics.createTimingMetric( + sparkContext, + "Time reading from filesystem cache source (from remote filesystem, etc)") ) override def genFileSourceScanTransformerMetricsUpdater( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 177d6a6f0f4c..f4a7522d30c2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector} -import org.apache.gluten.parser.GlutenClickhouseSqlParser +import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} @@ -44,6 +44,8 @@ private object CHRuleApi { def injectSpark(injector: SparkInjector): Unit = { // Regular Spark rules. 
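    // The cache-files parser injected below delegates anything its small grammar does
    // not recognize (the grammar's `passThrough` alternative) to the next parser, so
    // regular SQL statements are unaffected. Illustrative usage once the extension is
    // active:
    //   spark.sql("CACHE FILES SELECT * FROM 'hdfs://host:8020/tpch-data/lineitem'")
    //   spark.sql("SELECT 1")  // not matched above, handled by the delegate parser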
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply) + injector.injectParser( + (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark, parserInterface)) injector.injectParser( (spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface)) injector.injectResolutionRule( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index f44c5ed1a1dd..4dcae8feb92b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -35,9 +35,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val extraTime: SQLMetric = metrics("extraTime") val inputWaitTime: SQLMetric = metrics("inputWaitTime") val outputWaitTime: SQLMetric = metrics("outputWaitTime") - val selected_marks_pk: SQLMetric = metrics("selectedMarksPk") - val selected_marks: SQLMetric = metrics("selectedMarks") - val total_marks_pk: SQLMetric = metrics("totalMarksPk") + val selectedMarksPK: SQLMetric = metrics("selectedMarksPk") + val selectedMarks: SQLMetric = metrics("selectedMarks") + val totalMarksPK: SQLMetric = metrics("totalMarksPk") + val readCacheHits: SQLMetric = metrics("readCacheHits") + val missCacheHits: SQLMetric = metrics("missCacheHits") + val readCacheBytes: SQLMetric = metrics("readCacheBytes") + val readMissBytes: SQLMetric = metrics("readMissBytes") + val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond") + val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value) @@ -56,9 +62,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric metricsData.getSteps.forEach( step => { - selected_marks_pk += step.selectedMarksPk - selected_marks += step.selectedMarks - total_marks_pk += step.totalMarksPk + selectedMarksPK += step.selectedMarksPk + selectedMarks += step.selectedMarks + totalMarksPK += step.totalMarksPk + readCacheHits += step.readCacheHits + missCacheHits += step.missCacheHits + readCacheBytes += step.readCacheBytes + readMissBytes += step.readMissBytes + readCacheMillisecond += step.readCacheMillisecond + missCacheMillisecond += step.missCacheMillisecond }) MetricsUtil.updateExtraTimeMetric( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala new file mode 100644 index 000000000000..b031dcf7a1b4 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.parser + +import org.apache.gluten.sql.parser.{GlutenCacheFileSqlBaseBaseListener, GlutenCacheFileSqlBaseBaseVisitor, GlutenCacheFileSqlBaseLexer, GlutenCacheFileSqlBaseParser} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.sql.execution.commands.GlutenCacheFilesCommand +import org.apache.spark.sql.internal.VariableSubstitution + +import org.antlr.v4.runtime._ +import org.antlr.v4.runtime.atn.PredictionMode +import org.antlr.v4.runtime.misc.ParseCancellationException +import org.antlr.v4.runtime.tree.TerminalNodeImpl + +import java.util.Locale + +import scala.collection.JavaConverters._ + +trait GlutenCacheFileSqlParserBase extends ParserInterface { + protected val astBuilder = new GlutenCacheFileSqlAstBuilder + protected val substitution = new VariableSubstitution + + protected def parse[T](command: String)(toResult: GlutenCacheFileSqlBaseParser => T): T = { + val lexer = new GlutenCacheFileSqlBaseLexer( + new UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command)))) + lexer.removeErrorListeners() + lexer.addErrorListener(ParseErrorListener) + + val tokenStream = new CommonTokenStream(lexer) + val parser = new GlutenCacheFileSqlBaseParser(tokenStream) + + parser.addParseListener(GlutenCacheFileSqlPostProcessor) + parser.removeErrorListeners() + parser.addErrorListener(ParseErrorListener) + + try { + try { + // first, try parsing with potentially faster SLL mode + parser.getInterpreter.setPredictionMode(PredictionMode.SLL) + toResult(parser) + } catch { + case e: ParseCancellationException => + // if we fail, parse with LL mode + tokenStream.seek(0) // rewind input stream + parser.reset() + + // Try Again. + parser.getInterpreter.setPredictionMode(PredictionMode.LL) + toResult(parser) + } + } catch { + case e: ParseException if e.command.isDefined => + throw e + case e: ParseException => + throw e.withCommand(command) + case e: AnalysisException => + val position = Origin(e.line, e.startPosition) + throw new ParseException( + command = Option(command), + message = e.message, + start = position, + stop = position, + errorClass = Some("GLUTEN_CACHE_FILE_PARSING_ANALYSIS_ERROR")) + } + } +} + +class GlutenCacheFileSqlAstBuilder extends GlutenCacheFileSqlBaseBaseVisitor[AnyRef] { + import org.apache.spark.sql.catalyst.parser.ParserUtils._ + + /** Convert a property list into a key-value map. */ + override def visitPropertyList( + ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, String] = + withOrigin(ctx) { + val properties = ctx.property.asScala.map { + property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * A property key can either be String or a collection of dot separated elements. 
This function + * extracts the property key based on whether its a string literal or a property identifier. + */ + override def visitPropertyKey(key: GlutenCacheFileSqlBaseParser.PropertyKeyContext): String = { + if (key.stringLit() != null) { + string(visitStringLit(key.stringLit())) + } else { + key.getText + } + } + + /** + * A property value can be String, Integer, Boolean or Decimal. This function extracts the + * property value based on whether its a string, integer, boolean or decimal literal. + */ + override def visitPropertyValue( + value: GlutenCacheFileSqlBaseParser.PropertyValueContext): String = { + if (value == null) { + null + } else if (value.identifier != null) { + value.identifier.getText + } else if (value.value != null) { + string(visitStringLit(value.value)) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } + + def visitPropertyKeyValues( + ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", + ctx) + } + props + } + + override def visitStringLit(ctx: GlutenCacheFileSqlBaseParser.StringLitContext): Token = { + if (ctx != null) { + if (ctx.STRING != null) { + ctx.STRING.getSymbol + } else { + ctx.DOUBLEQUOTED_STRING.getSymbol + } + } else { + null + } + } + + override def visitSingleStatement( + ctx: GlutenCacheFileSqlBaseParser.SingleStatementContext): AnyRef = withOrigin(ctx) { + visit(ctx.statement).asInstanceOf[LogicalPlan] + } + + override def visitCacheFiles(ctx: GlutenCacheFileSqlBaseParser.CacheFilesContext): AnyRef = + withOrigin(ctx) { + val asynExecute = ctx.ASYNC != null + val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns) + val propertyOverrides = Option(ctx.cacheProps) + .map(visitPropertyKeyValues) + .getOrElse(Map.empty[String, String]) + val path = ctx.path.getText + + GlutenCacheFilesCommand( + asynExecute, + selectedColuman, + path.substring(1, path.length - 1), + propertyOverrides + ) + } + + override def visitPassThrough(ctx: GlutenCacheFileSqlBaseParser.PassThroughContext): AnyRef = + null + + override def visitSelectedColumnNames( + ctx: GlutenCacheFileSqlBaseParser.SelectedColumnNamesContext): Option[Seq[String]] = + withOrigin(ctx) { + if (ctx != null) { + if (ctx.ASTERISK != null) { + // It means select all columns + None + } else if (ctx.identifier != null && !(ctx.identifier).isEmpty) { + Some(ctx.identifier.asScala.map(_.getText).toSeq) + } else { + throw new ParseException(s"Illegal selected column.", ctx) + } + } else { + throw new ParseException(s"Illegal selected column.", ctx) + } + } +} + +case object GlutenCacheFileSqlPostProcessor extends GlutenCacheFileSqlBaseBaseListener { + + /** Remove the back ticks from an Identifier. */ + override def exitQuotedIdentifier( + ctx: GlutenCacheFileSqlBaseParser.QuotedIdentifierContext): Unit = { + replaceTokenByIdentifier(ctx, 1) { + token => + // Remove the double back ticks in the string. + token.setText(token.getText.replace("``", "`")) + token + } + } + + /** Treat non-reserved keywords as Identifiers. 
*/ + override def exitNonReserved(ctx: GlutenCacheFileSqlBaseParser.NonReservedContext): Unit = { + replaceTokenByIdentifier(ctx, 0)(identity) + } + + private def replaceTokenByIdentifier(ctx: ParserRuleContext, stripMargins: Int)( + f: CommonToken => CommonToken = identity): Unit = { + val parent = ctx.getParent + parent.removeLastChild() + val token = ctx.getChild(0).getPayload.asInstanceOf[Token] + val newToken = new CommonToken( + new org.antlr.v4.runtime.misc.Pair(token.getTokenSource, token.getInputStream), + GlutenCacheFileSqlBaseParser.IDENTIFIER, + token.getChannel, + token.getStartIndex + stripMargins, + token.getStopIndex - stripMargins + ) + parent.addChild(new TerminalNodeImpl(f(newToken))) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala index 18fc102bec3d..4a3883c8cc2b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.VariableSubstitution import org.antlr.v4.runtime._ import org.antlr.v4.runtime.atn.PredictionMode -import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} +import org.antlr.v4.runtime.misc.ParseCancellationException import org.antlr.v4.runtime.tree.TerminalNodeImpl import java.util.Locale @@ -256,21 +256,3 @@ case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener { parent.addChild(new TerminalNodeImpl(f(newToken))) } } - -class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { - override def consume(): Unit = wrapped.consume - override def getSourceName(): String = wrapped.getSourceName - override def index(): Int = wrapped.index - override def mark(): Int = wrapped.mark - override def release(marker: Int): Unit = wrapped.release(marker) - override def seek(where: Int): Unit = wrapped.seek(where) - override def size(): Int = wrapped.size - - override def getText(interval: Interval): String = wrapped.getText(interval) - - override def LA(i: Int): Int = { - val la = wrapped.LA(i) - if (la == 0 || la == IntStream.EOF) la - else Character.toUpperCase(la) - } -} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala new file mode 100644 index 000000000000..6ee2aac81c74 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.parser + +import org.antlr.v4.runtime.{CharStream, CodePointCharStream, IntStream} +import org.antlr.v4.runtime.misc.Interval + +class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { + override def consume(): Unit = wrapped.consume + override def getSourceName(): String = wrapped.getSourceName + override def index(): Int = wrapped.index + override def mark(): Int = wrapped.mark + override def release(marker: Int): Unit = wrapped.release(marker) + override def seek(where: Int): Unit = wrapped.seek(where) + override def size(): Int = wrapped.size + + override def getText(interval: Interval): String = wrapped.getText(interval) + + override def LA(i: Int): Int = { + val la = wrapped.LA(i) + if (la == 0 || la == IntStream.EOF) la + else Character.toUpperCase(la) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala index 8a3bde235887..7f2b94eea314 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala @@ -79,9 +79,20 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf) context.reply( CacheJobInfo(status = false, "", s"executor: $executorId cache data failed.")) } - case GlutenMergeTreeCacheLoadStatus(jobId) => + case GlutenCacheLoadStatus(jobId) => val status = CHNativeCacheManager.getCacheStatus(jobId) context.reply(status) + case GlutenFilesCacheLoad(files) => + try { + val jobId = CHNativeCacheManager.nativeCacheFiles(files) + context.reply(CacheJobInfo(status = true, jobId)) + } catch { + case e: Exception => + context.reply( + CacheJobInfo( + status = false, + s"executor: $executorId cache data failed. ${e.getMessage}")) + } case e => logError(s"Received unexpected message. 
$e") } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala index 800b15b9949b..e596e94fed72 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala @@ -39,8 +39,12 @@ object GlutenRpcMessages { case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String]) extends GlutenRpcMessage - case class GlutenMergeTreeCacheLoadStatus(jobId: String) + case class GlutenCacheLoadStatus(jobId: String) case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "") extends GlutenRpcMessage + + case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage + + case class GlutenFilesCacheLoadStatus(jobId: String) } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index f32d22d5eac0..bb3cb5acce37 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -16,24 +16,20 @@ */ package org.apache.spark.sql.execution.commands -import org.apache.gluten.exception.GlutenException -import org.apache.gluten.execution.CacheResult -import org.apache.gluten.execution.CacheResult.Status import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.substrait.rel.ExtensionTableBuilder import org.apache.spark.affinity.CHAffinity import org.apache.spark.rpc.GlutenDriverEndpoint -import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus} +import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal} import org.apache.spark.sql.delta._ import org.apache.spark.sql.execution.command.LeafRunnableCommand -import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.{checkExecutorId, collectJobTriggerResult, toExecutorId, waitAllJobFinish, waitRpcResults} +import org.apache.spark.sql.execution.commands.GlutenCacheBase._ import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.types.{BooleanType, StringType} -import org.apache.spark.util.ThreadUtils import org.apache.hadoop.fs.Path @@ -43,7 +39,6 @@ import java.util.{ArrayList => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.Future -import scala.concurrent.duration.Duration case class GlutenCHCacheDataCommand( onlyMetaCache: Boolean, @@ -140,9 +135,7 @@ case class GlutenCHCacheDataCommand( val executorIdsToAddFiles = scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]() val executorIdsToParts = scala.collection.mutable.Map[String, String]() - executorIdsToAddFiles.put( - GlutenCHCacheDataCommand.ALL_EXECUTORS, - new ArrayBuffer[AddMergeTreeParts]()) + executorIdsToAddFiles.put(ALL_EXECUTORS, new ArrayBuffer[AddMergeTreeParts]()) selectedAddFiles.foreach( addFile => { val 
mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts] @@ -156,7 +149,7 @@ case class GlutenCHCacheDataCommand( if (locations.isEmpty) { // non soft affinity - executorIdsToAddFiles(GlutenCHCacheDataCommand.ALL_EXECUTORS) + executorIdsToAddFiles(ALL_EXECUTORS) .append(mergeTreePart) } else { locations.foreach( @@ -205,9 +198,9 @@ case class GlutenCHCacheDataCommand( } }) val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]() - if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) { + if (executorIdsToParts.contains(ALL_EXECUTORS)) { // send all parts to all executors - val tableMessage = executorIdsToParts(GlutenCHCacheDataCommand.ALL_EXECUTORS) + val tableMessage = executorIdsToParts(ALL_EXECUTORS) GlutenDriverEndpoint.executorDataMap.forEach( (executorId, executor) => { futureList.append( @@ -230,86 +223,7 @@ case class GlutenCHCacheDataCommand( ))) }) } - val resultList = waitRpcResults(futureList) - if (asynExecute) { - val res = collectJobTriggerResult(resultList) - Seq(Row(res._1, res._2.mkString(";"))) - } else { - val res = waitAllJobFinish(resultList) - Seq(Row(res._1, res._2)) - } - } - -} - -object GlutenCHCacheDataCommand { - private val ALL_EXECUTORS = "allExecutors" - - private def toExecutorId(executorId: String): String = - executorId.split("_").last - - def waitAllJobFinish(jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, String) = { - val res = collectJobTriggerResult(jobs) - var status = res._1 - val messages = res._2 - jobs.foreach( - job => { - if (status) { - var complete = false - while (!complete) { - Thread.sleep(5000) - val future_result = GlutenDriverEndpoint.executorDataMap - .get(toExecutorId(job._1)) - .executorEndpointRef - .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(job._2.jobId)) - val result = ThreadUtils.awaitResult(future_result, Duration.Inf) - result.getStatus match { - case Status.ERROR => - status = false - messages.append( - s"executor : {}, failed with message: {};", - job._1, - result.getMessage) - complete = true - case Status.SUCCESS => - complete = true - case _ => - // still running - } - } - } - }) - (status, messages.mkString(";")) - } - - private def collectJobTriggerResult(jobs: ArrayBuffer[(String, CacheJobInfo)]) = { - var status = true - val messages = ArrayBuffer[String]() - jobs.foreach( - job => { - if (!job._2.status) { - messages.append(job._2.reason) - status = false - } - }) - (status, messages) - } - private def waitRpcResults = (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => { - val resultList = ArrayBuffer[(String, CacheJobInfo)]() - futureList.foreach( - f => { - resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf))) - }) - resultList + getResult(futureList, asynExecute) } - - private def checkExecutorId(executorId: String): Unit = { - if (!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) { - throw new GlutenException( - s"executor $executorId not found," + - s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}") - } - } - } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala new file mode 100644 index 000000000000..c4e9f51bce63 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.commands + +import org.apache.gluten.exception.GlutenException +import org.apache.gluten.execution.CacheResult +import org.apache.gluten.execution.CacheResult.Status + +import org.apache.spark.rpc.GlutenDriverEndpoint +import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus} +import org.apache.spark.sql.Row +import org.apache.spark.util.ThreadUtils + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +object GlutenCacheBase { + def ALL_EXECUTORS: String = "allExecutors" + + def toExecutorId(executorId: String): String = + executorId.split("_").last + + protected def waitRpcResults + : ArrayBuffer[(String, Future[CacheJobInfo])] => ArrayBuffer[(String, CacheJobInfo)] = + (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => { + val resultList = ArrayBuffer[(String, CacheJobInfo)]() + futureList.foreach( + f => { + resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf))) + }) + resultList + } + + def checkExecutorId(executorId: String): Unit = { + if (!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) { + throw new GlutenException( + s"executor $executorId not found," + + s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}") + } + } + + def waitAllJobFinish( + jobs: ArrayBuffer[(String, CacheJobInfo)], + ask: (String, String) => Future[CacheResult]): (Boolean, String) = { + val res = collectJobTriggerResult(jobs) + var status = res._1 + val messages = res._2 + jobs.foreach( + job => { + if (status) { + var complete = false + while (!complete) { + Thread.sleep(5000) + val future_result = ask(job._1, job._2.jobId) + val result = ThreadUtils.awaitResult(future_result, Duration.Inf) + result.getStatus match { + case Status.ERROR => + status = false + messages.append( + s"executor : {}, failed with message: {};", + job._1, + result.getMessage) + complete = true + case Status.SUCCESS => + complete = true + case _ => + // still running + } + } + } + }) + (status, messages.mkString(";")) + } + + def collectJobTriggerResult( + jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, ArrayBuffer[String]) = { + var status = true + val messages = ArrayBuffer[String]() + jobs.foreach( + job => { + if (!job._2.status) { + messages.append(job._2.reason) + status = false + } + }) + (status, messages) + } + + def getResult( + futureList: ArrayBuffer[(String, Future[CacheJobInfo])], + async: Boolean): Seq[Row] = { + val resultList = waitRpcResults(futureList) + if (async) { + val res = collectJobTriggerResult(resultList) + Seq(Row(res._1, res._2.mkString(";"))) + } else { + val fetchStatus: (String, String) => Future[CacheResult] = + (executorId: String, jobId: String) => { + GlutenDriverEndpoint.executorDataMap + 
.get(toExecutorId(executorId)) + .executorEndpointRef + .ask[CacheResult](GlutenCacheLoadStatus(jobId)) + } + val res = waitAllJobFinish(resultList, fetchStatus) + Seq(Row(res._1, res._2)) + } + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala new file mode 100644 index 000000000000..0a08df7cebad --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.commands + +import org.apache.gluten.substrait.rel.LocalFilesBuilder +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.affinity.CHAffinity +import org.apache.spark.rpc.GlutenDriverEndpoint +import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.commands.GlutenCacheBase._ +import org.apache.spark.sql.types.{BooleanType, StringType} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import java.io.FileNotFoundException +import java.lang.{Long => JLong} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future + +case class GlutenCacheFilesCommand( + async: Boolean, + selectedColumn: Option[Seq[String]], + filePath: String, + propertyOverrides: Map[String, String] +) extends LeafRunnableCommand { + + override def output: Seq[Attribute] = Seq( + AttributeReference("result", BooleanType, nullable = false)(), + AttributeReference("reason", StringType, nullable = false)()) + + override def run(session: SparkSession): Seq[Row] = { + val targetFile = new Path(filePath) + val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration + val fs = targetFile.getFileSystem(hadoopConf) + if (!fs.exists(targetFile)) { + throw new FileNotFoundException(filePath) + } + + val recursive = + if ("true".equalsIgnoreCase(propertyOverrides.getOrElse("recursive", "false"))) { + true + } else { + false + } + + val files: Seq[FileStatus] = listFiles(targetFile, recursive, fs) + val executorIdsToFiles = + scala.collection.mutable.Map[String, ArrayBuffer[FileStatus]]() + executorIdsToFiles.put(ALL_EXECUTORS, new ArrayBuffer[FileStatus]()) + + files.foreach( + fileStatus => { + val locations = 
CHAffinity.getHostLocations(fileStatus.getPath.toUri.toASCIIString) + if (locations.isEmpty) { + executorIdsToFiles(ALL_EXECUTORS).append(fileStatus) + } else { + locations.foreach( + executor => { + if (!executorIdsToFiles.contains(executor)) { + executorIdsToFiles.put(executor, new ArrayBuffer[FileStatus]()) + } + executorIdsToFiles(executor).append(fileStatus) + }) + } + }) + + val executorIdsToLocalFiles = executorIdsToFiles + .filter(_._2.nonEmpty) + .map { + case (executorId, fileStatusArray) => + val paths = new JArrayList[String]() + val starts = new JArrayList[JLong]() + val lengths = new JArrayList[JLong]() + val partitionColumns = new JArrayList[JMap[String, String]] + + fileStatusArray.foreach( + fileStatus => { + paths.add(fileStatus.getPath.toUri.toASCIIString) + starts.add(JLong.valueOf(0)) + lengths.add(JLong.valueOf(fileStatus.getLen)) + partitionColumns.add(new JHashMap[String, String]()) + }) + + val localFile = LocalFilesBuilder.makeLocalFiles( + null, + paths, + starts, + lengths, + lengths, + new JArrayList[JLong](), + partitionColumns, + new JArrayList[JMap[String, String]](), + ReadFileFormat.ParquetReadFormat, // ignore format in backend + new JArrayList[String](), + new JHashMap[String, String]() + ) + + (executorId, localFile) + } + .toMap + + val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]() + val fileNodeOption = executorIdsToLocalFiles.get(ALL_EXECUTORS) + if (fileNodeOption.isDefined) { + GlutenDriverEndpoint.executorDataMap.forEach( + (executorId, executor) => { + futureList.append( + ( + executorId, + executor.executorEndpointRef.ask[CacheJobInfo]( + GlutenFilesCacheLoad(fileNodeOption.get.toProtobuf.toByteArray)))) + }) + } else { + executorIdsToLocalFiles.foreach { + case (executorId, fileNode) => + checkExecutorId(executorId) + val executor = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId)) + futureList.append( + ( + executorId, + executor.executorEndpointRef.ask[CacheJobInfo]( + GlutenFilesCacheLoad(fileNode.toProtobuf.toByteArray)))) + } + } + + getResult(futureList, async) + } + + private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem): Seq[FileStatus] = { + val dirContents = fs + .listStatus(targetFile) + .flatMap(f => addInputPathRecursively(fs, f, recursive)) + .filter(isNonEmptyDataFile) + .toSeq + dirContents + } + + private def addInputPathRecursively( + fs: FileSystem, + files: FileStatus, + recursive: Boolean): Seq[FileStatus] = { + if (files.isFile) { + Seq(files) + } else if (recursive) { + fs.listStatus(files.getPath) + .flatMap( + file => { + if (file.isFile) { + Seq(file) + } else { + addInputPathRecursively(fs, file, recursive) + } + }) + } else { + Seq() + } + } + + private def isNonEmptyDataFile(f: FileStatus): Boolean = { + if (!f.isFile || f.getLen == 0) { + false + } else { + val name = f.getPath.getName + !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) + } + } +} diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index f914eaa1860a..dfc5fbd3b37e 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -41,6 +41,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends 
WholeStageTransformerSu version(0) + "." + version(1) } + val CH_CONFIG_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_config" + val CH_SETTING_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_settings" + val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/" val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/" val S3_ENDPOINT = "s3://127.0.0.1:9000/" diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala new file mode 100644 index 000000000000..ed8dcbbc1803 --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution.tpch + +import org.apache.gluten.execution.{CHNativeCacheManager, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +import org.apache.hadoop.fs.Path + +class GlutenClickHouseHDFSSuite + extends GlutenClickHouseTPCHAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data" + override protected val tpchQueries: String = + rootPath + "../../../../gluten-core/src/test/resources/tpch-queries" + override protected val queriesResults: String = rootPath + "queries-output" + + private val hdfsCachePath = "/tmp/gluten_hdfs_cache/" + private val cache_name = "gluten_cache" + + /** Run Gluten + ClickHouse Backend with SortShuffleManager */ + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "sort") + .set("spark.io.compression.codec", "snappy") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") + .set("spark.sql.adaptive.enabled", "true") + .set(s"$CH_CONFIG_PREFIX.use_local_format", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32") + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true") + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name) + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath) + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi") + .set(s"$CH_CONFIG_PREFIX.reuse_disk_cache", "false") + .set("spark.sql.adaptive.enabled", "false") + } + + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + override def beforeAll(): Unit = { + super.beforeAll() + deleteCache() + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + 
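    // Evict files cached by a previous test (see deleteCache below) so that the
    // readCacheBytes / readMissBytes assertions in each case start from a cold cache.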
deleteCache() + } + + private def deleteCache(): Unit = { + val targetFile = new Path(tablesPath) + val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf()) + fs.listStatus(targetFile) + .foreach( + table => { + if (table.isDirectory) { + fs.listStatus(table.getPath) + .foreach( + data => { + if (data.isFile) { + CHNativeCacheManager + .removeFiles(data.getPath.toUri.getPath.substring(1), cache_name) + } + }) + } + }) + clearDataPath(hdfsCachePath) + } + + val runWithoutCache: () => Unit = () => { + runTPCHQuery(6) { + df => + val plans = df.queryExecution.executedPlan.collect { + case scanExec: FileSourceScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans.head.metrics("readMissBytes").value != 0) + } + } + + val runWithCache: () => Unit = () => { + runTPCHQuery(6) { + df => + val plans = df.queryExecution.executedPlan.collect { + case scanExec: FileSourceScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans.head.metrics("readMissBytes").value == 0) + assert(plans.head.metrics("readCacheBytes").value != 0) + } + } + + test("test hdfs cache") { + runWithoutCache() + runWithCache() + } + + test("test cache file command") { + runSql( + s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'", + noFallBack = false) { _ => } + runWithCache() + } + + test("test no cache by query") { + withSQLConf( + s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache" -> "true") { + runWithoutCache() + } + + runWithoutCache() + runWithCache() + } +} diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 8e07eea011b8..9558bf957d4a 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -573,6 +573,18 @@ std::vector BackendInitializerUtil::wrapDiskPathConfig( std::vector changed_paths; if (path_prefix.empty() && path_suffix.empty()) return changed_paths; + + auto change_func = [&](String key) -> void + { + if (const String value = config.getString(key, ""); value != "") + { + const String change_value = path_prefix + value + path_suffix; + config.setString(key, change_value); + changed_paths.emplace_back(change_value); + LOG_INFO(getLogger("BackendInitializerUtil"), "Change config `{}` from '{}' to {}.", key, value, change_value); + } + }; + Poco::Util::AbstractConfiguration::Keys disks; std::unordered_set disk_types = {"s3_gluten", "hdfs_gluten", "cache"}; config.keys("storage_configuration.disks", disks); @@ -586,26 +598,14 @@ std::vector BackendInitializerUtil::wrapDiskPathConfig( if (!disk_types.contains(disk_type)) return; if (disk_type == "cache") - { - String path = config.getString(disk_prefix + ".path", ""); - if (!path.empty()) - { - String final_path = path_prefix + path + path_suffix; - config.setString(disk_prefix + ".path", final_path); - changed_paths.emplace_back(final_path); - } - } + change_func(disk_prefix + ".path"); else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten") - { - String metadata_path = config.getString(disk_prefix + ".metadata_path", ""); - if (!metadata_path.empty()) - { - String final_path = path_prefix + metadata_path + path_suffix; - config.setString(disk_prefix + ".metadata_path", final_path); - changed_paths.emplace_back(final_path); - } - } + change_func(disk_prefix + ".metadata_path"); }); + + change_func("path"); + change_func("gluten_cache.local.path"); + return changed_paths; } diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index 
c91b7264db31..a92155d14ea1 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -195,6 +195,8 @@ class BackendInitializerUtil inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes"; + inline static const String GLUTEN_LOCAL_CACHE_PREFIX = "gluten_cache.local."; + /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written. /// So we need to get the spark user from env and pass it to libhdfs3. diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index d0e2e9dee8b5..38c4ce162138 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace local_engine { @@ -134,13 +135,17 @@ struct HdfsConfig { inline static const String HDFS_ASYNC = "hdfs.enable_async_io"; - bool hdfs_async = true; + bool hdfs_async; - static HdfsConfig loadFromContext(DB::ContextPtr context) + static HdfsConfig loadFromContext(const Poco::Util::AbstractConfiguration & config, const DB::ReadSettings & read_settings) { - HdfsConfig config; - config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true); - return config; + HdfsConfig hdfs; + if (read_settings.enable_filesystem_cache) + hdfs.hdfs_async = false; + else + hdfs.hdfs_async = config.getBool(HDFS_ASYNC, true); + + return hdfs; } }; @@ -159,10 +164,17 @@ struct S3Config static S3Config loadFromContext(DB::ContextPtr context) { S3Config config; - config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false); - config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB); - config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, ""); - config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false); + + if (context->getConfigRef().has(S3_LOCAL_CACHE_ENABLE)) + { + LOG_WARNING(&Poco::Logger::get("S3Config"), "Config {} is deprecated.", S3_LOCAL_CACHE_ENABLE); + + config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false); + config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB); + config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, ""); + config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false); + } + return config; } }; diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index e5f5dd5dccdb..ff9c151159a6 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -77,14 +77,19 @@ int64_t QueryContextManager::initializeQuery() DB::ContextMutablePtr QueryContextManager::currentQueryContext() { - if (!CurrentThread::getGroup()) - { - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); - } + auto thread_group = currentThreadGroup(); int64_t id = reinterpret_cast(CurrentThread::getGroup().get()); return query_map.get(id)->query_context; } +std::shared_ptr QueryContextManager::currentThreadGroup() +{ + if (auto thread_group = CurrentThread::getGroup()) + return thread_group; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread 
group not found."); +} + void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) { if (!CurrentThread::getGroup()) diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h index 0fbf4977321f..4770327d1715 100644 --- a/cpp-ch/local-engine/Common/QueryContext.h +++ b/cpp-ch/local-engine/Common/QueryContext.h @@ -30,6 +30,7 @@ class QueryContextManager } int64_t initializeQuery(); DB::ContextMutablePtr currentQueryContext(); + static std::shared_ptr currentThreadGroup(); void logCurrentPerformanceCounters(ProfileEvents::Counters& counters); size_t currentPeakMemory(int64_t id); void finalizeQuery(int64_t id); diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 7b8b4cfd95a8..e138642607c4 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -15,15 +15,63 @@ * limitations under the License. */ #include "RelMetric.h" + #include #include #include +#include +#include using namespace rapidjson; +namespace ProfileEvents +{ +extern const Event FileSegmentWaitReadBufferMicroseconds; +extern const Event FileSegmentReadMicroseconds; +extern const Event FileSegmentCacheWriteMicroseconds; +extern const Event FileSegmentPredownloadMicroseconds; +extern const Event FileSegmentUsedBytes; + +extern const Event CachedReadBufferReadFromSourceMicroseconds; +extern const Event CachedReadBufferReadFromCacheMicroseconds; +extern const Event CachedReadBufferCacheWriteMicroseconds; +extern const Event CachedReadBufferReadFromSourceBytes; +extern const Event CachedReadBufferReadFromCacheBytes; +extern const Event CachedReadBufferCacheWriteBytes; +extern const Event CachedReadBufferCreateBufferMicroseconds; + +extern const Event CachedReadBufferReadFromCacheHits; +extern const Event CachedReadBufferReadFromCacheMisses; +} + namespace local_engine { +static void writeCacheHits(Writer & writer) +{ + const auto thread_group = QueryContextManager::currentThreadGroup(); + auto & counters = thread_group->performance_counters; + auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load(); + auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load(); + auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load(); + auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load(); + auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000; + auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000; + + writer.Key("read_cache_hits"); + writer.Uint64(read_cache_hits); + writer.Key("miss_cache_hits"); + writer.Uint64(miss_cache_hits); + writer.Key("read_cache_bytes"); + writer.Uint64(read_cache_bytes); + writer.Key("read_miss_bytes"); + writer.Uint64(read_miss_bytes); + writer.Key("read_cache_millisecond"); + writer.Uint64(read_cache_millisecond); + writer.Key("miss_cache_millisecond"); + writer.Uint64(miss_cache_millisecond); +} + RelMetric::RelMetric(size_t id_, const String & name_, std::vector & steps_) : id(id_), name(name_), steps(steps_) { } @@ -117,7 +165,7 @@ void RelMetric::serialize(Writer & writer, bool) const } writer.EndArray(); - if (auto read_mergetree = dynamic_cast(step)) + if (auto read_mergetree = dynamic_cast(step)) { auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk; auto 
selected_marks = read_mergetree->getAnalysisResult().selected_marks; @@ -128,6 +176,11 @@ void RelMetric::serialize(Writer & writer, bool) const writer.Uint64(selected_marks); writer.Key("total_marks_pk"); writer.Uint64(total_marks_pk); + writeCacheHits(writer); + } + else if (dynamic_cast(step)) + { + writeCacheHits(writer); } writer.EndObject(); diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index a97f0c72ada4..0dc852a90110 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -16,20 +16,21 @@ */ #include "CacheManager.h" +#include #include #include #include -#include +#include #include -#include -#include +#include #include #include #include #include +#include #include +#include #include -#include #include @@ -178,4 +179,62 @@ jobject CacheManager::getCacheStatus(JNIEnv * env, const String & jobId) } return env->NewObject(cache_result_class, cache_result_constructor, status, charTojstring(env, message.c_str())); } + +Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder) +{ + auto task = [file, read_buffer_builder, context = this->context]() + { + LOG_INFO(getLogger("CacheManager"), "Loading cache file {}", file.uri_file()); + + try + { + std::unique_ptr rb = read_buffer_builder->build(file); + while (!rb->eof()) + rb->ignoreAll(); + } + catch (std::exception & e) + { + LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what()); + std::rethrow_exception(std::current_exception()); + } + }; + + return std::move(task); +} + +JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos) +{ + JobId id = toString(UUIDHelpers::generateV4()); + Job job(id); + + if (file_infos.items_size()) + { + const Poco::URI file_uri(file_infos.items().Get(0).uri_file()); + const auto read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context); + + if (read_buffer_builder->file_cache) + for (const auto & file : file_infos.items()) + job.addTask(cacheFile(file, read_buffer_builder)); + else + LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled."); + } + + auto & scheduler = JobScheduler::instance(); + scheduler.scheduleJob(std::move(job)); + return id; +} + +void CacheManager::removeFiles(String file, String cache_name) +{ + // only for ut + for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll()) + { + if (name != cache_name) + continue; + + if (const auto cache = file_cache->cache) + cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id); + } +} + } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index b88a3ea03e4e..6335f86bb162 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -15,9 +15,13 @@ * limitations under the License. 
*/ #pragma once + +#include + #include #include #include +#include namespace local_engine { @@ -40,6 +44,10 @@ class CacheManager { Task cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set& columns); JobId cacheParts(const String& table_def, const std::unordered_set& columns); static jobject getCacheStatus(JNIEnv * env, const String& jobId); + + Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder); + JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos); + static void removeFiles(String file, String cache_name); private: CacheManager() = default; DB::ContextMutablePtr context; diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index da15890070b0..b32073db53c4 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include #include @@ -49,7 +49,6 @@ #include #include #include -#include #include #include @@ -77,8 +76,6 @@ namespace ErrorCodes } } -namespace fs = std::filesystem; - namespace local_engine { template @@ -205,6 +202,7 @@ class LocalFileReadBufferBuilder : public ReadBufferBuilder #if USE_HDFS class HDFSFileReadBufferBuilder : public ReadBufferBuilder { + using ReadBufferCreator = std::function(bool restricted_seek, const DB::StoredObject & object)>; public: explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_), context(context_) { } ~HDFSFileReadBufferBuilder() override = default; @@ -212,18 +210,21 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override { - auto config = HdfsConfig::loadFromContext(context); + DB::ReadSettings read_settings = getReadSettings(context); + auto & config = context->getConfigRef(); + auto hdfs_config = HdfsConfig::loadFromContext(config, read_settings); Poco::URI file_uri(file_info.uri_file()); std::string uri_path = "hdfs://" + file_uri.getHost(); if (file_uri.getPort()) - uri_path += ":" + std::to_string(file_uri.getPort()); + uri_path += ":" + std::to_string(static_cast(file_uri.getPort())); - DB::ReadSettings read_settings; - std::unique_ptr read_buffer; + size_t read_util_position = 0; + size_t read_begin = 0; if (set_read_util_position) { std::pair start_end_pos - = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath()); + = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath()); + LOG_DEBUG( &Poco::Logger::get("ReadBufferBuilder"), "File read start and end position adjusted from {},{} to {},{}", @@ -232,34 +233,57 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder start_end_pos.first, start_end_pos.second); - auto read_buffer_impl = std::make_unique( - uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, start_end_pos.second, true); - if (config.hdfs_async) - { - auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - read_buffer = std::make_unique(pool_reader, read_settings, std::move(read_buffer_impl)); - } - else - read_buffer = std::move(read_buffer_impl); + read_begin = start_end_pos.first; + 
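// keep both adjusted offsets: read_util_position caps the buffer built below,
+            // while read_begin is seeked to once the (possibly cache-backed) buffer exists
+           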
read_util_position = start_end_pos.second; + } - if (auto * seekable_in = dynamic_cast(read_buffer.get())) - if (start_end_pos.first) - seekable_in->seek(start_end_pos.first, SEEK_SET); + size_t file_size = 0; + if (file_info.has_properties()) + file_size = file_info.properties().filesize(); + + std::unique_ptr read_buffer; + + if (hdfs_config.hdfs_async) + { + std::optional size = std::nullopt; + if (file_size) + size = file_size; + + auto read_buffer_impl = std::make_shared( + uri_path, file_uri.getPath(), config, read_settings, read_util_position, true, size); + auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + read_buffer = std::make_unique(pool_reader, read_settings, std::move(read_buffer_impl)); } else { - auto read_buffer_impl - = std::make_unique(uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, 0, true); - if (config.hdfs_async) + if (!file_size) { - read_buffer = std::make_unique( - context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), - read_settings, - std::move(read_buffer_impl)); + // only for spark3.2 file partition not contained file size + // so first compute file size first + auto read_buffer_impl = std::make_unique( + uri_path, file_uri.getPath(), config, read_settings, read_util_position, true); + file_size = read_buffer_impl->getFileSize(); } - else - read_buffer = std::move(read_buffer_impl); + + ReadBufferCreator hdfs_read_buffer_creator + = [this, hdfs_uri = uri_path, hdfs_file_path = file_uri.getPath(), read_settings, &config, read_util_position]( + bool /* restricted_seek */, const DB::StoredObject & object) -> std::unique_ptr + { + return std::make_unique( + hdfs_uri, hdfs_file_path, config, read_settings, read_util_position, true, object.bytes_size); + }; + + DB::StoredObjects stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}}; + auto cache_hdfs_read = std::make_unique( + std::move(hdfs_read_buffer_creator), stored_objects, "hdfs:", read_settings, nullptr, /* use_external_buffer */ false); + cache_hdfs_read->setReadUntilPosition(read_util_position); + read_buffer = std::move(cache_hdfs_read); } + + if (set_read_util_position && read_begin) + if (auto * seekable_in = dynamic_cast(read_buffer.get())) + seekable_in->seek(read_begin, SEEK_SET); + return read_buffer; } @@ -367,6 +391,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder result.second = get_next_line_pos(fs.get(), fin, read_end_pos, hdfs_file_size); return result; } + private: DB::ContextPtr context; }; @@ -382,23 +407,19 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { auto config = S3Config::loadFromContext(context); - new_settings = context->getReadSettings(); - new_settings.enable_filesystem_cache = config.s3_local_cache_enabled; - - if (new_settings.enable_filesystem_cache) + // use gluten cache config is first priority + if (!file_cache && config.s3_local_cache_enabled) { DB::FileCacheSettings file_cache_settings; file_cache_settings.max_size = config.s3_local_cache_max_size; auto cache_base_path = config.s3_local_cache_cache_path; - if (!fs::exists(cache_base_path)) - fs::create_directories(cache_base_path); + if (!std::filesystem::exists(cache_base_path)) + std::filesystem::create_directories(cache_base_path); file_cache_settings.base_path = cache_base_path; file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", 
file_cache_settings, ""); file_cache->initialize(); - - new_settings.remote_fs_cache = file_cache; } } @@ -407,6 +428,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder std::unique_ptr build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override { + DB::ReadSettings read_settings = getReadSettings(context); Poco::URI file_uri(file_info.uri_file()); // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet const std::string& bucket = file_uri.getHost(); @@ -416,9 +438,8 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder size_t object_size = object_info.size; Int64 object_modified_time = object_info.last_modification_time; - if (new_settings.enable_filesystem_cache) + if (read_settings.enable_filesystem_cache) { - auto file_cache_key = DB::FileCacheKey(key); auto last_cache_time = files_cache_time_map.get(file_cache_key); // quick check @@ -436,7 +457,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder } auto read_buffer_creator - = [bucket, client, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr + = [bucket, client, read_settings, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr { return std::make_unique( client, @@ -444,7 +465,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder object.remote_path, "", DB::S3::RequestSettings(), - new_settings, + read_settings, /* use_external_buffer */ true, /* offset */ 0, /* read_until_position */0, @@ -453,11 +474,11 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}}; auto s3_impl = std::make_unique( - std::move(read_buffer_creator), stored_objects, "s3:" + bucket + "/", new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true); + std::move(read_buffer_creator), stored_objects, "s3:" + bucket + "/", read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true); auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto async_reader - = std::make_unique(std::move(s3_impl), pool_reader, new_settings, nullptr, nullptr); + = std::make_unique(std::move(s3_impl), pool_reader, read_settings, nullptr, nullptr); if (set_read_util_position) { @@ -478,7 +499,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder async_reader->setReadUntilEnd(); } - if (new_settings.remote_fs_prefetch) + if (read_settings.remote_fs_prefetch) async_reader->prefetch(Priority{}); return async_reader; @@ -488,7 +509,6 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder static const std::string SHARED_CLIENT_KEY; static ConcurrentLRU> per_bucket_clients; static FileCacheConcurrentMap files_cache_time_map; - DB::ReadSettings new_settings; DB::FileCachePtr file_cache; std::string & stripQuote(std::string & s) @@ -732,6 +752,57 @@ void registerReadBufferBuilders() #endif } +ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : context(context_) +{ + const auto & config = context->getConfigRef(); + if (config.getBool("gluten_cache.local.enabled", false)) + { + DB::FileCacheSettings file_cache_settings; + + file_cache_settings.loadFromConfig(config, "gluten_cache.local"); + + if (std::filesystem::path(file_cache_settings.base_path).is_relative()) + file_cache_settings.base_path = std::filesystem::path(context->getPath()) / "caches" / file_cache_settings.base_path; + + if (!std::filesystem::exists(file_cache_settings.base_path)) + 
std::filesystem::create_directories(file_cache_settings.base_path); + + auto name = config.getString("gluten_cache.local.name"); + auto * config_prefix = ""; + file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix); + file_cache->initialize(); + } +} + +DB::ReadSettings ReadBufferBuilder::getReadSettings(DB::ContextPtr context) const +{ + DB::ReadSettings read_settings = context->getReadSettings(); + if (file_cache) + { + read_settings.enable_filesystem_cache = true; + read_settings.remote_fs_cache = file_cache; + } + else + { + read_settings.enable_filesystem_cache = false; + } + + return read_settings; +} + + +std::unique_ptr +ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) +{ + auto in = build(file_info, set_read_util_position); + + /// Wrap the read buffer with compression method if exists + Poco::URI file_uri(file_info.uri_file()); + DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto"); + return compression != DB::CompressionMethod::None ? DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) : std::move(in); +} + + ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance() { static ReadBufferBuilderFactory instance; diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h index f5218f0aa5de..92d8d41c1290 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h @@ -15,21 +15,21 @@ * limitations under the License. */ #pragma once + #include #include #include -#include -#include -#include #include -#include + namespace local_engine { + class ReadBufferBuilder { public: - explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) { } + explicit ReadBufferBuilder(DB::ContextPtr context_); + virtual ~ReadBufferBuilder() = default; /// build a new read buffer @@ -37,19 +37,14 @@ class ReadBufferBuilder build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false) = 0; /// build a new read buffer, consider compression method - std::unique_ptr buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false) - { - auto in = build(file_info, set_read_util_position); - - /// Wrap the read buffer with compression method if exists - Poco::URI file_uri(file_info.uri_file()); - DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto"); - return compression != DB::CompressionMethod::None ? 
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) - : std::move(in); - } + std::unique_ptr buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false); protected: + DB::ReadSettings getReadSettings(DB::ContextPtr context) const; DB::ContextPtr context; + +public: + DB::FileCachePtr file_cache = nullptr; }; using ReadBufferBuilderPtr = std::shared_ptr; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 3c3d6d4f89c2..f27da2f92048 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1292,6 +1292,30 @@ JNIEXPORT jobject Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGe return local_engine::CacheManager::instance().getCacheStatus(env, jstring2string(env, id)); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } + +JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * env, jobject, jbyteArray files) +{ + LOCAL_ENGINE_JNI_METHOD_START + const auto files_bytes = local_engine::getByteArrayElementsSafe(env, files); + const std::string::size_type files_bytes_size = files_bytes.length(); + std::string_view files_view = {reinterpret_cast(files_bytes.elems()), files_bytes_size}; + substrait::ReadRel::LocalFiles local_files = local_engine::BinaryToMessage(files_view); + + auto jobId = local_engine::CacheManager::instance().cacheFiles(local_files); + return local_engine::charTojstring(env, jobId.c_str()); + LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); +} + +JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_) +{ + LOCAL_ENGINE_JNI_METHOD_START + auto file = jstring2string(env, file_); + auto cache_name = jstring2string(env, cache_name_); + + local_engine::CacheManager::removeFiles(file, cache_name); + LOCAL_ENGINE_JNI_METHOD_END(env, ); +} + #ifdef __cplusplus }
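
/*
 * Editor sketch (not part of the patch): how the pieces in this change are expected
 * to fit together, using only names that appear above. Placeholders in angle
 * brackets are illustrative, not values from the patch.
 *
 *   -- warm the cache explicitly, as GlutenClickHouseHDFSSuite does:
 *   CACHE FILES select * from '<hdfs endpoint>/tpch-data/lineitem'
 *
 *   // the JVM side serializes the resolved file list as substrait
 *   // ReadRel::LocalFiles and calls nativeCacheFiles above, which boils down to:
 *   JobId id = local_engine::CacheManager::instance().cacheFiles(local_files);
 *
 * cacheFiles() schedules one Task per file when the ReadBufferBuilder has a
 * file_cache (i.e. gluten_cache.local.enabled = true); each Task simply drains a
 * cache-backed read buffer (while (!rb->eof()) rb->ignoreAll();), which is what
 * populates the local FileCache. The returned job id can be polled through the
 * getCacheStatus entry point. Later scans report read_cache_bytes /
 * read_miss_bytes via writeCacheHits() in RelMetric.cpp, which the Scala suite
 * presumably observes as the readCacheBytes / readMissBytes metrics it asserts on.
 */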