[GLUTEN-6078][CH] Enable mergetree hdfs suite (#6080)
What changes were proposed in this pull request?
Re-enable the previously ignored MergeTree-on-HDFS test cases, bump the pinned ClickHouse commit, and fix the HDFS disk handling they depend on: createDirectories now anchors paths at the HDFS root, the disk keeps an object-storage creator so applyNewSettings can rebuild the storage when the configuration changes, and HDFS disk registration is no longer compiled out unless USE_AWS_S3 is set. (Fixes: #6078)

How was this patch tested?
Via the re-enabled unit tests in GlutenClickHouseMergeTreeWriteOnHDFSSuite.
loneylee authored Jun 19, 2024
1 parent cf3a98e commit c9a6648
Showing 9 changed files with 54 additions and 32 deletions.
@@ -74,7 +74,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
}

ignore("test mergetree table write") {
test("test mergetree table write") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)
@@ -157,7 +157,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_hdfs")
}

ignore("test mergetree write with orderby keys / primary keys") {
test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
|""".stripMargin)
@@ -254,7 +254,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
}

ignore("test mergetree write with partition") {
test("test mergetree write with partition") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
|""".stripMargin)
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}

ignore("test mergetree write with bucket table") {
test("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
|""".stripMargin)
@@ -537,7 +537,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}

ignore("test mergetree write with the path based") {
test("test mergetree write with the path based") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=e0e4b947245
+CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
8 changes: 5 additions & 3 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -765,6 +765,11 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
// We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
global_context->setApplicationType(Context::ApplicationType::CLIENT);
}
+else
+{
+// just for ut
+global_context->updateStorageConfiguration(*config);
+}
}

void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
@@ -801,10 +806,7 @@ void registerAllFunctions()
void registerGlutenDisks()
{
registerDisks(true);

-#if USE_AWS_S3
registerGlutenDisks(true);
-#endif
}

void BackendInitializerUtil::registerAllFactories()
12 changes: 4 additions & 8 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -32,16 +32,11 @@ void GlutenDiskHDFS::createDirectory(const String & path)
hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
}

-String GlutenDiskHDFS::path2AbsPath(const String & path)
-{
-return getObjectStorage()->generateObjectKeyForPath(path).serialize();
-}
-
void GlutenDiskHDFS::createDirectories(const String & path)
{
DiskObjectStorage::createDirectories(path);
-auto* hdfs = hdfs_object_storage->getHDFSFS();
-fs::path p = path;
+auto * hdfs = hdfs_object_storage->getHDFSFS();
+fs::path p = "/" + path;
std::vector<std::string> paths_created;
while (hdfsExists(hdfs, p.c_str()) < 0)
{
@@ -69,7 +64,8 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
-config_prefix);
+config_prefix,
+object_storage_creator);
}

std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
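Note on the createDirectories change above: hdfsExists resolves relative paths against the current HDFS working directory, so prefixing the path with "/" anchors the existence check at the filesystem root and makes it independent of that state. For illustration, here is a minimal standalone sketch of the walk-up-and-create pattern; the loop body is elided in the diff, so this is a hypothetical reconstruction assuming libhdfs's hdfsExists/hdfsCreateDirectory, not the committed code:

    #include <filesystem>
    #include <string>
    #include <vector>
    #include <hdfs/hdfs.h> // libhdfs3 header; classic libhdfs uses <hdfs.h>

    namespace fs = std::filesystem;

    // Walk up from the requested directory until an existing ancestor is found,
    // then create the missing levels from the shallowest down.
    void createDirectoriesRecursively(hdfsFS hdfs, const std::string & path)
    {
        fs::path p = "/" + path; // absolute, independent of the working directory
        std::vector<std::string> missing;
        while (hdfsExists(hdfs, p.c_str()) < 0) // hdfsExists returns 0 if present
        {
            missing.push_back(p.string());
            if (p == p.parent_path()) // reached "/"
                break;
            p = p.parent_path();
        }
        for (auto it = missing.rbegin(); it != missing.rend(); ++it)
            hdfsCreateDirectory(hdfs, it->c_str());
    }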
29 changes: 22 additions & 7 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -37,12 +37,15 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
DB::MetadataStoragePtr metadata_storage_,
DB::ObjectStoragePtr object_storage_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
const String & config_prefix,
std::function<DB::ObjectStoragePtr(
const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> _object_storage_creator)
: DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+, object_key_prefix(object_key_prefix_)
+, hdfs_config_prefix(config_prefix)
+, object_storage_creator(_object_storage_creator)
{
-chassert(dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
-object_key_prefix = object_key_prefix_;
-hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(object_storage_);
hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
throttler = std::make_shared<DB::Throttler>(max_speed);
@@ -59,12 +62,24 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path, size_t buf_size, DB::WriteMode mode,
const DB::WriteSettings& settings) override;

+void applyNewSettings(
+const Poco::Util::AbstractConfiguration & config,
+DB::ContextPtr context,
+const String & config_prefix,
+const DB::DisksMap & map) override
+{
+DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
+hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
+object_storage = hdfs_object_storage;
+}
private:
-String path2AbsPath(const String & path);
-
-GlutenHDFSObjectStorage * hdfs_object_storage;
+std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
String object_key_prefix;
DB::ThrottlerPtr throttler;
+const String hdfs_config_prefix;
+std::function<DB::ObjectStoragePtr(
+const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)>
+object_storage_creator;
};
#endif
}
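The constructor above now takes the object-storage factory as a std::function and stores it, and hdfs_object_storage becomes a shared_ptr obtained via typeid_cast; that combination is what lets applyNewSettings discard the old storage and build a new one from fresh configuration. A minimal self-contained sketch of the pattern, with hypothetical stand-in types (Config, Storage, Disk) rather than the ClickHouse ones:

    #include <functional>
    #include <memory>
    #include <string>

    struct Config { std::string uri; }; // stand-in for Poco::Util::AbstractConfiguration

    struct Storage
    {
        explicit Storage(std::string u) : uri(std::move(u)) {}
        std::string uri;
    };

    using StorageCreator = std::function<std::shared_ptr<Storage>(const Config &)>;

    class Disk
    {
    public:
        Disk(const Config & config, StorageCreator creator)
            : storage_creator(std::move(creator)), storage(storage_creator(config)) {}

        // Counterpart of applyNewSettings(): rebuild the storage from the new
        // config instead of mutating the existing instance.
        void applyNewSettings(const Config & config) { storage = storage_creator(config); }

    private:
        StorageCreator storage_creator; // declared first so it is initialized first
        std::shared_ptr<Storage> storage;
    };

    int main()
    {
        StorageCreator creator = [](const Config & c) { return std::make_shared<Storage>(c.uri); };
        Disk disk(Config{"hdfs://ns1/"}, creator);
        disk.applyNewSettings(Config{"hdfs://ns2/"}); // storage now points at ns2
    }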
@@ -38,7 +38,7 @@ DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyFor
initializeHDFSFS();
/// what ever data_source_description.description value is, consider that key as relative key
chassert(data_directory.starts_with("/"));
-return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1) / path);
+return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);
}
}
#endif
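The generateObjectKeyForPath change switches from joining everything with fs::path::operator/ to the two-argument createAsRelative(prefix, suffix) overload, which keeps the key prefix separate from the relative suffix. Whatever the motivating failure was here, one well-known hazard of the operator/ form is worth noting: an absolute right-hand operand silently replaces the entire left-hand side. A plain std::filesystem illustration, not Gluten code:

    #include <cassert>
    #include <filesystem>

    namespace fs = std::filesystem;

    int main()
    {
        fs::path prefix = "hdfs://host:9000/data"; // hypothetical prefix
        // An absolute rhs discards the prefix entirely:
        assert((prefix / "/tmp/part-0") == fs::path("/tmp/part-0"));
        // A relative rhs is appended as expected:
        assert((prefix / "tmp/part-0") == fs::path("hdfs://host:9000/data/tmp/part-0"));
    }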
@@ -33,7 +33,7 @@ class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
const String & hdfs_root_path_,
SettingsPtr settings_,
const Poco::Util::AbstractConfiguration & config_)
-: HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */true), config(config_)
+: HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */false)
{
}
std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
@@ -43,8 +43,6 @@
std::optional<size_t> file_size = {}) const override;
DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
-private:
-const Poco::Util::AbstractConfiguration & config;
};
#endif

@@ -121,7 +121,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
context->getSettingsRef().hdfs_replication
);
-return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
});
}
#endif
17 changes: 14 additions & 3 deletions cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -40,6 +40,8 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
void registerGlutenDisks(bool global_skip_access_check)
{
auto & factory = DB::DiskFactory::instance();

+#if USE_AWS_S3
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
@@ -66,7 +68,7 @@ void registerGlutenDisks(bool global_skip_access_check)
};

auto & object_factory = DB::ObjectStorageFactory::instance();
-#if USE_AWS_S3
+
registerGlutenS3ObjectStorage(object_factory);
factory.registerDiskType("s3_gluten", creator); /// For compatibility
#endif
@@ -82,11 +84,20 @@
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
-auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+auto object_storage_creator = [name, skip_access_check, config_prefix](
+const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+{ return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
+auto object_storage = object_storage_creator(config, context);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");

DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
-name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+name,
+object_storage->getCommonKeyPrefix(),
+std::move(metadata_storage),
+std::move(object_storage),
+config,
+config_prefix,
+object_storage_creator);

disk->startup(context, skip_access_check);
return disk;