diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index ca5b39fff1ac..56b8f056bc25 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -74,7 +74,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
   }
 
-  ignore("test mergetree table write") {
+  test("test mergetree table write") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
                  |""".stripMargin)
@@ -157,7 +157,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_hdfs")
   }
 
-  ignore("test mergetree write with orderby keys / primary keys") {
+  test("test mergetree write with orderby keys / primary keys") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
                  |""".stripMargin)
@@ -254,7 +254,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
   }
 
-  ignore("test mergetree write with partition") {
+  test("test mergetree write with partition") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
                  |""".stripMargin)
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_partition_hdfs")
   }
 
-  ignore("test mergetree write with bucket table") {
+  test("test mergetree write with bucket table") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
                  |""".stripMargin)
@@ -537,7 +537,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_bucket_hdfs")
   }
 
-  ignore("test mergetree write with the path based") {
+  test("test mergetree write with the path based") {
     val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index e374d3f5fd9e..2bbb2945334b 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=e0e4b947245
+CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index a4634c3f3bc7..937beae99a6b 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -765,6 +765,11 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
         // We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
         global_context->setApplicationType(Context::ApplicationType::CLIENT);
     }
+    else
+    {
+        // just for ut
+        global_context->updateStorageConfiguration(*config);
+    }
 }
 
 void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
@@ -801,10 +806,7 @@ void registerAllFunctions()
 void registerGlutenDisks()
 {
     registerDisks(true);
-
-#if USE_AWS_S3
     registerGlutenDisks(true);
-#endif
 }
 
 void BackendInitializerUtil::registerAllFactories()
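The CHUtil.cpp hunks above make two related adjustments: registerGlutenDisks(true) is no longer compiled out when the build lacks S3 support, and the non-server branch used by the unit tests now forwards the freshly loaded configuration to the already-created disks via Context::updateStorageConfiguration. Below is a standalone sketch of that propagation path, with made-up MiniDisk/MiniHdfsDisk/MiniContext/Config types rather than the real ClickHouse classes; it only illustrates why a disk that wants to pick up the reloaded settings has to override applyNewSettings.

// Standalone sketch, not the ClickHouse sources: MiniDisk, MiniHdfsDisk,
// MiniContext and Config are invented names. It only illustrates the
// propagation path the diff relies on: pushing a freshly loaded configuration
// into the context hands it to every already-created disk.
#include <iostream>
#include <map>
#include <memory>
#include <string>

using Config = std::map<std::string, std::string>;

struct MiniDisk
{
    virtual ~MiniDisk() = default;
    virtual void applyNewSettings(const Config & config) = 0;
};

struct MiniHdfsDisk : MiniDisk
{
    std::string endpoint;
    void applyNewSettings(const Config & config) override
    {
        // Re-read whatever this disk needs from the reloaded configuration.
        endpoint = config.at("hdfs.endpoint");
        std::cout << "disk now points at " << endpoint << '\n';
    }
};

struct MiniContext
{
    std::map<std::string, std::shared_ptr<MiniDisk>> disks;

    // Rough counterpart of Context::updateStorageConfiguration(*config).
    void updateStorageConfiguration(const Config & config)
    {
        for (auto & entry : disks)
            entry.second->applyNewSettings(config);
    }
};

int main()
{
    MiniContext ctx;
    ctx.disks["hdfs_gluten"] = std::make_shared<MiniHdfsDisk>();
    ctx.updateStorageConfiguration({{"hdfs.endpoint", "hdfs://namenode:8020/"}});
}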
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index cdbe6c72897c..07a7aa6bd006 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -32,16 +32,11 @@ void GlutenDiskHDFS::createDirectory(const String & path)
     hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
 }
 
-String GlutenDiskHDFS::path2AbsPath(const String & path)
-{
-    return getObjectStorage()->generateObjectKeyForPath(path).serialize();
-}
-
 void GlutenDiskHDFS::createDirectories(const String & path)
 {
     DiskObjectStorage::createDirectories(path);
-    auto* hdfs = hdfs_object_storage->getHDFSFS();
-    fs::path p = path;
+    auto * hdfs = hdfs_object_storage->getHDFSFS();
+    fs::path p = "/" + path;
     std::vector<std::string> paths_created;
     while (hdfsExists(hdfs, p.c_str()) < 0)
     {
@@ -69,7 +64,8 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
         getMetadataStorage(),
         getObjectStorage(),
         SerializedPlanParser::global_context->getConfigRef(),
-        config_prefix);
+        config_prefix,
+        object_storage_creator);
 }
 
 std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 4e375b283951..222b9f8928a3 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -37,12 +37,15 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
         DB::MetadataStoragePtr metadata_storage_,
         DB::ObjectStoragePtr object_storage_,
         const Poco::Util::AbstractConfiguration & config,
-        const String & config_prefix)
+        const String & config_prefix,
+        std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration &, DB::ContextPtr)>
+            _object_storage_creator)
         : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+        , object_key_prefix(object_key_prefix_)
+        , hdfs_config_prefix(config_prefix)
+        , object_storage_creator(_object_storage_creator)
     {
-        chassert(dynamic_cast<GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
-        object_key_prefix = object_key_prefix_;
-        hdfs_object_storage = dynamic_cast<GlutenHDFSObjectStorage *>(object_storage_.get());
+        hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(object_storage_);
         hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
         auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
         throttler = std::make_shared<DB::Throttler>(max_speed);
@@ -59,12 +62,24 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
     std::unique_ptr<DB::WriteBufferFromFileBase>
     writeFile(const String & path, size_t buf_size, DB::WriteMode mode, const DB::WriteSettings & settings) override;
 
+    void applyNewSettings(
+        const Poco::Util::AbstractConfiguration & config,
+        DB::ContextPtr context,
+        const String & config_prefix,
+        const DB::DisksMap & map) override
+    {
+        DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
+        hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
+        object_storage = hdfs_object_storage;
+    }
+
 private:
-    String path2AbsPath(const String & path);
-
-    GlutenHDFSObjectStorage * hdfs_object_storage;
+    std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
     String object_key_prefix;
     DB::ThrottlerPtr throttler;
+    const String hdfs_config_prefix;
+    std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration &, DB::ContextPtr)>
+        object_storage_creator;
 };
 #endif
 }
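In the reworked GlutenDiskHDFS.h above, the disk now holds the HDFS object storage as a shared_ptr obtained by downcasting the generic DB::ObjectStoragePtr it is given, and the new applyNewSettings override rebuilds that storage from the stored creator and re-points both the typed member and the base-class object_storage at the replacement. A standalone sketch of the ownership pattern follows; Base, HdfsStorage and Disk are invented types, and std::dynamic_pointer_cast stands in for ClickHouse's typeid_cast over shared_ptr.

// Standalone sketch, not the Gluten sources: Base/HdfsStorage/Disk are made-up
// names. It shows the ownership change the header makes: the disk keeps the
// object storage as a shared_ptr instead of a raw pointer, so the storage the
// base class sees and the typed one the disk works with stay the same object,
// even after applyNewSettings swaps it out.
#include <cassert>
#include <memory>

struct Base { virtual ~Base() = default; };
struct HdfsStorage : Base { int fs_handle = 42; };

struct Disk
{
    std::shared_ptr<Base> object_storage;       // what the generic base class sees
    std::shared_ptr<HdfsStorage> hdfs_storage;  // the typed view this disk works with

    explicit Disk(std::shared_ptr<Base> storage)
    {
        // std::dynamic_pointer_cast is the standard-library analogue of the
        // typeid_cast<std::shared_ptr<...>> used in the diff.
        hdfs_storage = std::dynamic_pointer_cast<HdfsStorage>(storage);
        object_storage = hdfs_storage;
    }

    void applyNewSettings(std::shared_ptr<Base> recreated)
    {
        hdfs_storage = std::dynamic_pointer_cast<HdfsStorage>(recreated);
        object_storage = hdfs_storage; // keep both views pointing at the new storage
    }
};

int main()
{
    Disk disk(std::make_shared<HdfsStorage>());
    assert(disk.hdfs_storage && disk.hdfs_storage->fs_handle == 42);
    disk.applyNewSettings(std::make_shared<HdfsStorage>());
    assert(disk.object_storage == disk.hdfs_storage);
}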
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
index 60b82ec845bb..cab87d66d884 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
@@ -38,7 +38,7 @@ DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyFor
     initializeHDFSFS();
     /// what ever data_source_description.description value is, consider that key as relative key
     chassert(data_directory.starts_with("/"));
-    return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1) / path);
+    return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);
 }
 }
 #endif
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
index a532c98cb87d..da37e1d782db 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
@@ -33,7 +33,7 @@ class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
         const String & hdfs_root_path_,
         SettingsPtr settings_,
         const Poco::Util::AbstractConfiguration & config_)
-        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */true), config(config_)
+        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */false)
     {
     }
     std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
@@ -43,8 +43,6 @@ class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
         std::optional<size_t> file_size = {}) const override;
     DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
     hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
 
-private:
-    const Poco::Util::AbstractConfiguration & config;
 };
 #endif
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
index c080e0525f3c..9e4546498034 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -121,7 +121,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
             config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
             context->getSettingsRef().hdfs_replication
         );
-        return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+        return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
     });
 }
 #endif
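The generateObjectKeyForPath change above hands ObjectStorageKey::createAsRelative the HDFS root (endpoint plus data directory) and the object path as two separate arguments instead of folding everything into a single fs::path. The sketch below is a standalone illustration of why that distinction matters, using a made-up Key type; ClickHouse's ObjectStorageKey internals may differ, so this is only the conceptual shape.

// Standalone sketch, not ClickHouse's ObjectStorageKey: the Key type here is
// invented. Folding the path into the prefix yields one opaque string, while
// passing it as a separate suffix keeps "where the disk is rooted" and "the
// relative object path" distinguishable, even though both serialize the same.
#include <cassert>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

struct Key
{
    std::string prefix;  // e.g. hdfs://host:9000/data/
    std::string suffix;  // path relative to the prefix

    static Key createAsRelative(fs::path prefix, std::string suffix)
    {
        return Key{prefix.string() + "/", std::move(suffix)};
    }

    std::string serialize() const { return prefix + suffix; }
};

int main()
{
    const std::string url_without_path = "hdfs://host:9000";
    const std::string data_directory = "/data";
    const std::string path = "db/table/part-0.bin";

    // Before: everything merged into one path; the relative part is lost.
    const std::string merged = (fs::path(url_without_path) / data_directory.substr(1) / path).string();

    // After: same full key, but the relative part stays separate.
    const Key key = Key::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);

    assert(merged == key.serialize());
    assert(key.suffix == path);
}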
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index c7e9c5fd32ba..8a920edcce77 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -40,6 +40,8 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
 void registerGlutenDisks(bool global_skip_access_check)
 {
     auto & factory = DB::DiskFactory::instance();
+
+#if USE_AWS_S3
     auto creator = [global_skip_access_check](
                        const String & name,
                        const Poco::Util::AbstractConfiguration & config,
@@ -66,7 +68,7 @@ void registerGlutenDisks(bool global_skip_access_check)
     };
 
     auto & object_factory = DB::ObjectStorageFactory::instance();
-#if USE_AWS_S3
+
     registerGlutenS3ObjectStorage(object_factory);
     factory.registerDiskType("s3_gluten", creator); /// For compatibility
 #endif
@@ -82,11 +84,20 @@ void registerGlutenDisks(bool global_skip_access_check)
             bool) -> DB::DiskPtr
         {
             bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
-            auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+            auto object_storage_creator = [name, skip_access_check, config_prefix](
+                                              const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+            { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
+            auto object_storage = object_storage_creator(config, context);
             auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
             DB::DiskObjectStoragePtr disk = std::make_shared<GlutenDiskHDFS>(
-                name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+                name,
+                object_storage->getCommonKeyPrefix(),
+                std::move(metadata_storage),
+                std::move(object_storage),
+                config,
+                config_prefix,
+                object_storage_creator);
             disk->startup(context, skip_access_check);
             return disk;
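The HDFS disk creator above wraps the ObjectStorageFactory call in a lambda that captures name, config_prefix and skip_access_check by value, uses it once to build the initial object storage, and hands the same callable to GlutenDiskHDFS so applyNewSettings can replay the creation against a reloaded configuration. A standalone sketch of that capture-and-replay pattern follows; the registerDisk name, the config keys and the returned strings are all invented for illustration.

// Standalone sketch, not the Gluten registration code. Capturing by value
// keeps the creator usable long after the registration function has returned,
// so the factory call can be repeated with a fresh configuration later.
#include <functional>
#include <iostream>
#include <map>
#include <string>

using Config = std::map<std::string, std::string>;
using StorageCreator = std::function<std::string(const Config &)>;

StorageCreator registerDisk()
{
    // Locals that go out of scope once registration is done.
    std::string name = "hdfs_cache";
    std::string config_prefix = "storage_configuration.disks.hdfs_cache";
    bool skip_access_check = true;

    // Capture by value: no dangling references after this function returns.
    return [name, config_prefix, skip_access_check](const Config & conf)
    {
        return "create(" + name + ", " + conf.at(config_prefix + ".endpoint")
            + (skip_access_check ? ", no access check)" : ")");
    };
}

int main()
{
    StorageCreator creator = registerDisk();

    // Initial creation at registration time ...
    std::cout << creator({{"storage_configuration.disks.hdfs_cache.endpoint", "hdfs://a:8020/"}}) << '\n';
    // ... and replayed later, e.g. from applyNewSettings, with a new config.
    std::cout << creator({{"storage_configuration.disks.hdfs_cache.endpoint", "hdfs://b:8020/"}}) << '\n';
}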