Commit e1d4af4: add ut

loneylee committed Aug 15, 2024
1 parent: 38292f5
Showing 6 changed files with 169 additions and 8 deletions.
CHNativeCacheManager.java: 4 additions & 1 deletion
@@ -25,5 +25,8 @@ public static void cacheParts(String table, Set<String> columns, boolean async)

    private static native void nativeCacheParts(String table, String columns, boolean async);

-   public static native long nativeCacheFiles(byte[] files, boolean async);
+   public static native void nativeCacheFiles(byte[] files, boolean async);
+
+   // Only used by unit tests.
+   public static native void removeFiles(String file, String cacheName);
}
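
The new removeFiles hook gives JVM-side tests a way to evict a single file from the local cache. A minimal sketch of how a test helper might wrap it (the helper object is hypothetical; it assumes the Gluten native library is loaded and a local cache with the given name is configured, as the suite below does):

    import org.apache.gluten.execution.CHNativeCacheManager

    // Hypothetical helper, not part of this commit.
    object CacheTestUtil {
      def evictFile(hdfsPath: String, cacheName: String): Unit = {
        // The native side keys cache entries by the path without its leading '/',
        // mirroring the substring(1) call in GlutenClickHouseHDFSSuite.deleteCache.
        val key = if (hdfsPath.startsWith("/")) hdfsPath.substring(1) else hdfsPath
        CHNativeCacheManager.removeFiles(key, cacheName)
      }
    }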
GlutenCacheFilesCommand.scala: 1 deletion
@@ -54,7 +54,6 @@ case class GlutenCacheFilesCommand(
  override def run(session: SparkSession): Seq[Row] = {
    val targetFile = new Path(filePath)
    val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration
-   // val fs = FileSystem.get(uri, hadoopConf)
    val fs = targetFile.getFileSystem(hadoopConf)
    if (!fs.exists(targetFile)) {
      throw new FileNotFoundException(filePath)
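
A note on the deleted comment above: resolving the filesystem from the Path itself is equivalent to FileSystem.get(path.toUri, conf) but avoids constructing the URI by hand, and it honors the scheme embedded in filePath instead of falling back to fs.defaultFS. A small illustration (the namenode address is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val conf = new Configuration()
    val hdfsPath = new Path("hdfs://namenode:8020/tpch-data/lineitem")
    // Picks the hdfs:// implementation from the path's own scheme,
    // regardless of what fs.defaultFS points at.
    val fs = hdfsPath.getFileSystem(conf)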
GlutenClickHouseHDFSSuite.scala (new file): 135 additions
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution.tpch

import org.apache.gluten.execution.{CHNativeCacheManager, FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

import org.apache.hadoop.fs.Path

class GlutenClickHouseHDFSSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {

override protected val needCopyParquetToTablePath = true

override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
override protected val tpchQueries: String =
rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
override protected val queriesResults: String = rootPath + "queries-output"

private val hdfsCachePath = "/tmp/gluten_hdfs_cache/"
private val cache_name = "gluten_cache"

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "sort")
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set(s"$CH_CONFIG_PREFIX.use_local_format", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true")
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name)
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath)
.set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi")
.set("spark.sql.adaptive.enabled", "false")
}

override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}

override def beforeAll(): Unit = {
super.beforeAll()
deleteCache()
}

override protected def beforeEach(): Unit = {
super.beforeEach()
deleteCache()
}

private def deleteCache(): Unit = {
val targetFile = new Path(tablesPath)
val fs = targetFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(targetFile)
.foreach(
table => {
if (table.isDirectory) {
fs.listStatus(table.getPath)
.foreach(
data => {
if (data.isFile) {
CHNativeCacheManager
.removeFiles(data.getPath.toUri.getPath.substring(1), cache_name)
}
})
}
})
clearDataPath(hdfsCachePath)
}

val runWithoutCache: () => Unit = () => {
runTPCHQuery(6) {
df =>
val plans = df.queryExecution.executedPlan.collect {
case scanExec: FileSourceScanExecTransformer => scanExec
}
assert(plans.size == 1)
assert(plans.head.metrics("readMissBytes").value != 0)
}
}

val runWithCache: () => Unit = () => {
runTPCHQuery(6) {
df =>
val plans = df.queryExecution.executedPlan.collect {
case scanExec: FileSourceScanExecTransformer => scanExec
}
assert(plans.size == 1)
assert(plans.head.metrics("readMissBytes").value == 0)
assert(plans.head.metrics("readCacheBytes").value != 0)
}
}

test("test hdfs cache") {
runWithoutCache()
runWithCache()
}

test("test cache file command") {
runSql(
s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
noFallBack = false) { _ => }
runWithCache()
}

test("test no cache by query") {
withSQLConf(
s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache" -> "true") {
runWithoutCache()
}

runWithoutCache()
runWithCache()
}
}
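
The suite's assertions hinge on two scan metrics reported by FileSourceScanExecTransformer: readMissBytes (bytes fetched from HDFS) and readCacheBytes (bytes served from the local cache). A hedged sketch of that check factored into a reusable predicate (the object and method names are illustrative):

    import org.apache.gluten.execution.FileSourceScanExecTransformer
    import org.apache.spark.sql.DataFrame

    // Illustrative helper, not part of this commit.
    object CacheMetrics {
      def servedFromCache(df: DataFrame): Boolean = {
        val scans = df.queryExecution.executedPlan.collect {
          case s: FileSourceScanExecTransformer => s
        }
        // A warm read fetches nothing from HDFS and at least one byte from cache.
        scans.nonEmpty &&
        scans.forall(_.metrics("readMissBytes").value == 0) &&
        scans.exists(_.metrics("readCacheBytes").value > 0)
      }
    }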
cpp-ch/local-engine/Storages/Cache/CacheManager.cpp: 19 additions & 6 deletions
@@ -16,22 +16,21 @@
*/
#include "CacheManager.h"

#include <ranges>
#include <Core/Settings.h>
#include <Disks/IStoragePolicy.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Storages/Mergetree/MetaDataHelper.h>
// #include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Common/ThreadPool.h>
#include <Interpreters/Context.h>
#include <Parser/MergeTreeRelParser.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sinks/NullSink.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Mergetree/MetaDataHelper.h>
#include <Common/Logger.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <ranges>

namespace DB
{
@@ -184,4 +183,18 @@ void CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos, bool async)
        latch->wait();
    }
}
+
+void CacheManager::removeFiles(String file, String cache_name)
+{
+    // Only used by unit tests.
+    for (const auto & cache_data : FileCacheFactory::instance().getAll())
+    {
+        if (cache_data.first != cache_name)
+            continue;
+
+        if (const auto cache = cache_data.second->cache)
+            cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id);
+    }
+}

}
cpp-ch/local-engine/Storages/Cache/CacheManager.h: 1 addition
@@ -39,6 +39,7 @@ class CacheManager {

    void cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder, std::shared_ptr<std::latch> latch);
    void cacheFiles(substrait::ReadRel::LocalFiles file_infos, bool async);
+   static void removeFiles(String file, String cache_name);
private:
    CacheManager() = default;
cpp-ch/local-engine/local_engine_jni.cpp: 10 additions
@@ -1299,6 +1299,16 @@ JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCache
LOCAL_ENGINE_JNI_METHOD_END(env, );
}

+JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    auto file = jstring2string(env, file_);
+    auto cache_name = jstring2string(env, cache_name_);
+
+    local_engine::CacheManager::removeFiles(file, cache_name);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}

#ifdef __cplusplus
}

