diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java index 173cc3e864a08..ff19ac33f0424 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java @@ -25,5 +25,8 @@ public static void cacheParts(String table, Set columns, boolean async) private static native void nativeCacheParts(String table, String columns, boolean async); - public static native long nativeCacheFiles(byte[] files, boolean async); + public static native void nativeCacheFiles(byte[] files, boolean async); + + // only for ut + public static native void removeFiles(String file, String cacheName); } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala index a49ea572b741b..0b7c03c26f286 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala @@ -54,7 +54,6 @@ case class GlutenCacheFilesCommand( override def run(session: SparkSession): Seq[Row] = { val targetFile = new Path(filePath) val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration -// val fs = FileSystem.get(uri, hadoopConf) val fs = targetFile.getFileSystem(hadoopConf) if (!fs.exists(targetFile)) { throw new FileNotFoundException(filePath) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala new file mode 100644 index 0000000000000..a2849e99c5cb3 --- /dev/null 
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution.tpch + +import org.apache.gluten.execution.{CHNativeCacheManager, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +import org.apache.hadoop.fs.Path + +class GlutenClickHouseHDFSSuite + extends GlutenClickHouseTPCHAbstractSuite + with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + + override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data" + override protected val tpchQueries: String = + rootPath + "../../../../gluten-core/src/test/resources/tpch-queries" + override protected val queriesResults: String = rootPath + "queries-output" + + private val hdfsCachePath = "/tmp/gluten_hdfs_cache/" + private val cache_name = "gluten_cache" + + /** Run Gluten + ClickHouse Backend with SortShuffleManager */ + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "sort") + 
.set("spark.io.compression.codec", "snappy") + .set("spark.sql.shuffle.partitions", "5") + .set("spark.sql.autoBroadcastJoinThreshold", "10MB") + // spark.sql.adaptive.enabled is configured once, below + .set(s"$CH_CONFIG_PREFIX.use_local_format", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32") + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true") + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name) + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath) + .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi") + .set("spark.sql.adaptive.enabled", "false") + } + + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + override def beforeAll(): Unit = { + super.beforeAll() + deleteCache() + } + + override protected def beforeEach(): Unit = { + super.beforeEach() + deleteCache() + } + + private def deleteCache(): Unit = { + val targetFile = new Path(tablesPath) + val fs = targetFile.getFileSystem(spark.sparkContext.hadoopConfiguration) + fs.listStatus(targetFile) + .foreach( + table => { + if (table.isDirectory) { + fs.listStatus(table.getPath) + .foreach( + data => { + if (data.isFile) { + CHNativeCacheManager + .removeFiles(data.getPath.toUri.getPath.substring(1), cache_name) + } + }) + } + }) + clearDataPath(hdfsCachePath) + } + + val runWithoutCache: () => Unit = () => { + runTPCHQuery(6) { + df => + val plans = df.queryExecution.executedPlan.collect { + case scanExec: FileSourceScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans.head.metrics("readMissBytes").value != 0) + } + } + + val runWithCache: () => Unit = () => { + runTPCHQuery(6) { + df => + val plans = df.queryExecution.executedPlan.collect { + case scanExec: FileSourceScanExecTransformer => scanExec + } + assert(plans.size == 1) + assert(plans.head.metrics("readMissBytes").value == 0) + assert(plans.head.metrics("readCacheBytes").value != 
0) + } + } + + test("test hdfs cache") { + runWithoutCache() + runWithCache() + } + + test("test cache file command") { + runSql( + s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'", + noFallBack = false) { _ => } + runWithCache() + } + + test("test no cache by query") { + withSQLConf( + s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache" -> "true") { + runWithoutCache() + } + + runWithoutCache() + runWithCache() + } +} diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index dc6f5b5ab2be0..b8288175994eb 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -16,22 +16,21 @@ */ #include "CacheManager.h" +#include #include #include #include -#include +#include #include -#include -// #include -#include +#include #include #include #include -#include #include +#include #include +#include #include -#include namespace DB { @@ -184,4 +183,18 @@ void CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos, bool as latch->wait(); } } + +void CacheManager::removeFiles(String file, String cache_name) +{ + // only for ut + for (const auto & cache_data : FileCacheFactory::instance().getAll()) + { + if (cache_data.first != cache_name) + continue; + + if (const auto cache = cache_data.second->cache) + cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id); + } +} + } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index 0549b2de3b840..9bbfbbf38a343 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -39,6 +39,7 @@ class CacheManager { void cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder, std::shared_ptr latch); void 
cacheFiles(substrait::ReadRel::LocalFiles file_infos, bool async); + static void removeFiles(String file, String cache_name); private: CacheManager() = default; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 0e7b18948218c..e22c411bbeec7 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1299,6 +1299,16 @@ JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCache LOCAL_ENGINE_JNI_METHOD_END(env, ); } +JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jclass, jstring file_, jstring cache_name_) +{ + LOCAL_ENGINE_JNI_METHOD_START + auto file = jstring2string(env, file_); + auto cache_name = jstring2string(env, cache_name_); + + local_engine::CacheManager::removeFiles(file, cache_name); + LOCAL_ENGINE_JNI_METHOD_END(env, ); +} + #ifdef __cplusplus }