diff --git a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4 b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
new file mode 100644
index 0000000000000..abdb0cdbf81ef
--- /dev/null
+++ b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar GlutenCacheFileSqlBase;
+
+@members {
+ /**
+ * Verify whether the current token is a valid decimal token (which contains a dot).
+ * Returns true if the character that follows the token is not a digit or letter or underscore.
+ *
+ * For example:
+ * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+ * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+ * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+ * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is folllowed
+ * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
+ * which is not a digit or letter or underscore.
+ */
+ public boolean isValidDecimal() {
+ int nextChar = _input.LA(1);
+ if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+ nextChar == '_') {
+ return false;
+ } else {
+ return true;
+ }
+ }
+}
+
+tokens {
+ DELIMITER
+}
+
+singleStatement
+ : statement ';'* EOF
+ ;
+
+statement
+ : CACHE FILES ASYNC? SELECT selectedColumns=selectedColumnNames
+ FROM (path=STRING)
+ (CACHEPROPERTIES cacheProps=propertyList)? #cacheFiles
+ | .*? #passThrough
+ ;
+
+selectedColumnNames
+ : ASTERISK
+ | identifier (COMMA identifier)*
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | stringLit
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN
+ | value=stringLit
+ ;
+
+stringLit
+ : STRING
+ | DOUBLEQUOTED_STRING
+ ;
+
+booleanValue
+ : TRUE | FALSE
+ ;
+
+identifier
+ : IDENTIFIER #unquotedIdentifier
+ | quotedIdentifier #quotedIdentifierAlternative
+ | nonReserved #unquotedIdentifier
+ ;
+
+quotedIdentifier
+ : BACKQUOTED_IDENTIFIER
+ ;
+
+// Add keywords here so that people's queries don't break if they have a column name as one of
+// these tokens
+nonReserved
+ : CACHE | FILES | ASYNC
+ | SELECT | FOR | AFTER | CACHEPROPERTIES
+ | TIMESTAMP | AS | OF | DATE_PARTITION
+ ;
+
+// Define how the keywords above should appear in a user's SQL statement.
+CACHE: 'CACHE';
+META: 'META';
+ASYNC: 'ASYNC';
+SELECT: 'SELECT';
+COMMA: ',';
+FOR: 'FOR';
+FROM: 'FROM';
+AFTER: 'AFTER';
+CACHEPROPERTIES: 'CACHEPROPERTIES';
+DOT: '.';
+ASTERISK: '*';
+TIMESTAMP: 'TIMESTAMP';
+AS: 'AS';
+OF: 'OF';
+DATE_PARTITION: 'DATE_PARTITION';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+FILES: 'FILES';
+
+EQ : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LTE : '<=' | '!>';
+GTE : '>=' | '!<';
+CONCAT_PIPE: '||';
+
+STRING
+ : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+ | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+DOUBLEQUOTED_STRING
+ :'"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+BIGINT_LITERAL
+ : DIGIT+ 'L'
+ ;
+
+SMALLINT_LITERAL
+ : DIGIT+ 'S'
+ ;
+
+TINYINT_LITERAL
+ : DIGIT+ 'Y'
+ ;
+
+INTEGER_VALUE
+ : DIGIT+
+ ;
+
+DECIMAL_VALUE
+ : DIGIT+ EXPONENT
+ | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}?
+ ;
+
+DOUBLE_LITERAL
+ : DIGIT+ EXPONENT? 'D'
+ | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+ ;
+
+BIGDECIMAL_LITERAL
+ : DIGIT+ EXPONENT? 'BD'
+ | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+ ;
+
+IDENTIFIER
+ : (LETTER | DIGIT | '_')+
+ ;
+
+BACKQUOTED_IDENTIFIER
+ : '`' ( ~'`' | '``' )* '`'
+ ;
+
+fragment DECIMAL_DIGITS
+ : DIGIT+ '.' DIGIT*
+ | '.' DIGIT+
+ ;
+
+fragment EXPONENT
+ : 'E' [+-]? DIGIT+
+ ;
+
+fragment DIGIT
+ : [0-9]
+ ;
+
+fragment LETTER
+ : [A-Z]
+ ;
+
+SIMPLE_COMMENT
+ : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
+ ;
+
+BRACKETED_COMMENT
+ : '/*' .*? '*/' -> channel(HIDDEN)
+ ;
+
+WS : [ \r\n\t]+ -> channel(HIDDEN)
+ ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+ : .
+ ;
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 0000000000000..48d26498f1214
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 0000000000000..9a0cde7728434
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 0000000000000..9a0cde7728434
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 7b765924fa0da..4033d8c6b1ccc 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -30,4 +30,9 @@ public static CacheResult getCacheStatus(String jobId) {
}
private static native CacheResult nativeGetCacheStatus(String jobId);
+
+ public static native String nativeCacheFiles(byte[] files);
+
+ // only for unit tests
+ public static native void removeFiles(String file, String cacheName);
}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
index 0fa69e0d0b1fd..b6d538039ec42 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
@@ -16,7 +16,9 @@
*/
package org.apache.gluten.execution;
-public class CacheResult {
+import java.io.Serializable;
+
+public class CacheResult implements Serializable {
public enum Status {
RUNNING(0),
SUCCESS(1),
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
index f95aaa323c109..39dd949658358 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
@@ -35,6 +35,24 @@ public class MetricsStep {
@JsonProperty("selected_marks")
protected long selectedMarks;
+ @JsonProperty("read_cache_hits")
+ protected long readCacheHits;
+
+ @JsonProperty("miss_cache_hits")
+ protected long missCacheHits;
+
+ @JsonProperty("read_cache_bytes")
+ protected long readCacheBytes;
+
+ @JsonProperty("read_miss_bytes")
+ protected long readMissBytes;
+
+ @JsonProperty("read_cache_millisecond")
+ protected long readCacheMillisecond;
+
+ @JsonProperty("miss_cache_millisecond")
+ protected long missCacheMillisecond;
+
public String getName() {
return name;
}
@@ -82,4 +100,52 @@ public long getTotalMarksPk() {
public long getSelectedMarksPk() {
return selectedMarksPk;
}
+
+ public long getReadCacheHits() {
+ return readCacheHits;
+ }
+
+ public void setReadCacheHits(long readCacheHits) {
+ this.readCacheHits = readCacheHits;
+ }
+
+ public long getMissCacheHits() {
+ return missCacheHits;
+ }
+
+ public void setMissCacheHits(long missCacheHits) {
+ this.missCacheHits = missCacheHits;
+ }
+
+ public long getReadCacheBytes() {
+ return readCacheBytes;
+ }
+
+ public void setReadCacheBytes(long readCacheBytes) {
+ this.readCacheBytes = readCacheBytes;
+ }
+
+ public long getReadMissBytes() {
+ return readMissBytes;
+ }
+
+ public void setReadMissBytes(long readMissBytes) {
+ this.readMissBytes = readMissBytes;
+ }
+
+ public long getReadCacheMillisecond() {
+ return readCacheMillisecond;
+ }
+
+ public void setReadCacheMillisecond(long readCacheMillisecond) {
+ this.readCacheMillisecond = readCacheMillisecond;
+ }
+
+ public long getMissCacheMillisecond() {
+ return missCacheMillisecond;
+ }
+
+ public void setMissCacheMillisecond(long missCacheMillisecond) {
+ this.missCacheMillisecond = missCacheMillisecond;
+ }
}
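The new fields above are bound from the native metrics JSON through the @JsonProperty annotations; presumably a metrics step that carries cache statistics contains a fragment like the following (key names match the annotations, values are invented for illustration):

    val sampleStepJson =
      """{"read_cache_hits": 12, "miss_cache_hits": 3, "read_cache_bytes": 1048576,
        | "read_miss_bytes": 4194304, "read_cache_millisecond": 7, "miss_cache_millisecond": 42}""".stripMargin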
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7519580b9cb74..c77d5726222c4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -164,6 +165,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
+ val fileSizes = new JArrayList[JLong]()
+ val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
f.files.foreach {
file =>
@@ -173,6 +176,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
// TODO: Support custom partition location
val partitionColumn = new JHashMap[String, String]()
partitionColumns.add(partitionColumn)
+ val (fileSize, modificationTime) =
+ SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
+ (fileSize, modificationTime) match {
+ case (Some(size), Some(time)) =>
+ fileSizes.add(JLong.valueOf(size))
+ modificationTimes.add(JLong.valueOf(time))
+ case _ =>
+ fileSizes.add(JLong.valueOf(0))
+ modificationTimes.add(JLong.valueOf(0))
+ }
}
val preferredLocations =
CHAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
@@ -181,8 +194,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
paths,
starts,
lengths,
- new JArrayList[JLong](),
- new JArrayList[JLong](),
+ fileSizes,
+ modificationTimes,
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 9058bffd8d57a..c53448cdd8586 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -128,7 +128,25 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
"selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"),
- "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
+ "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary"),
+ "readCacheHits" -> SQLMetrics.createMetric(
+ sparkContext,
+ "Number of times the read from filesystem cache hit the cache"),
+ "missCacheHits" -> SQLMetrics.createMetric(
+ sparkContext,
+ "Number of times the read from filesystem cache miss the cache"),
+ "readCacheBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "Bytes read from filesystem cache"),
+ "readMissBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "Bytes read from filesystem cache source (from remote fs, etc)"),
+ "readCacheMillisecond" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "Time reading from filesystem cache"),
+ "missCacheMillisecond" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "Time reading from filesystem cache source (from remote filesystem, etc)")
)
override def genFileSourceScanTransformerMetricsUpdater(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 8fdc2645a5fb1..db2414be48b0a 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, CountDisti
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
-import org.apache.gluten.parser.GlutenClickhouseSqlParser
+import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -622,7 +622,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
override def genInjectExtendedParser()
: List[(SparkSession, ParserInterface) => ParserInterface] = {
- List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
+ List(
+ (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark, parserInterface),
+ (spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface)
+ )
}
/** Define backend specfic expression mappings. */
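For context, each element of the returned list has the (SparkSession, ParserInterface) => ParserInterface shape that Spark's SparkSessionExtensions.injectParser expects. A minimal sketch of how such a list could be registered (the real wiring lives in Gluten's session extension setup, not in this file):

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
    import org.apache.spark.sql.catalyst.parser.ParserInterface

    def registerParsers(
        extensions: SparkSessionExtensions,
        builders: List[(SparkSession, ParserInterface) => ParserInterface]): Unit =
      // Spark folds the builders into a chain, so each parser receives the previously
      // built parser as its delegate and forwards statements it does not recognize.
      builders.foreach(extensions.injectParser)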
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index f44c5ed1a1dde..4dcae8feb92bb 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -35,9 +35,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
- val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
- val selected_marks: SQLMetric = metrics("selectedMarks")
- val total_marks_pk: SQLMetric = metrics("totalMarksPk")
+ val selectedMarksPK: SQLMetric = metrics("selectedMarksPk")
+ val selectedMarks: SQLMetric = metrics("selectedMarks")
+ val totalMarksPK: SQLMetric = metrics("totalMarksPk")
+ val readCacheHits: SQLMetric = metrics("readCacheHits")
+ val missCacheHits: SQLMetric = metrics("missCacheHits")
+ val readCacheBytes: SQLMetric = metrics("readCacheBytes")
+ val readMissBytes: SQLMetric = metrics("readMissBytes")
+ val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond")
+ val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -56,9 +62,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
metricsData.getSteps.forEach(
step => {
- selected_marks_pk += step.selectedMarksPk
- selected_marks += step.selectedMarks
- total_marks_pk += step.totalMarksPk
+ selectedMarksPK += step.selectedMarksPk
+ selectedMarks += step.selectedMarks
+ totalMarksPK += step.totalMarksPk
+ readCacheHits += step.readCacheHits
+ missCacheHits += step.missCacheHits
+ readCacheBytes += step.readCacheBytes
+ readMissBytes += step.readMissBytes
+ readCacheMillisecond += step.readCacheMillisecond
+ missCacheMillisecond += step.missCacheMillisecond
})
MetricsUtil.updateExtraTimeMetric(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
new file mode 100644
index 0000000000000..b031dcf7a1b49
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.gluten.sql.parser.{GlutenCacheFileSqlBaseBaseListener, GlutenCacheFileSqlBaseBaseVisitor, GlutenCacheFileSqlBaseLexer, GlutenCacheFileSqlBaseParser}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.execution.commands.GlutenCacheFilesCommand
+import org.apache.spark.sql.internal.VariableSubstitution
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+import org.antlr.v4.runtime.tree.TerminalNodeImpl
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+trait GlutenCacheFileSqlParserBase extends ParserInterface {
+ protected val astBuilder = new GlutenCacheFileSqlAstBuilder
+ protected val substitution = new VariableSubstitution
+
+ protected def parse[T](command: String)(toResult: GlutenCacheFileSqlBaseParser => T): T = {
+ val lexer = new GlutenCacheFileSqlBaseLexer(
+ new UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command))))
+ lexer.removeErrorListeners()
+ lexer.addErrorListener(ParseErrorListener)
+
+ val tokenStream = new CommonTokenStream(lexer)
+ val parser = new GlutenCacheFileSqlBaseParser(tokenStream)
+
+ parser.addParseListener(GlutenCacheFileSqlPostProcessor)
+ parser.removeErrorListeners()
+ parser.addErrorListener(ParseErrorListener)
+
+ try {
+ try {
+ // first, try parsing with potentially faster SLL mode
+ parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+ toResult(parser)
+ } catch {
+ case e: ParseCancellationException =>
+ // if we fail, parse with LL mode
+ tokenStream.seek(0) // rewind input stream
+ parser.reset()
+
+ // Try Again.
+ parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+ toResult(parser)
+ }
+ } catch {
+ case e: ParseException if e.command.isDefined =>
+ throw e
+ case e: ParseException =>
+ throw e.withCommand(command)
+ case e: AnalysisException =>
+ val position = Origin(e.line, e.startPosition)
+ throw new ParseException(
+ command = Option(command),
+ message = e.message,
+ start = position,
+ stop = position,
+ errorClass = Some("GLUTEN_CACHE_FILE_PARSING_ANALYSIS_ERROR"))
+ }
+ }
+}
+
+class GlutenCacheFileSqlAstBuilder extends GlutenCacheFileSqlBaseBaseVisitor[AnyRef] {
+ import org.apache.spark.sql.catalyst.parser.ParserUtils._
+
+ /** Convert a property list into a key-value map. */
+ override def visitPropertyList(
+ ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, String] =
+ withOrigin(ctx) {
+ val properties = ctx.property.asScala.map {
+ property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
+ }
+
+ /**
+ * A property key can either be a String or a collection of dot-separated elements. This function
+ * extracts the property key based on whether it is a string literal or a property identifier.
+ */
+ override def visitPropertyKey(key: GlutenCacheFileSqlBaseParser.PropertyKeyContext): String = {
+ if (key.stringLit() != null) {
+ string(visitStringLit(key.stringLit()))
+ } else {
+ key.getText
+ }
+ }
+
+ /**
+ * A property value can be a String, Integer, Boolean or Decimal. This function extracts the
+ * property value based on whether it is a string, integer, boolean or decimal literal.
+ */
+ override def visitPropertyValue(
+ value: GlutenCacheFileSqlBaseParser.PropertyValueContext): String = {
+ if (value == null) {
+ null
+ } else if (value.identifier != null) {
+ value.identifier.getText
+ } else if (value.value != null) {
+ string(visitStringLit(value.value))
+ } else if (value.booleanValue != null) {
+ value.getText.toLowerCase(Locale.ROOT)
+ } else {
+ value.getText
+ }
+ }
+
+ def visitPropertyKeyValues(
+ ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, String] = {
+ val props = visitPropertyList(ctx)
+ val badKeys = props.collect { case (key, null) => key }
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}",
+ ctx)
+ }
+ props
+ }
+
+ override def visitStringLit(ctx: GlutenCacheFileSqlBaseParser.StringLitContext): Token = {
+ if (ctx != null) {
+ if (ctx.STRING != null) {
+ ctx.STRING.getSymbol
+ } else {
+ ctx.DOUBLEQUOTED_STRING.getSymbol
+ }
+ } else {
+ null
+ }
+ }
+
+ override def visitSingleStatement(
+ ctx: GlutenCacheFileSqlBaseParser.SingleStatementContext): AnyRef = withOrigin(ctx) {
+ visit(ctx.statement).asInstanceOf[LogicalPlan]
+ }
+
+ override def visitCacheFiles(ctx: GlutenCacheFileSqlBaseParser.CacheFilesContext): AnyRef =
+ withOrigin(ctx) {
+ val asyncExecute = ctx.ASYNC != null
+ val selectedColumns = visitSelectedColumnNames(ctx.selectedColumns)
+ val propertyOverrides = Option(ctx.cacheProps)
+ .map(visitPropertyKeyValues)
+ .getOrElse(Map.empty[String, String])
+ val path = ctx.path.getText
+
+ GlutenCacheFilesCommand(
+ asyncExecute,
+ selectedColumns,
+ path.substring(1, path.length - 1),
+ propertyOverrides
+ )
+ }
+
+ override def visitPassThrough(ctx: GlutenCacheFileSqlBaseParser.PassThroughContext): AnyRef =
+ null
+
+ override def visitSelectedColumnNames(
+ ctx: GlutenCacheFileSqlBaseParser.SelectedColumnNamesContext): Option[Seq[String]] =
+ withOrigin(ctx) {
+ if (ctx != null) {
+ if (ctx.ASTERISK != null) {
+ // It means select all columns
+ None
+ } else if (ctx.identifier != null && !(ctx.identifier).isEmpty) {
+ Some(ctx.identifier.asScala.map(_.getText).toSeq)
+ } else {
+ throw new ParseException(s"Illegal selected column.", ctx)
+ }
+ } else {
+ throw new ParseException(s"Illegal selected column.", ctx)
+ }
+ }
+}
+
+case object GlutenCacheFileSqlPostProcessor extends GlutenCacheFileSqlBaseBaseListener {
+
+ /** Remove the back ticks from an Identifier. */
+ override def exitQuotedIdentifier(
+ ctx: GlutenCacheFileSqlBaseParser.QuotedIdentifierContext): Unit = {
+ replaceTokenByIdentifier(ctx, 1) {
+ token =>
+ // Remove the double back ticks in the string.
+ token.setText(token.getText.replace("``", "`"))
+ token
+ }
+ }
+
+ /** Treat non-reserved keywords as Identifiers. */
+ override def exitNonReserved(ctx: GlutenCacheFileSqlBaseParser.NonReservedContext): Unit = {
+ replaceTokenByIdentifier(ctx, 0)(identity)
+ }
+
+ private def replaceTokenByIdentifier(ctx: ParserRuleContext, stripMargins: Int)(
+ f: CommonToken => CommonToken = identity): Unit = {
+ val parent = ctx.getParent
+ parent.removeLastChild()
+ val token = ctx.getChild(0).getPayload.asInstanceOf[Token]
+ val newToken = new CommonToken(
+ new org.antlr.v4.runtime.misc.Pair(token.getTokenSource, token.getInputStream),
+ GlutenCacheFileSqlBaseParser.IDENTIFIER,
+ token.getChannel,
+ token.getStartIndex + stripMargins,
+ token.getStopIndex - stripMargins
+ )
+ parent.addChild(new TerminalNodeImpl(f(newToken)))
+ }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
index 18fc102bec3d1..4a3883c8cc2b7 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.VariableSubstitution
import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
-import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import java.util.Locale
@@ -256,21 +256,3 @@ case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener {
parent.addChild(new TerminalNodeImpl(f(newToken)))
}
}
-
-class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
- override def consume(): Unit = wrapped.consume
- override def getSourceName(): String = wrapped.getSourceName
- override def index(): Int = wrapped.index
- override def mark(): Int = wrapped.mark
- override def release(marker: Int): Unit = wrapped.release(marker)
- override def seek(where: Int): Unit = wrapped.seek(where)
- override def size(): Int = wrapped.size
-
- override def getText(interval: Interval): String = wrapped.getText(interval)
-
- override def LA(i: Int): Int = {
- val la = wrapped.LA(i)
- if (la == 0 || la == IntStream.EOF) la
- else Character.toUpperCase(la)
- }
-}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
new file mode 100644
index 0000000000000..6ee2aac81c74a
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.antlr.v4.runtime.{CharStream, CodePointCharStream, IntStream}
+import org.antlr.v4.runtime.misc.Interval
+
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+ override def consume(): Unit = wrapped.consume
+ override def getSourceName(): String = wrapped.getSourceName
+ override def index(): Int = wrapped.index
+ override def mark(): Int = wrapped.mark
+ override def release(marker: Int): Unit = wrapped.release(marker)
+ override def seek(where: Int): Unit = wrapped.seek(where)
+ override def size(): Int = wrapped.size
+
+ override def getText(interval: Interval): String = wrapped.getText(interval)
+
+ override def LA(i: Int): Int = {
+ val la = wrapped.LA(i)
+ if (la == 0 || la == IntStream.EOF) la
+ else Character.toUpperCase(la)
+ }
+}
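A quick illustration of the stream's contract (assuming the ANTLR 4 runtime on the classpath): the lexer sees upper-cased code points, which is why the keyword tokens in the grammar are declared in upper case only, while getText still returns the user's original spelling:

    import org.antlr.v4.runtime.CharStreams
    import org.antlr.v4.runtime.misc.Interval

    val stream = new UpperCaseCharStream(CharStreams.fromString("cache files"))
    assert(stream.LA(1) == 'C'.toInt)                     // lexer-visible view is upper-cased
    assert(stream.getText(new Interval(0, 4)) == "cache") // original casing is preserved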
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 8a3bde2358874..7f2b94eea314e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -79,9 +79,20 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
context.reply(
CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
}
- case GlutenMergeTreeCacheLoadStatus(jobId) =>
+ case GlutenCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
context.reply(status)
+ case GlutenFilesCacheLoad(files) =>
+ try {
+ val jobId = CHNativeCacheManager.nativeCacheFiles(files)
+ context.reply(CacheJobInfo(status = true, jobId))
+ } catch {
+ case e: Exception =>
+ context.reply(
+ CacheJobInfo(
+ status = false,
+ s"executor: $executorId cache data failed. ${e.getMessage}"))
+ }
case e =>
logError(s"Received unexpected message. $e")
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 800b15b9949b0..e596e94fed722 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -39,8 +39,12 @@ object GlutenRpcMessages {
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
extends GlutenRpcMessage
- case class GlutenMergeTreeCacheLoadStatus(jobId: String)
+ case class GlutenCacheLoadStatus(jobId: String)
case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
extends GlutenRpcMessage
+
+ case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage
+
+ case class GlutenFilesCacheLoadStatus(jobId: String)
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index f32d22d5eac08..1939d0e0367d6 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -16,24 +16,19 @@
*/
package org.apache.spark.sql.execution.commands
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.CacheResult
-import org.apache.gluten.execution.CacheResult.Status
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.rel.ExtensionTableBuilder
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.{checkExecutorId, collectJobTriggerResult, toExecutorId, waitAllJobFinish, waitRpcResults}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
-import org.apache.spark.util.ThreadUtils
import org.apache.hadoop.fs.Path
@@ -43,7 +38,6 @@ import java.util.{ArrayList => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
-import scala.concurrent.duration.Duration
case class GlutenCHCacheDataCommand(
onlyMetaCache: Boolean,
@@ -55,7 +49,8 @@ case class GlutenCHCacheDataCommand(
partitionColumn: Option[String],
partitionValue: Option[String],
tablePropertyOverrides: Map[String, String]
-) extends LeafRunnableCommand {
+) extends LeafRunnableCommand
+ with GlutenCacheBase {
override def output: Seq[Attribute] = Seq(
AttributeReference("result", BooleanType, nullable = false)(),
@@ -140,9 +135,7 @@ case class GlutenCHCacheDataCommand(
val executorIdsToAddFiles =
scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
val executorIdsToParts = scala.collection.mutable.Map[String, String]()
- executorIdsToAddFiles.put(
- GlutenCHCacheDataCommand.ALL_EXECUTORS,
- new ArrayBuffer[AddMergeTreeParts]())
+ executorIdsToAddFiles.put(ALL_EXECUTORS, new ArrayBuffer[AddMergeTreeParts]())
selectedAddFiles.foreach(
addFile => {
val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts]
@@ -156,7 +149,7 @@ case class GlutenCHCacheDataCommand(
if (locations.isEmpty) {
// non soft affinity
- executorIdsToAddFiles(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+ executorIdsToAddFiles(ALL_EXECUTORS)
.append(mergeTreePart)
} else {
locations.foreach(
@@ -205,9 +198,9 @@ case class GlutenCHCacheDataCommand(
}
})
val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
- if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) {
+ if (executorIdsToParts.contains(ALL_EXECUTORS)) {
// send all parts to all executors
- val tableMessage = executorIdsToParts(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+ val tableMessage = executorIdsToParts(ALL_EXECUTORS)
GlutenDriverEndpoint.executorDataMap.forEach(
(executorId, executor) => {
futureList.append(
@@ -230,86 +223,7 @@ case class GlutenCHCacheDataCommand(
)))
})
}
- val resultList = waitRpcResults(futureList)
- if (asynExecute) {
- val res = collectJobTriggerResult(resultList)
- Seq(Row(res._1, res._2.mkString(";")))
- } else {
- val res = waitAllJobFinish(resultList)
- Seq(Row(res._1, res._2))
- }
- }
-
-}
-
-object GlutenCHCacheDataCommand {
- private val ALL_EXECUTORS = "allExecutors"
-
- private def toExecutorId(executorId: String): String =
- executorId.split("_").last
-
- def waitAllJobFinish(jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, String) = {
- val res = collectJobTriggerResult(jobs)
- var status = res._1
- val messages = res._2
- jobs.foreach(
- job => {
- if (status) {
- var complete = false
- while (!complete) {
- Thread.sleep(5000)
- val future_result = GlutenDriverEndpoint.executorDataMap
- .get(toExecutorId(job._1))
- .executorEndpointRef
- .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(job._2.jobId))
- val result = ThreadUtils.awaitResult(future_result, Duration.Inf)
- result.getStatus match {
- case Status.ERROR =>
- status = false
- messages.append(
- s"executor : {}, failed with message: {};",
- job._1,
- result.getMessage)
- complete = true
- case Status.SUCCESS =>
- complete = true
- case _ =>
- // still running
- }
- }
- }
- })
- (status, messages.mkString(";"))
- }
-
- private def collectJobTriggerResult(jobs: ArrayBuffer[(String, CacheJobInfo)]) = {
- var status = true
- val messages = ArrayBuffer[String]()
- jobs.foreach(
- job => {
- if (!job._2.status) {
- messages.append(job._2.reason)
- status = false
- }
- })
- (status, messages)
- }
- private def waitRpcResults = (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => {
- val resultList = ArrayBuffer[(String, CacheJobInfo)]()
- futureList.foreach(
- f => {
- resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf)))
- })
- resultList
+ getResult(futureList, asynExecute)
}
-
- private def checkExecutorId(executorId: String): Unit = {
- if (!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
- throw new GlutenException(
- s"executor $executorId not found," +
- s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
- }
- }
-
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
new file mode 100644
index 0000000000000..2907febf8028e
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.execution.CacheResult
+import org.apache.gluten.execution.CacheResult.Status
+
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+trait GlutenCacheBase {
+ def ALL_EXECUTORS: String = "allExecutors"
+
+ protected def toExecutorId(executorId: String): String =
+ executorId.split("_").last
+
+ protected def waitRpcResults
+ : ArrayBuffer[(String, Future[CacheJobInfo])] => ArrayBuffer[(String, CacheJobInfo)] =
+ (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => {
+ val resultList = ArrayBuffer[(String, CacheJobInfo)]()
+ futureList.foreach(
+ f => {
+ resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf)))
+ })
+ resultList
+ }
+
+ protected def checkExecutorId(executorId: String): Unit = {
+ if (!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
+ throw new GlutenException(
+ s"executor $executorId not found," +
+ s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
+ }
+ }
+
+ def waitAllJobFinish(
+ jobs: ArrayBuffer[(String, CacheJobInfo)],
+ ask: (String, String) => Future[CacheResult]): (Boolean, String) = {
+ val res = collectJobTriggerResult(jobs)
+ var status = res._1
+ val messages = res._2
+ jobs.foreach(
+ job => {
+ if (status) {
+ var complete = false
+ while (!complete) {
+ Thread.sleep(5000)
+ val futureResult = ask(job._1, job._2.jobId)
+ val result = ThreadUtils.awaitResult(futureResult, Duration.Inf)
+ result.getStatus match {
+ case Status.ERROR =>
+ status = false
+ messages.append(
+ s"executor: ${job._1} failed with message: ${result.getMessage};")
+ complete = true
+ case Status.SUCCESS =>
+ complete = true
+ case _ =>
+ // still running
+ }
+ }
+ }
+ })
+ (status, messages.mkString(";"))
+ }
+
+ protected def collectJobTriggerResult(
+ jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, ArrayBuffer[String]) = {
+ var status = true
+ val messages = ArrayBuffer[String]()
+ jobs.foreach(
+ job => {
+ if (!job._2.status) {
+ messages.append(job._2.reason)
+ status = false
+ }
+ })
+ (status, messages)
+ }
+
+ protected def getResult(
+ futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
+ async: Boolean): Seq[Row] = {
+ val resultList = waitRpcResults(futureList)
+ if (async) {
+ val res = collectJobTriggerResult(resultList)
+ Seq(Row(res._1, res._2.mkString(";")))
+ } else {
+ val fetchStatus: (String, String) => Future[CacheResult] =
+ (executorId: String, jobId: String) => {
+ GlutenDriverEndpoint.executorDataMap
+ .get(toExecutorId(executorId))
+ .executorEndpointRef
+ .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+ }
+ val res = waitAllJobFinish(resultList, fetchStatus)
+ Seq(Row(res._1, res._2))
+ }
+ }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
new file mode 100644
index 0000000000000..e535097ed9fcf
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.substrait.rel.LocalFilesBuilder
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoad}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.types.{BooleanType, StringType}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+import java.io.FileNotFoundException
+import java.lang.{Long => JLong}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+
+case class GlutenCacheFilesCommand(
+ async: Boolean,
+ selectedColumn: Option[Seq[String]],
+ filePath: String,
+ propertyOverrides: Map[String, String]
+) extends LeafRunnableCommand
+ with GlutenCacheBase {
+
+ override def output: Seq[Attribute] = Seq(
+ AttributeReference("result", BooleanType, nullable = false)(),
+ AttributeReference("reason", StringType, nullable = false)())
+
+ override def run(session: SparkSession): Seq[Row] = {
+ val targetFile = new Path(filePath)
+ val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration
+ val fs = targetFile.getFileSystem(hadoopConf)
+ if (!fs.exists(targetFile)) {
+ throw new FileNotFoundException(filePath)
+ }
+
+ val recursive =
+ "true".equalsIgnoreCase(propertyOverrides.getOrElse("recursive", "false"))
+
+ val files: Seq[FileStatus] = listFiles(targetFile, recursive, fs)
+ val executorIdsToFiles =
+ scala.collection.mutable.Map[String, ArrayBuffer[FileStatus]]()
+ executorIdsToFiles.put(ALL_EXECUTORS, new ArrayBuffer[FileStatus]())
+
+ files.foreach(
+ fileStatus => {
+ val locations = CHAffinity.getHostLocations(fileStatus.getPath.toUri.toASCIIString)
+ if (locations.isEmpty) {
+ executorIdsToFiles(ALL_EXECUTORS).append(fileStatus)
+ } else {
+ locations.foreach(
+ executor => {
+ if (!executorIdsToFiles.contains(executor)) {
+ executorIdsToFiles.put(executor, new ArrayBuffer[FileStatus]())
+ }
+ executorIdsToFiles(executor).append(fileStatus)
+ })
+ }
+ })
+
+ val executorIdsToLocalFiles = executorIdsToFiles
+ .filter(_._2.nonEmpty)
+ .map {
+ case (executorId, fileStatusArray) =>
+ val paths = new JArrayList[String]()
+ val starts = new JArrayList[JLong]()
+ val lengths = new JArrayList[JLong]()
+ val partitionColumns = new JArrayList[JMap[String, String]]
+
+ fileStatusArray.foreach(
+ fileStatus => {
+ paths.add(fileStatus.getPath.toUri.toASCIIString)
+ starts.add(JLong.valueOf(0))
+ lengths.add(JLong.valueOf(fileStatus.getLen))
+ partitionColumns.add(new JHashMap[String, String]())
+ })
+
+ val localFile = LocalFilesBuilder.makeLocalFiles(
+ null,
+ paths,
+ starts,
+ lengths,
+ lengths,
+ new JArrayList[JLong](),
+ partitionColumns,
+ new JArrayList[JMap[String, String]](),
+ ReadFileFormat.ParquetReadFormat, // ignore format in backend
+ new JArrayList[String](),
+ new JHashMap[String, String]()
+ )
+
+ (executorId, localFile)
+ }
+ .toMap
+
+ val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
+ val fileNodeOption = executorIdsToLocalFiles.get(ALL_EXECUTORS)
+ if (fileNodeOption.isDefined) {
+ GlutenDriverEndpoint.executorDataMap.forEach(
+ (executorId, executor) => {
+ futureList.append(
+ (
+ executorId,
+ executor.executorEndpointRef.ask[CacheJobInfo](
+ GlutenFilesCacheLoad(fileNodeOption.get.toProtobuf.toByteArray))))
+ })
+ } else {
+ executorIdsToLocalFiles.foreach {
+ case (executorId, fileNode) =>
+ checkExecutorId(executorId)
+ val executor = GlutenDriverEndpoint.executorDataMap.get(
+ GlutenCacheFilesCommand.toExecutorId(executorId))
+ futureList.append(
+ (
+ executorId,
+ executor.executorEndpointRef.ask[CacheJobInfo](
+ GlutenFilesCacheLoad(fileNode.toProtobuf.toByteArray))))
+ }
+ }
+
+ getResult(futureList, async)
+ }
+
+ private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem): Seq[FileStatus] = {
+ val dirContents = fs
+ .listStatus(targetFile)
+ .flatMap(f => addInputPathRecursively(fs, f, recursive))
+ .filter(isNonEmptyDataFile)
+ .toSeq
+ dirContents
+ }
+
+ private def addInputPathRecursively(
+ fs: FileSystem,
+ files: FileStatus,
+ recursive: Boolean): Seq[FileStatus] = {
+ if (files.isFile) {
+ Seq(files)
+ } else if (recursive) {
+ fs.listStatus(files.getPath)
+ .flatMap(
+ file => {
+ if (file.isFile) {
+ Seq(file)
+ } else {
+ addInputPathRecursively(fs, file, recursive)
+ }
+ })
+ } else {
+ Seq()
+ }
+ }
+
+ private def isNonEmptyDataFile(f: FileStatus): Boolean = {
+ if (!f.isFile || f.getLen == 0) {
+ false
+ } else {
+ val name = f.getPath.getName
+ !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+ }
+ }
+}
+
+object GlutenCacheFilesCommand {
+ val ALL_EXECUTORS = "allExecutors"
+
+ private def toExecutorId(executorId: String): String =
+ executorId.split("_").last
+}
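Putting the pieces together, the command can be exercised through SQL roughly as follows (the path is illustrative, and spark is assumed to be a session with the new parser injected); each returned row carries the (result, reason) pair declared in the output schema above:

    // Hedged usage sketch: trigger a synchronous cache load and inspect the outcome.
    val rows = spark.sql(
      "CACHE FILES SELECT * FROM 'hdfs://ns1/warehouse/tbl' CACHEPROPERTIES(recursive=true)"
    ).collect()
    rows.foreach(r => println(s"result=${r.getBoolean(0)}, reason=${r.getString(1)}"))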
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index f914eaa1860a8..dfc5fbd3b37e0 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -41,6 +41,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
version(0) + "." + version(1)
}
+ val CH_CONFIG_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_config"
+ val CH_SETTING_PREFIX: String = "spark.gluten.sql.columnar.backend.ch.runtime_settings"
+
val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
new file mode 100644
index 0000000000000..863fdd0bb1163
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution.{CHNativeCacheManager, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+import org.apache.hadoop.fs.Path
+
+class GlutenClickHouseHDFSSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
+ override protected val tpchQueries: String =
+ rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
+ override protected val queriesResults: String = rootPath + "queries-output"
+
+ private val hdfsCachePath = "/tmp/gluten_hdfs_cache/"
+ private val cache_name = "gluten_cache"
+
+ /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager", "sort")
+ .set("spark.io.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set(s"$CH_CONFIG_PREFIX.use_local_format", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true")
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name)
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath)
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi")
+ .set("spark.sql.adaptive.enabled", "false")
+ }
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ deleteCache()
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ deleteCache()
+ }
+
+ private def deleteCache(): Unit = {
+ val targetFile = new Path(tablesPath)
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.listStatus(targetFile)
+ .foreach(
+ table => {
+ if (table.isDirectory) {
+ fs.listStatus(table.getPath)
+ .foreach(
+ data => {
+ if (data.isFile) {
+ CHNativeCacheManager
+ .removeFiles(data.getPath.toUri.getPath.substring(1), cache_name)
+ }
+ })
+ }
+ })
+ clearDataPath(hdfsCachePath)
+ }
+
+ val runWithoutCache: () => Unit = () => {
+ runTPCHQuery(6) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: FileSourceScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans.head.metrics("readMissBytes").value != 0)
+ }
+ }
+
+ val runWithCache: () => Unit = () => {
+ runTPCHQuery(6) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: FileSourceScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans.head.metrics("readMissBytes").value == 0)
+ assert(plans.head.metrics("readCacheBytes").value != 0)
+ }
+ }
+
+ test("test hdfs cache") {
+ runWithoutCache()
+ runWithCache()
+ }
+
+ test("test cache file command") {
+ runSql(
+ s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
+ noFallBack = false) { _ => }
+ runWithCache()
+ }
+
+ test("test no cache by query") {
+ withSQLConf(
+ s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache" -> "true") {
+ runWithoutCache()
+ }
+
+ runWithoutCache()
+ runWithCache()
+ }
+}
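
Outside this TPC-H harness, the same scan metrics can be inspected on an arbitrary query. A rough sketch, assuming a Gluten-enabled `spark` session, a cached HDFS path (hypothetical here), and AQE disabled as in this suite:

// Sketch: checking the cache metrics added by this patch on a plain query.
import org.apache.gluten.execution.FileSourceScanExecTransformer

val df = spark.sql(
  "select sum(l_extendedprice) from parquet.`hdfs://nameservice1/tpch-data/lineitem`")
df.collect() // run the query so the metrics are populated

df.queryExecution.executedPlan.collect { case s: FileSourceScanExecTransformer => s }.foreach {
  scan =>
    println(s"readCacheBytes=${scan.metrics("readCacheBytes").value}, " +
      s"readMissBytes=${scan.metrics("readMissBytes").value}")
}
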

diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index c91b7264db31c..a92155d14ea18 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -195,6 +195,8 @@ class BackendInitializerUtil
inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes";
+ inline static const String GLUTEN_LOCAL_CACHE_PREFIX = "gluten_cache.local.";
+
/// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written.
/// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h
index ac82b0fff03af..1de019d1ec25c 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -17,9 +17,10 @@
#pragma once
-#include
-#include
#include
+#include
+#include
+#include
namespace local_engine
{
@@ -134,13 +135,17 @@ struct HdfsConfig
{
inline static const String HDFS_ASYNC = "hdfs.enable_async_io";
- bool hdfs_async = true;
+ bool hdfs_async;
- static HdfsConfig loadFromContext(DB::ContextPtr context)
+ static HdfsConfig loadFromContext(const Poco::Util::AbstractConfiguration & config, const DB::ReadSettings & read_settings)
{
- HdfsConfig config;
- config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true);
- return config;
+ HdfsConfig hdfs;
+ if (read_settings.enable_filesystem_cache)
+ hdfs.hdfs_async = false;
+ else
+ hdfs.hdfs_async = config.getBool(HDFS_ASYNC, true);
+
+ return hdfs;
}
};
@@ -159,10 +164,17 @@ struct S3Config
static S3Config loadFromContext(DB::ContextPtr context)
{
S3Config config;
- config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
- config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
- config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
- config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+
+ if (context->getConfigRef().has("S3_LOCAL_CACHE_ENABLE"))
+ {
+ LOG_WARNING(&Poco::Logger::get("S3Config"), "Config {} has deprecated.", S3_LOCAL_CACHE_ENABLE);
+
+ config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
+ config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
+ config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
+ config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+ }
+
return config;
}
};
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 7b8b4cfd95a85..039d978bf77de 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -18,12 +18,57 @@
#include
#include
#include
+#include
using namespace rapidjson;
+namespace ProfileEvents
+{
+extern const Event FileSegmentWaitReadBufferMicroseconds;
+extern const Event FileSegmentReadMicroseconds;
+extern const Event FileSegmentCacheWriteMicroseconds;
+extern const Event FileSegmentPredownloadMicroseconds;
+extern const Event FileSegmentUsedBytes;
+
+extern const Event CachedReadBufferReadFromSourceMicroseconds;
+extern const Event CachedReadBufferReadFromCacheMicroseconds;
+extern const Event CachedReadBufferCacheWriteMicroseconds;
+extern const Event CachedReadBufferReadFromSourceBytes;
+extern const Event CachedReadBufferReadFromCacheBytes;
+extern const Event CachedReadBufferCacheWriteBytes;
+extern const Event CachedReadBufferCreateBufferMicroseconds;
+
+extern const Event CachedReadBufferReadFromCacheHits;
+extern const Event CachedReadBufferReadFromCacheMisses;
+}
+
namespace local_engine
{
+static void writeCacheHits(Writer & writer)
+{
+ auto & counters = DB::CurrentThread::getProfileEvents();
+ auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
+ auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
+ auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
+ auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
+ auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000;
+ auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000;
+
+ writer.Key("read_cache_hits");
+ writer.Uint64(read_cache_hits);
+ writer.Key("miss_cache_hits");
+ writer.Uint64(miss_cache_hits);
+ writer.Key("read_cache_bytes");
+ writer.Uint64(read_cache_bytes);
+ writer.Key("read_miss_bytes");
+ writer.Uint64(read_miss_bytes);
+ writer.Key("read_cache_millisecond");
+ writer.Uint64(read_cache_millisecond);
+ writer.Key("miss_cache_millisecond");
+ writer.Uint64(miss_cache_millisecond);
+}
+
RelMetric::RelMetric(size_t id_, const String & name_, std::vector & steps_) : id(id_), name(name_), steps(steps_)
{
}
@@ -117,7 +162,7 @@ void RelMetric::serialize(Writer & writer, bool) const
}
writer.EndArray();
- if (auto read_mergetree = dynamic_cast(step))
+ if (auto read_mergetree = dynamic_cast(step))
{
auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
auto selected_marks = read_mergetree->getAnalysisResult().selected_marks;
@@ -128,6 +173,11 @@ void RelMetric::serialize(Writer & writer, bool) const
writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
+ writeCacheHits(writer);
+ }
+ else if (dynamic_cast(step))
+ {
+ writeCacheHits(writer);
}
writer.EndObject();
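
These counters end up inside the per-step metrics JSON. A rough sketch of reading them back on the JVM side; only the key names come from writeCacheHits above, the parsing approach itself is illustrative and not how Gluten's metric updater necessarily consumes the JSON.

// Sketch only: extracting the cache counters emitted by writeCacheHits.
import com.fasterxml.jackson.databind.ObjectMapper

def cacheCounter(metricsJson: String, key: String): Long = {
  val node = new ObjectMapper().readTree(metricsJson)
  val value = node.findValue(key) // recursive lookup; null if the step wrote no cache stats
  if (value == null) 0L else value.asLong()
}

// e.g. cacheCounter(json, "read_cache_bytes") or cacheCounter(json, "miss_cache_hits")
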
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index a97f0c72ada4d..0dc852a901105 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -16,20 +16,21 @@
*/
#include "CacheManager.h"
+#include
#include
#include
#include
-#include
+#include
#include
-#include
-#include
+#include
#include
#include
#include
#include
+#include
#include
+#include
#include
-#include
#include
@@ -178,4 +179,62 @@ jobject CacheManager::getCacheStatus(JNIEnv * env, const String & jobId)
}
return env->NewObject(cache_result_class, cache_result_constructor, status, charTojstring(env, message.c_str()));
}
+
+Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder)
+{
+ auto task = [file, read_buffer_builder, context = this->context]()
+ {
+ LOG_INFO(getLogger("CacheManager"), "Loading cache file {}", file.uri_file());
+
+ try
+ {
+ std::unique_ptr rb = read_buffer_builder->build(file);
+ while (!rb->eof())
+ rb->ignoreAll();
+ }
+ catch (std::exception & e)
+ {
+ LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what());
+ std::rethrow_exception(std::current_exception());
+ }
+ };
+
+ return std::move(task);
+}
+
+JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
+{
+ JobId id = toString(UUIDHelpers::generateV4());
+ Job job(id);
+
+ if (file_infos.items_size())
+ {
+ const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
+ const auto read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
+
+ if (read_buffer_builder->file_cache)
+ for (const auto & file : file_infos.items())
+ job.addTask(cacheFile(file, read_buffer_builder));
+ else
+ LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled.");
+ }
+
+ auto & scheduler = JobScheduler::instance();
+ scheduler.scheduleJob(std::move(job));
+ return id;
+}
+
+void CacheManager::removeFiles(String file, String cache_name)
+{
+ // only used by unit tests
+ for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll())
+ {
+ if (name != cache_name)
+ continue;
+
+ if (const auto cache = file_cache->cache)
+ cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id);
+ }
+}
+
}
\ No newline at end of file
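
For orientation, the JVM-side counterpart assumed by cacheFiles/removeFiles (and used by the HDFS suite earlier in this patch) would declare its natives roughly as below. Signatures are inferred from the JNI entry points added in local_engine_jni.cpp; the real CHNativeCacheManager in Gluten may be shaped differently.

// Sketch of the assumed Scala/JNI bridge.
package org.apache.gluten.execution

object CHNativeCacheManager {
  // Takes a serialized substrait ReadRel.LocalFiles and returns the native job id.
  @native def nativeCacheFiles(files: Array[Byte]): String

  // Test-only helper: evicts `file` from the named local file cache.
  @native def removeFiles(file: String, cacheName: String): Unit
}
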
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index b88a3ea03e4ec..6335f86bb162d 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -15,9 +15,13 @@
* limitations under the License.
*/
#pragma once
+
+#include
+
#include
#include
#include
+#include
namespace local_engine
{
@@ -40,6 +44,10 @@ class CacheManager {
Task cachePart(const MergeTreeTable& table, const MergeTreePart& part, const std::unordered_set& columns);
JobId cacheParts(const String& table_def, const std::unordered_set& columns);
static jobject getCacheStatus(JNIEnv * env, const String& jobId);
+
+ Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder);
+ JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
+ static void removeFiles(String file, String cache_name);
private:
CacheManager() = default;
DB::ContextMutablePtr context;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index da15890070b09..cfc68188c5ca5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -35,7 +35,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
@@ -49,7 +49,6 @@
#include
#include
#include
-#include
#include
#include
@@ -77,8 +76,6 @@ namespace ErrorCodes
}
}
-namespace fs = std::filesystem;
-
namespace local_engine
{
template
@@ -205,6 +202,7 @@ class LocalFileReadBufferBuilder : public ReadBufferBuilder
#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
+ using ReadBufferCreator = std::function(bool restricted_seek, const DB::StoredObject & object)>;
public:
explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_), context(context_) { }
~HDFSFileReadBufferBuilder() override = default;
@@ -212,18 +210,23 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
std::unique_ptr
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
{
- auto config = HdfsConfig::loadFromContext(context);
+ DB::ReadSettings read_settings = getReadSettings(context);
+ auto & config = context->getConfigRef();
+ auto hdfs_config = HdfsConfig::loadFromContext(config, read_settings);
Poco::URI file_uri(file_info.uri_file());
std::string uri_path = "hdfs://" + file_uri.getHost();
if (file_uri.getPort())
- uri_path += ":" + std::to_string(file_uri.getPort());
+ uri_path += ":" + std::to_string(static_cast(file_uri.getPort()));
- DB::ReadSettings read_settings;
- std::unique_ptr read_buffer;
+ std::cout << "read_from_filesystem_cache_if_exists_otherwise_bypass_cache:" << read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache << std::endl;
+
+ size_t read_util_position = 0;
+ size_t read_begin = 0;
if (set_read_util_position)
{
std::pair start_end_pos
- = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+ = adjustFileReadStartAndEndPos(file_info.start(), file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+
LOG_DEBUG(
&Poco::Logger::get("ReadBufferBuilder"),
"File read start and end position adjusted from {},{} to {},{}",
@@ -232,34 +235,57 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
start_end_pos.first,
start_end_pos.second);
- auto read_buffer_impl = std::make_unique(
- uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, start_end_pos.second, true);
- if (config.hdfs_async)
- {
- auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
- read_buffer = std::make_unique(pool_reader, read_settings, std::move(read_buffer_impl));
- }
- else
- read_buffer = std::move(read_buffer_impl);
+ read_begin = start_end_pos.first;
+ read_util_position = start_end_pos.second;
+ }
- if (auto * seekable_in = dynamic_cast(read_buffer.get()))
- if (start_end_pos.first)
- seekable_in->seek(start_end_pos.first, SEEK_SET);
+ size_t file_size = 0;
+ if (file_info.has_properties())
+ file_size = file_info.properties().filesize();
+
+ std::unique_ptr read_buffer;
+
+ if (hdfs_config.hdfs_async)
+ {
+ std::optional size = std::nullopt;
+ if (file_size)
+ size = file_size;
+
+ auto read_buffer_impl = std::make_shared(
+ uri_path, file_uri.getPath(), config, read_settings, read_util_position, true, size);
+ auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
+ read_buffer = std::make_unique(pool_reader, read_settings, std::move(read_buffer_impl));
}
else
{
- auto read_buffer_impl
- = std::make_unique(uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, 0, true);
- if (config.hdfs_async)
+ if (!file_size)
{
- read_buffer = std::make_unique(
- context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER),
- read_settings,
- std::move(read_buffer_impl));
+ // Only for Spark 3.2: the file partition does not carry the file size,
+ // so compute it here first.
+ auto read_buffer_impl = std::make_unique(
+ uri_path, file_uri.getPath(), config, read_settings, read_util_position, true);
+ file_size = read_buffer_impl->getFileSize();
}
- else
- read_buffer = std::move(read_buffer_impl);
+
+ ReadBufferCreator hdfs_read_buffer_creator
+ = [this, hdfs_uri = uri_path, hdfs_file_path = file_uri.getPath(), read_settings, &config, read_util_position](
+ bool /* restricted_seek */, const DB::StoredObject & object) -> std::unique_ptr
+ {
+ return std::make_unique(
+ hdfs_uri, hdfs_file_path, config, read_settings, read_util_position, true, object.bytes_size);
+ };
+
+ DB::StoredObjects stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}};
+ auto cache_hdfs_read = std::make_unique(
+ std::move(hdfs_read_buffer_creator), stored_objects, "hdfs:", read_settings, nullptr, /* use_external_buffer */ false);
+ cache_hdfs_read->setReadUntilPosition(read_util_position);
+ read_buffer = std::move(cache_hdfs_read);
}
+
+ if (set_read_util_position && read_begin)
+ if (auto * seekable_in = dynamic_cast(read_buffer.get()))
+ seekable_in->seek(read_begin, SEEK_SET);
+
return read_buffer;
}
@@ -367,6 +393,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
result.second = get_next_line_pos(fs.get(), fin, read_end_pos, hdfs_file_size);
return result;
}
+
private:
DB::ContextPtr context;
};
@@ -382,23 +409,19 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
{
auto config = S3Config::loadFromContext(context);
- new_settings = context->getReadSettings();
- new_settings.enable_filesystem_cache = config.s3_local_cache_enabled;
-
- if (new_settings.enable_filesystem_cache)
+ // The gluten cache config takes priority over the legacy S3 local cache config.
+ if (!file_cache && config.s3_local_cache_enabled)
{
DB::FileCacheSettings file_cache_settings;
file_cache_settings.max_size = config.s3_local_cache_max_size;
auto cache_base_path = config.s3_local_cache_cache_path;
- if (!fs::exists(cache_base_path))
- fs::create_directories(cache_base_path);
+ if (!std::filesystem::exists(cache_base_path))
+ std::filesystem::create_directories(cache_base_path);
file_cache_settings.base_path = cache_base_path;
file_cache = DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", file_cache_settings, "");
file_cache->initialize();
-
- new_settings.remote_fs_cache = file_cache;
}
}
@@ -407,6 +430,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
std::unique_ptr
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
{
+ DB::ReadSettings read_settings = getReadSettings(context);
Poco::URI file_uri(file_info.uri_file());
// file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
const std::string& bucket = file_uri.getHost();
@@ -416,9 +440,8 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
size_t object_size = object_info.size;
Int64 object_modified_time = object_info.last_modification_time;
- if (new_settings.enable_filesystem_cache)
+ if (read_settings.enable_filesystem_cache)
{
-
auto file_cache_key = DB::FileCacheKey(key);
auto last_cache_time = files_cache_time_map.get(file_cache_key);
// quick check
@@ -436,7 +459,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
}
auto read_buffer_creator
- = [bucket, client, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr
+ = [bucket, client, read_settings, this](bool restricted_seek, const DB::StoredObject & object) -> std::unique_ptr
{
return std::make_unique(
client,
@@ -444,7 +467,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
object.remote_path,
"",
DB::S3::RequestSettings(),
- new_settings,
+ read_settings,
/* use_external_buffer */ true,
/* offset */ 0,
/* read_until_position */0,
@@ -453,11 +476,11 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
DB::StoredObjects stored_objects{DB::StoredObject{key, "", object_size}};
auto s3_impl = std::make_unique(
- std::move(read_buffer_creator), stored_objects, "s3:" + bucket + "/", new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
+ std::move(read_buffer_creator), stored_objects, "s3:" + bucket + "/", read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader
- = std::make_unique(std::move(s3_impl), pool_reader, new_settings, nullptr, nullptr);
+ = std::make_unique(std::move(s3_impl), pool_reader, read_settings, nullptr, nullptr);
if (set_read_util_position)
{
@@ -478,7 +501,7 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
async_reader->setReadUntilEnd();
}
- if (new_settings.remote_fs_prefetch)
+ if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
return async_reader;
@@ -488,7 +511,6 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
static const std::string SHARED_CLIENT_KEY;
static ConcurrentLRU> per_bucket_clients;
static FileCacheConcurrentMap files_cache_time_map;
- DB::ReadSettings new_settings;
DB::FileCachePtr file_cache;
std::string & stripQuote(std::string & s)
@@ -732,6 +754,57 @@ void registerReadBufferBuilders()
#endif
}
+ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : context(context_)
+{
+ const auto & config = context->getConfigRef();
+ if (config.getBool("gluten_cache.local.enabled", false))
+ {
+ DB::FileCacheSettings file_cache_settings;
+
+ file_cache_settings.loadFromConfig(config, "gluten_cache.local");
+
+ if (std::filesystem::path(file_cache_settings.base_path).is_relative())
+ file_cache_settings.base_path = std::filesystem::path(context->getPath()) / "caches" / file_cache_settings.base_path;
+
+ if (!std::filesystem::exists(file_cache_settings.base_path))
+ std::filesystem::create_directories(file_cache_settings.base_path);
+
+ auto name = config.getString("gluten_cache.local.name");
+ auto * config_prefix = "";
+ file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix);
+ file_cache->initialize();
+ }
+}
+
+DB::ReadSettings ReadBufferBuilder::getReadSettings(DB::ContextPtr context) const
+{
+ DB::ReadSettings read_settings = context->getReadSettings();
+ if (file_cache)
+ {
+ read_settings.enable_filesystem_cache = true;
+ read_settings.remote_fs_cache = file_cache;
+ }
+ else
+ {
+ read_settings.enable_filesystem_cache = false;
+ }
+
+ return read_settings;
+}
+
+
+std::unique_ptr
+ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position)
+{
+ auto in = build(file_info, set_read_util_position);
+
+ /// Wrap the read buffer with the compression method, if one applies
+ Poco::URI file_uri(file_info.uri_file());
+ DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
+ return compression != DB::CompressionMethod::None ? DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) : std::move(in);
+}
+
+
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
static ReadBufferBuilderFactory instance;
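
The gluten_cache.local.* keys read in the constructor above are supplied from the Spark side through the ClickHouse backend config prefix. A minimal sketch, mirroring the HDFS suite in this patch; the path and size values are illustrative, not defaults.

// Sketch: Spark-side configuration that surfaces as gluten_cache.local.* in the
// backend configuration consumed by ReadBufferBuilder.
import org.apache.spark.SparkConf

val prefix = "spark.gluten.sql.columnar.backend.ch.runtime_config"
val conf = new SparkConf()
  .set(s"$prefix.gluten_cache.local.enabled", "true")
  .set(s"$prefix.gluten_cache.local.name", "gluten_cache")
  .set(s"$prefix.gluten_cache.local.path", "/tmp/gluten_hdfs_cache/")
  .set(s"$prefix.gluten_cache.local.max_size", "10Gi")
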
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index f5218f0aa5def..92d8d41c1290f 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -15,21 +15,21 @@
* limitations under the License.
*/
#pragma once
+
#include
#include
#include
-#include
-#include
-#include
#include
-#include
+
namespace local_engine
{
+
class ReadBufferBuilder
{
public:
- explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) { }
+ explicit ReadBufferBuilder(DB::ContextPtr context_);
+
virtual ~ReadBufferBuilder() = default;
/// build a new read buffer
@@ -37,19 +37,14 @@ class ReadBufferBuilder
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false) = 0;
/// build a new read buffer, consider compression method
- std::unique_ptr buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false)
- {
- auto in = build(file_info, set_read_util_position);
-
- /// Wrap the read buffer with compression method if exists
- Poco::URI file_uri(file_info.uri_file());
- DB::CompressionMethod compression = DB::chooseCompressionMethod(file_uri.getPath(), "auto");
- return compression != DB::CompressionMethod::None ? DB::wrapReadBufferWithCompressionMethod(std::move(in), compression)
- : std::move(in);
- }
+ std::unique_ptr buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false);
protected:
+ DB::ReadSettings getReadSettings(DB::ContextPtr context) const;
DB::ContextPtr context;
+
+public:
+ DB::FileCachePtr file_cache = nullptr;
};
using ReadBufferBuilderPtr = std::shared_ptr;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 3c3d6d4f89c2f..f27da2f920488 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1292,6 +1292,30 @@ JNIEXPORT jobject Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGe
return local_engine::CacheManager::instance().getCacheStatus(env, jstring2string(env, id));
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
+
+JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * env, jobject, jbyteArray files)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ const auto files_bytes = local_engine::getByteArrayElementsSafe(env, files);
+ const std::string::size_type files_bytes_size = files_bytes.length();
+ std::string_view files_view = {reinterpret_cast(files_bytes.elems()), files_bytes_size};
+ substrait::ReadRel::LocalFiles local_files = local_engine::BinaryToMessage<substrait::ReadRel::LocalFiles>(files_view);
+
+ auto jobId = local_engine::CacheManager::instance().cacheFiles(local_files);
+ return local_engine::charTojstring(env, jobId.c_str());
+ LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
+}
+
+JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ auto file = jstring2string(env, file_);
+ auto cache_name = jstring2string(env, cache_name_);
+
+ local_engine::CacheManager::removeFiles(file, cache_name);
+ LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
#ifdef __cplusplus
}