From 31eceae0b75f66d883040a3bb8f36d5f76a60f51 Mon Sep 17 00:00:00 2001 From: loneylee Date: Fri, 1 Nov 2024 15:55:01 +0800 Subject: [PATCH] [GLUTEN-7765][CH] Support CACHE META command for MergeTree table --- .../execution/CHNativeCacheManager.java | 7 +++--- .../spark/rpc/GlutenExecutorEndpoint.scala | 4 ++-- .../apache/spark/rpc/GlutenRpcMessages.scala | 5 +++- .../commands/GlutenCHCacheDataCommand.scala | 4 ++-- .../Storages/Cache/CacheManager.cpp | 24 ++++++++++++++----- .../Storages/Cache/CacheManager.h | 5 ++-- .../MergeTree/SparkStorageMergeTree.cpp | 11 +++++++++ cpp-ch/local-engine/local_engine_jni.cpp | 6 ++--- 8 files changed, 47 insertions(+), 19 deletions(-) diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java index 4033d8c6b1cc..7c89cf6b41ad 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java @@ -19,11 +19,12 @@ import java.util.Set; public class CHNativeCacheManager { - public static String cacheParts(String table, Set columns) { - return nativeCacheParts(table, String.join(",", columns)); + public static String cacheParts(String table, Set columns, boolean onlyMetaCache) { + return nativeCacheParts(table, String.join(",", columns), onlyMetaCache); } - private static native String nativeCacheParts(String table, String columns); + private static native String nativeCacheParts( + String table, String columns, boolean onlyMetaCache); public static CacheResult getCacheStatus(String jobId) { return nativeGetCacheStatus(jobId); diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala index 559a22cb12c2..b9ae9ff363b6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala @@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) => + case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) => try { - val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns) + val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache) context.reply(CacheJobInfo(status = true, jobId)) } catch { case e: Exception => diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala index e596e94fed72..8127c324b79c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala @@ -36,7 +36,10 @@ object GlutenRpcMessages { extends GlutenRpcMessage // for mergetree cache - case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String]) + case class GlutenMergeTreeCacheLoad( + mergeTreeTable: String, + columns: util.Set[String], + onlyMetaCache: Boolean) extends GlutenRpcMessage case class GlutenCacheLoadStatus(jobId: String) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index 1c7b4f232205..69a8c4218714 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand( ( executorId, executor.executorEndpointRef.ask[CacheJobInfo]( - GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava) + GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache) ))) }) } else { @@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand( ( value._1, executorData.executorEndpointRef.ask[CacheJobInfo]( - GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava) + GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache) ))) }) } diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index bb688724c3d4..e2ba48e9d272 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -81,17 +81,31 @@ struct CacheJobContext MergeTreeTableInstance table; }; -Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns) +Task CacheManager::cachePart( + const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns, bool only_meta_cache) { CacheJobContext job_context{table}; job_context.table.parts.clear(); job_context.table.parts.push_back(part); job_context.table.snapshot_id = ""; - Task task = [job_detail = job_context, context = this->context, read_columns = columns]() + Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]() { try { auto storage = job_detail.table.restoreStorage(context); + std::vector selected_parts + = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name}); + + if (only_meta_cache) + { + LOG_INFO( + getLogger("CacheManager"), + "Load meta cache of table {}.{} part {} success.", + job_detail.table.database, + job_detail.table.table, + job_detail.table.parts.front().name); + return; + } auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); NamesAndTypesList names_and_types_list; @@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr names_and_types_list.push_back(NameAndTypePair(column.name, column.type)); } auto query_info = buildQueryInfo(names_and_types_list); - std::vector selected_parts - = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name}); auto read_step = storage->reader.readFromParts( selected_parts, storage->getMutationsSnapshot({}), @@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr return std::move(task); } -JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set& columns) +JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set& columns, bool only_meta_cache) { JobId id = toString(UUIDHelpers::generateV4()); Job job(id); for (const auto & part : table.parts) { - job.addTask(cachePart(table, part, columns)); + job.addTask(cachePart(table, part, columns, only_meta_cache)); } auto& scheduler = JobScheduler::instance(); scheduler.scheduleJob(std::move(job)); diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index b59963ec4fa7..8fd26d249abc 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -40,7 +40,7 @@ class CacheManager static CacheManager & instance(); static void initialize(const DB::ContextMutablePtr & context); - JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns); + JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns, bool only_meta_cache); static jobject getCacheStatus(JNIEnv * env, const String & jobId); Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder); @@ -48,7 +48,8 @@ class CacheManager static void removeFiles(String file, String cache_name); private: - Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns); + Task cachePart( + const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns, bool only_meta_cache); CacheManager() = default; DB::ContextMutablePtr context; }; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index 9c90c67f69b3..45be9dcf7442 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -22,6 +22,12 @@ #include #include +namespace ProfileEvents +{ +extern const Event LoadedDataParts; +extern const Event LoadedDataPartsMicroseconds; +} + namespace DB { namespace MergeTreeSetting @@ -176,6 +182,7 @@ void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set std::vector SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set & parts) { + Stopwatch watch; prefetchMetaDataFile(parts); std::vector data_parts; const auto disk = getStoragePolicy()->getDisks().at(0); @@ -187,6 +194,10 @@ std::vector SparkStorageMergeTree::loadDataPartsWithNames( data_parts.emplace_back(res.part); } + watch.stop(); + LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", parts.size(), watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size()); + ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, watch.elapsedMicroseconds()); return data_parts; } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 02e606e1edc8..8ff8a866b7ca 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1340,8 +1340,8 @@ JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc } -JNIEXPORT jstring -Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_) +JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts( + JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_) { LOCAL_ENGINE_JNI_METHOD_START auto table_def = jstring2string(env, table_); @@ -1351,7 +1351,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * for (const auto & col : tokenizer) column_set.insert(col); local_engine::MergeTreeTableInstance table(table_def); - auto id = local_engine::CacheManager::instance().cacheParts(table, column_set); + auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_); return local_engine::charTojstring(env, id.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); }