From 83061b5b8fb0c59254465dc11f6bbb43e6796b1e Mon Sep 17 00:00:00 2001 From: liuneng1994 Date: Mon, 15 Jul 2024 12:59:27 +0800 Subject: [PATCH] version 2 --- .../CompactObjectStorageDiskTransaction.cpp | 105 +++++++++++++----- .../CompactObjectStorageDiskTransaction.h | 1 + .../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 2 +- .../Storages/CustomStorageMergeTree.cpp | 25 ++++- .../Storages/CustomStorageMergeTree.h | 1 + .../Mergetree/MergeSparkMergeTreeTask.cpp | 2 +- .../Mergetree/SparkMergeTreeWriter.cpp | 1 - 7 files changed, 106 insertions(+), 31 deletions(-) diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp index 261c56699405..e108ffde7433 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp @@ -1,39 +1,87 @@ #include "CompactObjectStorageDiskTransaction.h" #include +#include namespace local_engine { +int getFileOrder(const std::string & path) +{ + if (path.ends_with("columns.txt")) + return 1; + if (path.ends_with("metadata_version.txt")) + return 2; + if (path.ends_with("count.txt")) + return 3; + if (path.ends_with("default_compression_codec.txt")) + return 4; + if (path.ends_with("checksums.txt")) + return 5; + if (path.ends_with("uuid.txt")) + return 6; + if (path.ends_with(".cmrk3") || path.ends_with(".cmrk2") || path.ends_with(".cmrk1") || + path.ends_with(".mrk3") || path.ends_with(".mrk2") || path.ends_with(".mrk1")) + return 10; + if (path.ends_with("idx")) + return 20; + if (path.ends_with("bin")) + return 1000; + return 100; +} + +bool isMetaDataFile(const std::string & path) +{ + return !path.ends_with("bin"); +} + +using FileMappings = std::vector>>; void CompactObjectStorageDiskTransaction::commit() { auto metadata_tx = disk.getMetadataStorage()->createTransaction(); - std::filesystem::path path = prefix_path; - path /= "data.bin"; + std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin"; + std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin"; + auto object_storage = disk.getObjectStorage(); - auto object_key = object_storage->generateObjectKeyForPath(path); + auto data_key = object_storage->generateObjectKeyForPath(data_path); + auto meta_key = object_storage->generateObjectKeyForPath(meta_path); + disk.createDirectories(prefix_path); - auto write_buffer = object_storage->writeObject(DB::StoredObject(object_key.serialize(), path), DB::WriteMode::Rewrite); + auto data_write_buffer = object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path), DB::WriteMode::Rewrite); + auto meta_write_buffer = object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path), DB::WriteMode::Rewrite); String buffer; - buffer.resize(1024*1024); - size_t offset = 0; + buffer.resize(1024 * 1024); - for (const auto & item : files) + auto merge_files = [&](std::ranges::input_range auto && list, DB::WriteBuffer & out, const DB::ObjectStorageKey & key , const String &local_path) { - DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first); - DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath()); - int file_size = 0; - while (int count = read.readBig(buffer.data(), buffer.size())) - { - file_size += count; - write_buffer->write(buffer.data(), count); - } - metadata.addObject(object_key, offset, file_size); - metadata_tx->writeStringToFile(item.first, metadata.serializeToString()); - // - offset += file_size; - } - write_buffer->sync(); + size_t offset = 0; + std::ranges::for_each( + list, + [&](auto & item) + { + DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first); + DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath()); + int file_size = 0; + while (int count = read.readBig(buffer.data(), buffer.size())) + { + file_size += count; + out.write(buffer.data(), count); + } + metadata.addObject(key, offset, file_size); + metadata_tx->writeStringToFile(item.first, metadata.serializeToString()); + offset += file_size; + }); + + // You can load the complete file in advance through this metadata original, which improves the download efficiency of mergetree metadata. + DB::DiskObjectStorageMetadata whole_meta(object_storage->getCommonKeyPrefix(), local_path); + whole_meta.addObject(key, 0, offset); + metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString()); + out.sync(); + }; + + merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path); + merge_files(files | std::ranges::views::filter([](auto file) { return isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path); + metadata_tx->commit(); files.clear(); } @@ -42,7 +90,7 @@ std::unique_ptr CompactObjectStorageDiskTransaction const std::string & path, size_t buf_size, DB::WriteMode mode, - const DB::WriteSettings & , + const DB::WriteSettings &, bool) { if (mode != DB::WriteMode::Rewrite) @@ -51,11 +99,18 @@ std::unique_ptr CompactObjectStorageDiskTransaction } if (prefix_path.empty()) prefix_path = path.substr(0, path.find_last_of('/')); - else - if (!path.starts_with(prefix_path)) - throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Don't support write file in different dirs, path {}, prefix path: {}", path, prefix_path); + else if (!path.starts_with(prefix_path)) + throw DB::Exception( + DB::ErrorCodes::NOT_IMPLEMENTED, + "Don't support write file in different dirs, path {}, prefix path: {}", + path, + prefix_path); auto tmp = std::make_shared(tmp_data); files.emplace_back(path, tmp); + auto tx = disk.getMetadataStorage()->createTransaction(); + tx->createDirectoryRecursive(std::filesystem::path(path).parent_path()); + tx->createEmptyMetadataFile(path); + tx->commit(); return std::make_unique(tmp->getAbsolutePath(), buf_size); } } \ No newline at end of file diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h index c8f36a8923ab..7638adbb2827 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h @@ -15,6 +15,7 @@ extern const int NOT_IMPLEMENTED; namespace local_engine { + class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction { public: explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_) diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp index 941bee622701..9c4b390ea8b0 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp @@ -28,7 +28,7 @@ namespace local_engine { using namespace DB; -DB::DiskTransactionPtr GlutenDiskHDFS::createTransaction() +DiskTransactionPtr GlutenDiskHDFS::createTransaction() { return std::make_shared(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk()); } diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index ec0e0932fc76..ad7461b5c247 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -148,9 +148,26 @@ CustomStorageMergeTree::CustomStorageMergeTree( std::atomic CustomStorageMergeTree::part_num; + + +void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set parts) +{ + auto disk = getDisks().front(); + if (!disk->isRemote()) return; + std::vector meta_paths; + std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); }); + for (const auto & meta_path: meta_paths) + { + if (!disk->exists(meta_path)) continue; + auto in = disk->readFile(meta_path); + String ignore_data; + readStringUntilEOF(ignore_data, *in); + } +} + std::vector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set parts) { - auto parts_lock = lockParts(); + prefectchMetaDataFile(parts); std::vector data_parts; const auto disk = getStoragePolicy()->getDisks().at(0); for (const auto& name : parts) @@ -161,8 +178,6 @@ std::vector CustomStorageMergeTree::loadDataPartsWithNames data_parts.emplace_back(res.part); } - // without it "test mergetree optimize partitioned by one low card column" will log ERROR - calculateColumnAndSecondaryIndexSizesImpl(); return data_parts; } @@ -211,6 +226,7 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( res.part->loadVersionMetadata(); res.part->setState(to_state); + auto parts_lock = lockParts(); DataPartIteratorByInfo it; bool inserted; @@ -239,6 +255,9 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( if (res.part->hasLightweightDelete()) has_lightweight_delete_parts.store(true); + // without it "test mergetree optimize partitioned by one low card column" will log ERROR + calculateColumnAndSecondaryIndexSizesImpl(); + LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName()); return res; } diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h index cd507a3ac751..9144aba429c0 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h @@ -65,6 +65,7 @@ class CustomStorageMergeTree final : public MergeTreeData private: SimpleIncrement increment; + void prefectchMetaDataFile(std::unordered_set parts); void startBackgroundMovesIfNeeded() override; std::unique_ptr getDefaultSettings() const override; LoadPartResult loadDataPart( diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp index 8e8e4c556beb..05b2623b4d16 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp @@ -164,7 +164,7 @@ void MergeSparkMergeTreeTask::finish() // MergeTreeData::Transaction transaction(storage, txn.get()); // storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction); // transaction.commit(); - + new_part->getDataPartStoragePtr()->commitTransaction(); ThreadFuzzer::maybeInjectSleep(); ThreadFuzzer::maybeInjectMemoryLimitException(); diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index 28b65ab88839..c5cf2ab24c2e 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -241,7 +241,6 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded() auto src_disk = storage->getStoragePolicy()->getAnyDisk(); auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk(); auto tx = dest_disk->createTransaction(); - std::sort(files.begin(), files.end()); for (const auto & file : files) { auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);