From 371be6f860c2b409ea70a79e0c2e4e72a2624e38 Mon Sep 17 00:00:00 2001 From: LiuNeng <1398775315@qq.com> Date: Wed, 21 Aug 2024 13:57:54 +0800 Subject: [PATCH] [CH] Added cleanup logic for expired MergeTree part cache entries What changes were proposed in this pull request? Added cleanup logic for expired MergeTree part cache entries How was this patch tested? unit tests, manual tests (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) --- cpp-ch/local-engine/Common/GlutenConfig.h | 8 ++--- .../Storages/CustomStorageMergeTree.cpp | 6 ++-- .../Storages/CustomStorageMergeTree.h | 2 +- .../Storages/StorageMergeTreeFactory.cpp | 12 +++---- .../Storages/StorageMergeTreeFactory.h | 36 +++++++++++++++++-- 5 files changed, 48 insertions(+), 16 deletions(-) diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index 02bb8a9f4c04..d0e2e9dee8b5 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -172,14 +172,14 @@ struct MergeTreeConfig inline static const String TABLE_PART_METADATA_CACHE_MAX_COUNT = "table_part_metadata_cache_max_count"; inline static const String TABLE_METADATA_CACHE_MAX_COUNT = "table_metadata_cache_max_count"; - size_t table_part_metadata_cache_max_count = 1000; - size_t table_metadata_cache_max_count = 100; + size_t table_part_metadata_cache_max_count = 5000; + size_t table_metadata_cache_max_count = 500; static MergeTreeConfig loadFromContext(DB::ContextPtr context) { MergeTreeConfig config; - config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 1000); - config.table_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 100); + config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 5000); + config.table_metadata_cache_max_count = 
context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 500); return config; } }; diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index 961f482c7cae..7336d7db5190 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -262,16 +262,16 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( return res; } -void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach) +void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPart & part_to_detach) { auto lock = lockParts(); bool removed_active_part = false; bool restored_active_part = false; - auto it_part = data_parts_by_info.find(part_to_detach->info); + auto it_part = data_parts_by_info.find(part_to_detach.info); if (it_part == data_parts_by_info.end()) { - LOG_DEBUG(log, "No such data part {}", part_to_detach->getNameWithState()); + LOG_DEBUG(log, "No such data part {}", part_to_detach.getNameWithState()); return; } diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h index 9144aba429c0..773e5858c24f 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h @@ -54,7 +54,7 @@ class CustomStorageMergeTree final : public MergeTreeData bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override; std::map getUnfinishedMutationCommands() const override; std::vector loadDataPartsWithNames(std::unordered_set parts); - void removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach); + void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach); MergeTreeDataWriter writer; MergeTreeDataSelectExecutor reader; diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp 
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp index 3f7aac8724a0..eefd1c5fd1ec 100644 --- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp +++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp @@ -87,20 +87,20 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i std::unordered_set missing_names; if (!datapart_map->has(table_name)) [[unlikely]] { - auto cache = std::make_shared>(config.table_part_metadata_cache_max_count); + auto cache = std::make_shared>(config.table_part_metadata_cache_max_count); datapart_map->add(table_name, cache); } // find the missing cache part name for (const auto & name : part_name) { - if (!(*(datapart_map->get(table_name)))->has(name)) + if (!(*datapart_map->get(table_name))->has(name)) { missing_names.emplace(name); } else { - res.emplace_back((*((*(datapart_map->get(table_name)))->get(name)))); + res.emplace_back((*datapart_map->get(table_name))->get(name)->get()->dataPart()); } } @@ -112,17 +112,17 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i storage_merge_tree = storage_map->get(table_name)->first; } auto missing_parts = storage_merge_tree->loadDataPartsWithNames(missing_names); - for (const auto & part : missing_parts) + for (auto & part : missing_parts) { res.emplace_back(part); - (*(datapart_map->get(table_name)))->add(part->name, part); + (*datapart_map->get(table_name))->add(part->name, std::make_shared(part, storage_merge_tree)); } } return res; } // will be inited in native init phase std::unique_ptr>> StorageMergeTreeFactory::storage_map = nullptr; -std::unique_ptr>>> StorageMergeTreeFactory::datapart_map = nullptr; +std::unique_ptr>>> StorageMergeTreeFactory::datapart_map = nullptr; std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex; std::recursive_mutex StorageMergeTreeFactory::datapart_mutex; diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h 
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h index 3fa8c6285bbe..09a2d5747b26 100644 --- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h +++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h @@ -27,6 +27,37 @@ namespace local_engine { using CustomStorageMergeTreePtr = std::shared_ptr; +class DataPartStorageHolder +{ +public: + DataPartStorageHolder(const DataPartPtr& data_part, const CustomStorageMergeTreePtr& storage) + : data_part_(data_part), + storage_(storage) + { + } + + [[nodiscard]] DataPartPtr dataPart() const + { + return data_part_; + } + + [[nodiscard]] CustomStorageMergeTreePtr storage() const + { + return storage_; + } + + ~DataPartStorageHolder() + { + storage_->removePartFromMemory(*data_part_); + std::cerr << fmt::format("clean part {}", data_part_->name) << std::endl; + } + +private: + DataPartPtr data_part_; + CustomStorageMergeTreePtr storage_; +}; +using DataPartStorageHolderPtr = std::shared_ptr; + class StorageMergeTreeFactory { public: @@ -50,7 +81,7 @@ class StorageMergeTreeFactory auto & datapart_map_v = datapart_map; if (!datapart_map_v) { - datapart_map_v = std::make_unique>>>( + datapart_map_v = std::make_unique>>>( config.table_metadata_cache_max_count); } else @@ -68,7 +99,8 @@ class StorageMergeTreeFactory private: static std::unique_ptr>> storage_map; - static std::unique_ptr>>> datapart_map; + static std::unique_ptr>>> datapart_map; + static std::recursive_mutex storage_map_mutex; static std::recursive_mutex datapart_mutex; };