diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index 99b212059966..bbfac80a7374 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -652,7 +652,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite it.next() files += 1 } - assertResult(72)(files) + assertResult(4)(files) } } } diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 6afd19152c79..58fe073de427 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,4 +1,4 @@ CH_ORG=Kyligence CH_BRANCH=rebase_ch/20240711 -CH_COMMIT=4ab4aa7fe04 +CH_COMMIT=6632e76fd32940940749b53335ccc4843f3f2638 diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp new file mode 100644 index 000000000000..7a3ba4bed244 --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CompactObjectStorageDiskTransaction.h" + +#include +#include + +namespace local_engine +{ +int getFileOrder(const std::string & path) +{ + if (path.ends_with("columns.txt")) + return 1; + if (path.ends_with("metadata_version.txt")) + return 2; + if (path.ends_with("count.txt")) + return 3; + if (path.ends_with("default_compression_codec.txt")) + return 4; + if (path.ends_with("checksums.txt")) + return 5; + if (path.ends_with("uuid.txt")) + return 6; + if (path.ends_with(".cmrk3") || path.ends_with(".cmrk2") || path.ends_with(".cmrk1") || + path.ends_with(".mrk3") || path.ends_with(".mrk2") || path.ends_with(".mrk1")) + return 10; + if (path.ends_with("idx")) + return 20; + if (path.ends_with("bin")) + return 1000; + return 100; +} + +bool isMetaDataFile(const std::string & path) +{ + return !path.ends_with("bin"); +} + +using FileMappings = std::vector>>; + +void CompactObjectStorageDiskTransaction::commit() +{ + auto metadata_tx = disk.getMetadataStorage()->createTransaction(); + std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin"; + std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin"; + + auto object_storage = disk.getObjectStorage(); + auto data_key = object_storage->generateObjectKeyForPath(data_path); + auto meta_key = object_storage->generateObjectKeyForPath(meta_path); + + disk.createDirectories(prefix_path); + auto data_write_buffer = object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path), DB::WriteMode::Rewrite); + auto meta_write_buffer = object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path), DB::WriteMode::Rewrite); + String buffer; + buffer.resize(1024 * 1024); + + auto merge_files = [&](std::ranges::input_range auto && list, DB::WriteBuffer & out, const DB::ObjectStorageKey & key , const String &local_path) + { + size_t offset = 0; + std::ranges::for_each( + list, + [&](auto & item) + { + DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first); + DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath()); + int file_size = 0; + while (int count = read.readBig(buffer.data(), buffer.size())) + { + file_size += count; + out.write(buffer.data(), count); + } + metadata.addObject(key, offset, file_size); + metadata_tx->writeStringToFile(item.first, metadata.serializeToString()); + offset += file_size; + }); + + // You can load the complete file in advance through this metadata original, which improves the download efficiency of mergetree metadata. + DB::DiskObjectStorageMetadata whole_meta(object_storage->getCommonKeyPrefix(), local_path); + whole_meta.addObject(key, 0, offset); + metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString()); + out.sync(); + }; + + merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path); + merge_files(files | std::ranges::views::filter([](auto file) { return isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path); + + metadata_tx->commit(); + files.clear(); +} + +std::unique_ptr CompactObjectStorageDiskTransaction::writeFile( + const std::string & path, + size_t buf_size, + DB::WriteMode mode, + const DB::WriteSettings &, + bool) +{ + if (mode != DB::WriteMode::Rewrite) + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `writeFile` with Append is not implemented"); + } + if (prefix_path.empty()) + prefix_path = path.substr(0, path.find_last_of('/')); + else if (!path.starts_with(prefix_path)) + throw DB::Exception( + DB::ErrorCodes::NOT_IMPLEMENTED, + "Don't support write file in different dirs, path {}, prefix path: {}", + path, + prefix_path); + auto tmp = std::make_shared(tmp_data); + files.emplace_back(path, tmp); + auto tx = disk.getMetadataStorage()->createTransaction(); + tx->createDirectoryRecursive(std::filesystem::path(path).parent_path()); + tx->createEmptyMetadataFile(path); + tx->commit(); + return std::make_unique(tmp->getAbsolutePath(), buf_size); +} +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h new file mode 100644 index 000000000000..e15c362f304a --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +} +} + +namespace local_engine +{ + +class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction { + public: + explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_) + : disk(disk_), tmp_data(tmp_) + { + chassert(!tmp_->isRemote()); + } + + void commit() override; + + void undo() override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `undo` is not implemented"); + } + + void createDirectory(const std::string & path) override + { + disk.createDirectory(path); + } + + void createDirectories(const std::string & path) override + { + disk.createDirectories(path); + } + + void createFile(const std::string & path) override + { + disk.createFile(path); + } + + void clearDirectory(const std::string & path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `clearDirectory` is not implemented"); + } + + void moveDirectory(const std::string & from_path, const std::string & to_path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `moveDirectory` is not implemented"); + } + + void moveFile(const String & from_path, const String & to_path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `moveFile` is not implemented"); + } + + void replaceFile(const std::string & from_path, const std::string & to_path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `replaceFile` is not implemented"); + } + + void copyFile(const std::string & from_file_path, const std::string & to_file_path, const DB::ReadSettings & read_settings, const DB::WriteSettings & write_settings) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `copyFile` is not implemented"); + } + + std::unique_ptr writeFile( /// NOLINT + const std::string & path, + size_t buf_size, + DB::WriteMode mode, + const DB::WriteSettings & settings, + bool /*autocommit */) override; + + + void writeFileUsingBlobWritingFunction(const String & path, DB::WriteMode mode, WriteBlobFunction && write_blob_function) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `writeFileUsingBlobWritingFunction` is not implemented"); + } + + void removeFile(const std::string & path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeFile` is not implemented"); + } + + void removeFileIfExists(const std::string & path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeFileIfExists` is not implemented"); + } + + void removeDirectory(const std::string & path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeDirectory` is not implemented"); + } + + void removeRecursive(const std::string & path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeRecursive` is not implemented"); + } + + void removeSharedFile(const std::string & path, bool keep_shared_data) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFile` is not implemented"); + } + + void removeSharedRecursive(const std::string & path, bool keep_all_shared_data, const DB::NameSet & file_names_remove_metadata_only) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedRecursive` is not implemented"); + } + + void removeSharedFileIfExists(const std::string & path, bool keep_shared_data) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFileIfExists` is not implemented"); + } + + void removeSharedFiles(const DB::RemoveBatchRequest & files, bool keep_all_batch_data, const DB::NameSet & file_names_remove_metadata_only) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFiles` is not implemented"); + } + + void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override + { + disk.setLastModified(path, timestamp); + } + + void chmod(const String & path, mode_t mode) override + { + disk.chmod(path, mode); + } + + void setReadOnly(const std::string & path) override + { + disk.setReadOnly(path); + } + + void createHardLink(const std::string & src_path, const std::string & dst_path) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `createHardLink` is not implemented"); + } + + void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented"); + } + +private: + DB::IDisk & disk; + DB::DiskPtr tmp_data; + std::vector>> files; + String prefix_path = ""; +}; +} + diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp index f207ad232b4f..9c4b390ea8b0 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp @@ -20,12 +20,19 @@ #include #include + +#include "CompactObjectStorageDiskTransaction.h" #if USE_HDFS namespace local_engine { using namespace DB; +DiskTransactionPtr GlutenDiskHDFS::createTransaction() +{ + return std::make_shared(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk()); +} + void GlutenDiskHDFS::createDirectory(const String & path) { DiskObjectStorage::createDirectory(path); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h index 97a99f1deaba..b0f82a340b1f 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h @@ -21,6 +21,8 @@ #include #include +#include +#include #if USE_HDFS #include #endif @@ -51,6 +53,8 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage throttler = std::make_shared(max_speed); } + DB::DiskTransactionPtr createTransaction() override; + void createDirectory(const String & path) override; void createDirectories(const String & path) override; @@ -72,7 +76,17 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage { DB::ObjectStoragePtr tmp = object_storage_creator(config, context); hdfs_object_storage = typeid_cast>(tmp); - object_storage = hdfs_object_storage; + // only for java ut + bool is_cache = object_storage->supportsCache(); + if (is_cache) + { + auto cache_os = reinterpret_cast(object_storage.get()); + object_storage = hdfs_object_storage; + auto cache = DB::FileCacheFactory::instance().getOrCreate(cache_os->getCacheName(), cache_os->getCacheSettings(), "storage_configuration.disks.hdfs_cache"); + wrapWithCache(cache, cache_os->getCacheSettings(), cache_os->getCacheConfigName()); + } + else + object_storage = hdfs_object_storage; } private: std::shared_ptr hdfs_object_storage; diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp index ec0e0932fc76..ad7461b5c247 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp @@ -148,9 +148,26 @@ CustomStorageMergeTree::CustomStorageMergeTree( std::atomic CustomStorageMergeTree::part_num; + + +void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set parts) +{ + auto disk = getDisks().front(); + if (!disk->isRemote()) return; + std::vector meta_paths; + std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); }); + for (const auto & meta_path: meta_paths) + { + if (!disk->exists(meta_path)) continue; + auto in = disk->readFile(meta_path); + String ignore_data; + readStringUntilEOF(ignore_data, *in); + } +} + std::vector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set parts) { - auto parts_lock = lockParts(); + prefectchMetaDataFile(parts); std::vector data_parts; const auto disk = getStoragePolicy()->getDisks().at(0); for (const auto& name : parts) @@ -161,8 +178,6 @@ std::vector CustomStorageMergeTree::loadDataPartsWithNames data_parts.emplace_back(res.part); } - // without it "test mergetree optimize partitioned by one low card column" will log ERROR - calculateColumnAndSecondaryIndexSizesImpl(); return data_parts; } @@ -211,6 +226,7 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( res.part->loadVersionMetadata(); res.part->setState(to_state); + auto parts_lock = lockParts(); DataPartIteratorByInfo it; bool inserted; @@ -239,6 +255,9 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart( if (res.part->hasLightweightDelete()) has_lightweight_delete_parts.store(true); + // without it "test mergetree optimize partitioned by one low card column" will log ERROR + calculateColumnAndSecondaryIndexSizesImpl(); + LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName()); return res; } diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h index cd507a3ac751..9144aba429c0 100644 --- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h @@ -65,6 +65,7 @@ class CustomStorageMergeTree final : public MergeTreeData private: SimpleIncrement increment; + void prefectchMetaDataFile(std::unordered_set parts); void startBackgroundMovesIfNeeded() override; std::unique_ptr getDefaultSettings() const override; LoadPartResult loadDataPart( diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp index 8e8e4c556beb..05b2623b4d16 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp @@ -164,7 +164,7 @@ void MergeSparkMergeTreeTask::finish() // MergeTreeData::Transaction transaction(storage, txn.get()); // storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction); // transaction.commit(); - + new_part->getDataPartStoragePtr()->commitTransaction(); ThreadFuzzer::maybeInjectSleep(); ThreadFuzzer::maybeInjectMemoryLimitException(); diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp index 57bb804fa9d1..72b7f0070e80 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp @@ -113,8 +113,6 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & auto item_path = part_path / item.first; auto out = metadata_disk->writeFile(item_path); out->write(item.second.data(), item.second.size()); - out->finalize(); - out->sync(); } }; thread_pool.scheduleOrThrow(job); diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index 406f2aaa23df..0f0df0050b26 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -232,18 +232,42 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded() auto read_settings = context->getReadSettings(); auto write_settings = context->getWriteSettings(); Stopwatch watch; + + // Temporary support for S3 + bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3"); for (const auto & merge_tree_data_part : new_parts.unsafeGet()) { String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name; String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name; - storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent( + if (s3_disk) + { + storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent( local_relative_path, dest_storage->getStoragePolicy()->getAnyDisk(), remote_relative_path, read_settings, write_settings, nullptr); + } + else + { + std::vector files; + storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files); + auto src_disk = storage->getStoragePolicy()->getAnyDisk(); + auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk(); + auto tx = dest_disk->createTransaction(); + for (const auto & file : files) + { + auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings); + auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings); + copyData(*read_buffer, *write_buffer); + write_buffer->finalize(); + } + tx->commit(); + } + + LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Upload part {} to disk {} success.", @@ -306,7 +330,6 @@ DB::MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPartAndFin { MergeTreeDataWriter::TemporaryPart temp_part; writeTempPart(temp_part, block_with_partition, metadata_snapshot); - temp_part.finalize(); return temp_part; } @@ -399,6 +422,7 @@ void SparkMergeTreeWriter::writeTempPart( new_data_part->partition = std::move(partition); new_data_part->minmax_idx = std::move(minmax_idx); + data_part_storage->beginTransaction(); SyncGuardPtr sync_guard; if (new_data_part->isStoredOnDisk()) { @@ -441,6 +465,8 @@ void SparkMergeTreeWriter::writeTempPart( temp_part.part = new_data_part; temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); + temp_part.finalize(); + data_part_storage->commitTransaction(); } std::vector SparkMergeTreeWriter::getAllPartInfo()