[GLUTEN-6590][CH] Support compact mergetree file on s3 #6591

Merged 1 commit on Jul 31, 2024
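
In short, this change routes MergeTree parts written to an S3 (s3_gluten) disk through a compact disk transaction, so each part lands as a small, fixed set of objects (a packed data.bin, a packed meta.bin, and a metadata.gluten manifest) rather than one object per column and index file. Below is a minimal sketch of inspecting that layout with the MinIO client the test suite uses; MINIO_ENDPOINT, the credential constants, and BUCKET_NAME belong to the suite and are assumptions outside it.

import io.minio.{ListObjectsArgs, MinioClient}

val client = MinioClient
  .builder()
  .endpoint(MINIO_ENDPOINT)                   // assumption: the suite's MinIO constants
  .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
  .build()

// List what a compact-written table actually stores on S3. The suite's
// verifyS3CompactFileExist helper (further down) asserts that data.bin, meta.bin
// and metadata.gluten are present and that the total object count stays small.
client
  .listObjects(
    ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).prefix("lineitem_mergetree_s3").build())
  .forEach(obj => println(obj.get().objectName()))
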
@@ -30,6 +30,8 @@ import io.minio.messages.DeleteObject
import java.io.File
import java.util

import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

@@ -43,6 +45,12 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath + "mergetree-queries-output"

private val client = MinioClient
.builder()
.endpoint(MINIO_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build()

override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}
@@ -60,11 +68,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite

override protected def beforeEach(): Unit = {
super.beforeEach()
val client = MinioClient
.builder()
.endpoint(MINIO_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build()
if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
val results =
client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
@@ -168,9 +171,42 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
assertResult(1)(addFiles.size)
assertResult(600572)(addFiles.head.rows)
}
eventually(timeout(10.seconds), interval(2.seconds)) {
verifyS3CompactFileExist("lineitem_mergetree_s3")
}
spark.sql("drop table lineitem_mergetree_s3") // clean up
}

private def verifyS3CompactFileExist(table: String): Unit = {
val args = ListObjectsArgs
.builder()
.bucket(BUCKET_NAME)
.recursive(true)
.prefix(table)
.build()
var objectCount: Int = 0
var metadataGlutenExist: Boolean = false
var metadataBinExist: Boolean = false
var dataBinExist: Boolean = false
client
.listObjects(args)
.forEach(
obj => {
objectCount += 1
if (obj.get().objectName().contains("metadata.gluten")) {
metadataGlutenExist = true
} else if (obj.get().objectName().contains("meta.bin")) {
metadataBinExist = true
} else if (obj.get().objectName().contains("data.bin")) {
dataBinExist = true
}
})
assertResult(5)(objectCount)
assert(metadataGlutenExist)
assert(metadataBinExist)
assert(dataBinExist)
}

test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
@@ -635,6 +671,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite

withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
Contributor:
Why add this config?

Contributor (author):
This tests merging the parts directly on S3; see the referenced code branch (screenshot omitted). A combined usage sketch follows this file's diff.

"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
… (remaining diff collapsed)
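
For the review thread above: the rendered diff splits the withSQLConf block around the comments, so the assembled shape of the two runtime settings is sketched here. The setting semantics are inferred from their names and the author's reply (write and merge parts directly on the S3 disk instead of staging locally); the insert statement is an illustrative stand-in, while withSQLConf, eventually, and verifyS3CompactFileExist come from the changes shown above.

withSQLConf(
  // Write parts directly to the S3 disk instead of staging them on local storage first.
  "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
  // Trigger a merge right after the insert so the merge-on-S3 branch is exercised.
  "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
  spark.sql("INSERT INTO lineitem_mergetree_s3 SELECT * FROM lineitem") // illustrative insert
  eventually(timeout(10.seconds), interval(2.seconds)) {
    verifyS3CompactFileExist("lineitem_mergetree_s3")
  }
}
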
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -569,7 +569,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
if (path_prefix.empty() && path_suffix.empty())
return changed_paths;
Poco::Util::AbstractConfiguration::Keys disks;
std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
Contributor:
The disk type configured for UTs in GlutenClickHouseWholeStageTransformerSuite should be changed to match.

Contributor (author):
The disk type there is already set to s3_gluten (screenshot omitted). An illustrative disk configuration sketch follows this file's diff.

config.keys("storage_configuration.disks", disks);

std::ranges::for_each(
@@ -590,7 +590,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
changed_paths.emplace_back(final_path);
}
}
else if (disk_type == "s3" || disk_type == "hdfs_gluten")
else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
{
String metadata_path = config.getString(disk_prefix + ".metadata_path", "");
if (!metadata_path.empty())
… (remaining diff collapsed)
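
On the type rename above: the path rewriting now keys on disk type s3_gluten rather than s3. Purely as a hypothetical illustration (the Spark key prefix and every value below are assumptions, not taken from this PR), such a disk might be declared through Gluten's ClickHouse runtime_config passthrough roughly as follows; of these keys, only .type and .metadata_path are what the C++ change above actually inspects.

// Hypothetical configuration sketch; the key prefix and all values are assumptions.
val s3GlutenDiskConf = Map(
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type" -> "s3_gluten",
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint" -> s"$MINIO_ENDPOINT/$BUCKET_NAME/",
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path" -> "/tmp/mergetree/metadata",
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id" -> S3_ACCESS_KEY,
  "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key" -> S3_SECRET_KEY
)
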
@@ -94,6 +94,7 @@ void CompactObjectStorageDiskTransaction::commit()
whole_meta.addObject(key, 0, offset);
metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
out.sync();
out.finalize();
};

merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
… (remaining diff collapsed)
62 changes: 62 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "GlutenDiskS3.h"
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"

#if USE_AWS_S3
namespace local_engine
{

DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
}

std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
ReadSettings copy_settings = settings;
// Threadpool read is not supported for s3 compact version currently
copy_settings.remote_fs_method = RemoteFSReadMethod::read;
return DiskObjectStorage::readFile(path, copy_settings, read_hint, file_size);
}

DiskObjectStoragePtr GlutenDiskS3::createDiskObjectStorage()
{
const auto config_prefix = "storage_configuration.disks." + name;
return std::make_shared<GlutenDiskS3>(
getName(),
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
config_prefix,
object_storage_creator);
}

}

#endif
57 changes: 57 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once


#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"

#if USE_AWS_S3
namespace local_engine
{
class GlutenDiskS3 : public DB::DiskObjectStorage
{
public:
GlutenDiskS3(
const String & name_,
const String & object_key_prefix_,
DB::MetadataStoragePtr metadata_storage_,
DB::ObjectStoragePtr object_storage_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> creator)
: DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix),
object_storage_creator(creator) {}

DB::DiskTransactionPtr createTransaction() override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;

DiskObjectStoragePtr createDiskObjectStorage() override;

private:
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> object_storage_creator;
};
}

#endif
12 changes: 10 additions & 2 deletions cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -25,6 +25,10 @@
#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
#endif

#if USE_AWS_S3
#include <Disks/ObjectStorages/GlutenDiskS3.h>
#endif

#include "registerGlutenDisks.h"

namespace local_engine
@@ -52,16 +56,20 @@ void registerGlutenDisks(bool global_skip_access_check)
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
auto object_storage_creator = [name, skip_access_check, config_prefix](
const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
{ return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");

DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskS3>(
name,
object_storage->getCommonKeyPrefix(),
std::move(metadata_storage),
std::move(object_storage),
config,
config_prefix);
config_prefix,
object_storage_creator);

disk->startup(context, skip_access_check);
return disk;
… (remaining diff collapsed)
40 changes: 11 additions & 29 deletions cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -202,42 +202,24 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto read_settings = context->getReadSettings();
auto write_settings = context->getWriteSettings();
Stopwatch watch;

// Temporary support for S3
bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
{
String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;

if (s3_disk)
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
local_relative_path,
dest_storage->getStoragePolicy()->getAnyDisk(),
remote_relative_path,
read_settings,
write_settings,
nullptr);
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
else
{
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
tx->commit();
}


tx->commit();
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
… (remaining diff collapsed)