diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp index bff4108f28a1..cdbe6c72897c 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp @@ -17,6 +17,8 @@ #include "GlutenDiskHDFS.h" #include + +#include #include #if USE_HDFS @@ -70,6 +72,15 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage() config_prefix); } - +std::unique_ptr GlutenDiskHDFS::writeFile( + const String & path, + size_t buf_size, + DB::WriteMode mode, + const DB::WriteSettings & settings) +{ + if (throttler) + throttler->add(1); + return DiskObjectStorage::writeFile(path, buf_size, mode, settings); +} } #endif \ No newline at end of file diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h index 9caedaae8785..4e375b283951 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h @@ -19,6 +19,7 @@ #include +#include #include #if USE_HDFS #include @@ -43,6 +44,8 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage object_key_prefix = object_key_prefix_; hdfs_object_storage = dynamic_cast(object_storage_.get()); hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/"); + auto max_speed = config.getUInt(config_prefix + ".write_speed", 450); + throttler = std::make_shared(max_speed); } void createDirectory(const String & path) override; @@ -52,11 +55,16 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage void removeDirectory(const String & path) override; DB::DiskObjectStoragePtr createDiskObjectStorage() override; + + std::unique_ptr writeFile(const String& path, size_t buf_size, DB::WriteMode mode, + const DB::WriteSettings& settings) override; + private: String path2AbsPath(const String & path); GlutenHDFSObjectStorage * hdfs_object_storage; String object_key_prefix; + DB::ThrottlerPtr throttler; }; #endif }