[GLUTEN-6078][CH] Enable mergetree hdfs suite #6080

Merged: 4 commits, Jun 19, 2024
@@ -74,7 +74,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
}

ignore("test mergetree table write") {
test("test mergetree table write") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)
@@ -157,7 +157,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_hdfs")
}

ignore("test mergetree write with orderby keys / primary keys") {
test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
|""".stripMargin)
@@ -254,7 +254,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
}

ignore("test mergetree write with partition") {
test("test mergetree write with partition") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
|""".stripMargin)
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}

ignore("test mergetree write with bucket table") {
test("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
|""".stripMargin)
@@ -537,7 +537,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}

ignore("test mergetree write with the path based") {
test("test mergetree write with the path based") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"

val sourceDF = spark.sql(s"""
cpp-ch/clickhouse.version (2 changes: 1 addition & 1 deletion)
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=e0e4b947245
+CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
cpp-ch/local-engine/Common/CHUtil.cpp (8 changes: 5 additions & 3 deletions)
@@ -765,6 +765,11 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
// We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
global_context->setApplicationType(Context::ApplicationType::CLIENT);
}
+else
+{
+// just for ut
+global_context->updateStorageConfiguration(*config);
+}
}

void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
@@ -801,10 +806,7 @@ void registerAllFunctions()
void registerGlutenDisks()
{
registerDisks(true);
-
-#if USE_AWS_S3
registerGlutenDisks(true);
-#endif
}

void BackendInitializerUtil::registerAllFactories()
cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp (12 changes: 4 additions & 8 deletions)
@@ -32,16 +32,11 @@ void GlutenDiskHDFS::createDirectory(const String & path)
hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
}

-String GlutenDiskHDFS::path2AbsPath(const String & path)
-{
-return getObjectStorage()->generateObjectKeyForPath(path).serialize();
-}
-
void GlutenDiskHDFS::createDirectories(const String & path)
{
DiskObjectStorage::createDirectories(path);
-auto* hdfs = hdfs_object_storage->getHDFSFS();
-fs::path p = path;
+auto * hdfs = hdfs_object_storage->getHDFSFS();
+fs::path p = "/" + path;
std::vector<std::string> paths_created;
while (hdfsExists(hdfs, p.c_str()) < 0)
{
@@ -69,7 +64,8 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
-config_prefix);
+config_prefix,
+object_storage_creator);
}

std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
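
Note on the createDirectories change above: the path is now anchored at "/" so the hdfsExists checks run against an absolute location, and missing parents are collected before anything gets created. A minimal sketch of that bottom-up/top-down pattern, with exists/mkdir callbacks standing in for the hdfsExists/hdfsCreateDirectory calls used in the real code (not the Gluten implementation itself):

#include <filesystem>
#include <functional>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Sketch only: `exists` and `mkdir` are stand-ins for hdfsExists/hdfsCreateDirectory.
void createDirectoriesRecursively(
    const std::string & path,
    const std::function<bool(const std::string &)> & exists,
    const std::function<void(const std::string &)> & mkdir)
{
    fs::path p = "/" + path;           // anchor at the filesystem root, as in the diff
    std::vector<std::string> missing;  // collected deepest-first
    while (p != p.root_path() && !exists(p.string()))
    {
        missing.push_back(p.string());
        p = p.parent_path();
    }
    // Create shallowest-first so every parent exists before its child.
    for (auto it = missing.rbegin(); it != missing.rend(); ++it)
        mkdir(*it);
}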
cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h (29 changes: 22 additions & 7 deletions)
@@ -37,12 +37,15 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
DB::MetadataStoragePtr metadata_storage_,
DB::ObjectStoragePtr object_storage_,
const Poco::Util::AbstractConfiguration & config,
-const String & config_prefix)
+const String & config_prefix,
+std::function<DB::ObjectStoragePtr(
+const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> _object_storage_creator)
: DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+, object_key_prefix(object_key_prefix_)
+, hdfs_config_prefix(config_prefix)
+, object_storage_creator(_object_storage_creator)
{
chassert(dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
-object_key_prefix = object_key_prefix_;
-hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(object_storage_);
hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
throttler = std::make_shared<DB::Throttler>(max_speed);
@@ -59,12 +62,24 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path, size_t buf_size, DB::WriteMode mode,
const DB::WriteSettings& settings) override;

+void applyNewSettings(
+const Poco::Util::AbstractConfiguration & config,
+DB::ContextPtr context,
+const String & config_prefix,
+const DB::DisksMap & map) override
+{
+DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
+hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
+object_storage = hdfs_object_storage;
+}
private:
-String path2AbsPath(const String & path);

-GlutenHDFSObjectStorage * hdfs_object_storage;
+std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
String object_key_prefix;
DB::ThrottlerPtr throttler;
+const String hdfs_config_prefix;
+std::function<DB::ObjectStoragePtr(
+const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)>
+object_storage_creator;
};
#endif
}
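
The header change above is the heart of the patch: the disk now keeps the object-storage factory it was built with, so applyNewSettings can rebuild the HDFS backend whenever the storage configuration is updated (for example via updateStorageConfiguration in CHUtil.cpp). A reduced sketch of that pattern, using hypothetical Config/ObjectStorage types in place of the ClickHouse ones:

#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-ins for the real configuration and object-storage types.
struct Config { std::string endpoint; };
struct ObjectStorage { std::string endpoint; };
using ObjectStoragePtr = std::shared_ptr<ObjectStorage>;
using StorageCreator = std::function<ObjectStoragePtr(const Config &)>;

class Disk
{
public:
    Disk(ObjectStoragePtr storage, StorageCreator creator)
        : object_storage(std::move(storage)), object_storage_creator(std::move(creator)) {}

    // Mirrors applyNewSettings in the diff: rebuild the backend from the new
    // configuration instead of keeping the storage created at registration time.
    void applyNewSettings(const Config & config) { object_storage = object_storage_creator(config); }

    const ObjectStorage & storage() const { return *object_storage; }

private:
    ObjectStoragePtr object_storage;
    StorageCreator object_storage_creator; // saved so the backend can be recreated later
};

A configuration update then only has to re-run the stored functor; nothing else about the disk changes.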
@@ -38,7 +38,7 @@ DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyFor
initializeHDFSFS();
/// what ever data_source_description.description value is, consider that key as relative key
chassert(data_directory.starts_with("/"));
-return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1) / path);
+return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);
}
}
#endif
@@ -33,7 +33,7 @@ class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
const String & hdfs_root_path_,
SettingsPtr settings_,
const Poco::Util::AbstractConfiguration & config_)
-: HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */true), config(config_)
+: HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */false)
{
}
std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
@@ -43,8 +43,6 @@ class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
std::optional<size_t> file_size = {}) const override;
DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
-private:
-const Poco::Util::AbstractConfiguration & config;
};
#endif

@@ -121,7 +121,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
context->getSettingsRef().hdfs_replication
);
-return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
});
}
#endif
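
The switch from std::make_unique to std::make_shared lines up with the disk now holding the backend as std::shared_ptr<GlutenHDFSObjectStorage>: the factory creates the object directly under shared ownership, and the disk takes a typed shared_ptr to the same instance via a pointer cast. A small sketch of that idea using std::dynamic_pointer_cast as the standard-library analogue of the typeid_cast<std::shared_ptr<T>> seen earlier:

#include <cassert>
#include <memory>

struct Base { virtual ~Base() = default; };
struct Derived : Base { int value = 42; };

int main()
{
    // The factory hands out shared ownership of the concrete type...
    std::shared_ptr<Base> base = std::make_shared<Derived>();

    // ...so a consumer can keep a typed shared_ptr to the very same object.
    std::shared_ptr<Derived> derived = std::dynamic_pointer_cast<Derived>(base);
    assert(derived && derived->value == 42);
    return 0;
}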
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp (17 changes: 14 additions & 3 deletions)
@@ -40,6 +40,8 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
void registerGlutenDisks(bool global_skip_access_check)
{
auto & factory = DB::DiskFactory::instance();
+
+#if USE_AWS_S3
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
@@ -66,7 +68,7 @@ void registerGlutenDisks(bool global_skip_access_check)
};

auto & object_factory = DB::ObjectStorageFactory::instance();
-#if USE_AWS_S3
+
registerGlutenS3ObjectStorage(object_factory);
factory.registerDiskType("s3_gluten", creator); /// For compatibility
#endif
@@ -82,11 +84,20 @@ void registerGlutenDisks(bool global_skip_access_check)
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
-auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+auto object_storage_creator = [name, skip_access_check, config_prefix](
+const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+{ return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
+auto object_storage = object_storage_creator(config, context);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");

DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
-name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+name,
+object_storage->getCommonKeyPrefix(),
+std::move(metadata_storage),
+std::move(object_storage),
+config,
+config_prefix,
+object_storage_creator);

disk->startup(context, skip_access_check);
return disk;
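
Finally, the hdfs_gluten registration wraps ObjectStorageFactory::create in a lambda capturing name, config_prefix, and skip_access_check; the lambda is called once for the initial storage and then handed to GlutenDiskHDFS so applyNewSettings can call it again later. A stripped-down sketch of that register-with-creator shape, with hypothetical stand-ins for the ClickHouse factory types:

#include <functional>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-ins for DB::ObjectStorageFactory / DB::DiskFactory.
struct ObjectStorage { std::string endpoint; };
using ObjectStoragePtr = std::shared_ptr<ObjectStorage>;
using ObjectStorageCreator = std::function<ObjectStoragePtr(const std::string & config)>;

struct Disk
{
    ObjectStoragePtr storage;
    ObjectStorageCreator creator; // kept so a later applyNewSettings can rebuild `storage`
};
using DiskPtr = std::shared_ptr<Disk>;
using DiskCreator = std::function<DiskPtr(const std::string & config)>;

struct DiskFactory
{
    std::map<std::string, DiskCreator> creators;
    void registerDiskType(const std::string & type, DiskCreator c) { creators[type] = std::move(c); }
};

void registerHdfsDisk(DiskFactory & factory, std::string config_prefix)
{
    factory.registerDiskType("hdfs_gluten", [config_prefix](const std::string & config) -> DiskPtr
    {
        // Define the creator once, use it for the initial storage, and store it in the disk.
        ObjectStorageCreator object_storage_creator = [config_prefix](const std::string & conf)
        {
            return std::make_shared<ObjectStorage>(ObjectStorage{config_prefix + "/" + conf});
        };
        auto object_storage = object_storage_creator(config);
        return std::make_shared<Disk>(Disk{std::move(object_storage), object_storage_creator});
    });
}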