[GLUTEN-6590] Support compact mergetree file on s3
lwz9103 committed Jul 25, 2024
1 parent 751fa83 commit c7d6a02
Showing 7 changed files with 184 additions and 39 deletions.
@@ -30,6 +30,8 @@ import io.minio.messages.DeleteObject
import java.io.File
import java.util

import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

@@ -43,6 +45,12 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath + "mergetree-queries-output"

private val client = MinioClient
.builder()
.endpoint(MINIO_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build()

override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}
@@ -60,11 +68,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite

override protected def beforeEach(): Unit = {
super.beforeEach()
val client = MinioClient
.builder()
.endpoint(MINIO_ENDPOINT)
.credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
.build()
if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
val results =
client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
@@ -168,9 +171,42 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
assertResult(1)(addFiles.size)
assertResult(600572)(addFiles.head.rows)
}
eventually(timeout(10.seconds), interval(2.seconds)) {
verifyS3CompactFileExist("lineitem_mergetree_s3")
}
spark.sql("drop table lineitem_mergetree_s3") // clean up
}

private def verifyS3CompactFileExist(table: String): Unit = {
val args = ListObjectsArgs
.builder()
.bucket(BUCKET_NAME)
.recursive(true)
.prefix(table)
.build()
var objectCount: Int = 0
var metadataGlutenExist: Boolean = false
var metadataBinExist: Boolean = false
var dataBinExist: Boolean = false
client
.listObjects(args)
.forEach(
obj => {
objectCount += 1
if (obj.get().objectName().contains("metadata.gluten")) {
metadataGlutenExist = true
} else if (obj.get().objectName().contains("meta.bin")) {
metadataBinExist = true
} else if (obj.get().objectName().contains("data.bin")) {
dataBinExist = true
}
})
assertResult(5)(objectCount)
assert(metadataGlutenExist)
assert(metadataBinExist)
assert(dataBinExist)
}

test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
@@ -635,6 +671,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite

withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -556,7 +556,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
if (path_prefix.empty() && path_suffix.empty())
return changed_paths;
Poco::Util::AbstractConfiguration::Keys disks;
std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
config.keys("storage_configuration.disks", disks);

std::ranges::for_each(
@@ -577,7 +577,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
changed_paths.emplace_back(final_path);
}
}
else if (disk_type == "s3" || disk_type == "hdfs_gluten")
else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
{
String metadata_path = config.getString(disk_prefix + ".metadata_path", "");
if (!metadata_path.empty())
@@ -825,7 +825,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
};

global_context->setTemporaryStoragePath(config->getString("tmp_path", getDefaultPath()), 0);
global_context->setPath(config->getString("path", "/"));
global_context->setPath(config->getString("path", "/tmp/ch_default"));

String uncompressed_cache_policy = config->getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
size_t uncompressed_cache_size = config->getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
@@ -94,6 +94,7 @@ void CompactObjectStorageDiskTransaction::commit()
whole_meta.addObject(key, 0, offset);
metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
out.sync();
out.finalize();
};

merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
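For context, a minimal standalone sketch of the write-buffer contract the added `out.finalize()` relies on, assuming ClickHouse's IO headers (the helper and file name below are illustrative, not from the commit): a ClickHouse `WriteBuffer` must be finalized explicitly; destruction alone is not guaranteed to complete the write.

```cpp
#include <string>

#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

// Writes a small string to a local file using ClickHouse's buffered writer.
// sync() pushes the data to stable storage; finalize() completes the buffer's
// lifecycle. Skipping finalize() can drop the buffered tail or trip the
// "buffer was not finalized" check in the destructor.
void writeWholeMeta(const std::string & path, const std::string & payload)
{
    DB::WriteBufferFromFile out(path);
    DB::writeString(payload, out);
    out.sync();
    out.finalize();
}
```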
61 changes: 61 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once


#include "GlutenDiskS3.h"
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"

#if USE_AWS_S3
namespace local_engine
{

DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
}

std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
ReadSettings copy_settings = settings;
// Threadpool read is not supported for s3 compact version currently
copy_settings.remote_fs_method = RemoteFSReadMethod::read;
return DiskObjectStorage::readFile(path, copy_settings, read_hint, file_size);
}

DiskObjectStoragePtr GlutenDiskS3::createDiskObjectStorage()
{
const auto config_prefix = "storage_configuration.disks." + name;
return std::make_shared<GlutenDiskS3>(
getName(),
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
config_prefix,
object_storage_creator);
}

}

#endif
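A hypothetical caller-side sketch (helper name and arguments are illustrative, not part of the commit): because the `readFile` override above copies the caller's `ReadSettings` and forces `RemoteFSReadMethod::read`, any read issued through this disk takes the synchronous S3 path even when the threadpool remote reader is enabled globally.

```cpp
#include <optional>
#include <string>

#include <Disks/IDisk.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>

// Reads an entire object from the disk into memory. With GlutenDiskS3 the
// returned buffer is always the synchronous implementation, since the override
// rewrites remote_fs_method before delegating to DiskObjectStorage::readFile.
std::string readWholeFile(const DB::DiskPtr & disk, const std::string & relative_path, const DB::ReadSettings & settings)
{
    auto in = disk->readFile(relative_path, settings, /*read_hint*/ std::nullopt, /*file_size*/ std::nullopt);
    std::string content;
    DB::readStringUntilEOF(content, *in);
    return content;
}
```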
56 changes: 56 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once


#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"

#if USE_AWS_S3
namespace local_engine
{
class GlutenDiskS3 : public DB::DiskObjectStorage
{
public:
GlutenDiskS3(
const String & name_,
const String & object_key_prefix_,
DB::MetadataStoragePtr metadata_storage_,
DB::ObjectStoragePtr object_storage_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> creator)
: DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix),
object_storage_creator(creator) {}

DB::DiskTransactionPtr createTransaction() override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;

DiskObjectStoragePtr createDiskObjectStorage() override;

private:
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> object_storage_creator;
};
}

#endif
12 changes: 10 additions & 2 deletions cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -25,6 +25,10 @@
#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
#endif

#if USE_AWS_S3
#include <Disks/ObjectStorages/GlutenDiskS3.h>
#endif

#include "registerGlutenDisks.h"

namespace local_engine
@@ -52,16 +56,20 @@ void registerGlutenDisks(bool global_skip_access_check)
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
auto object_storage_creator = [name, skip_access_check, config_prefix](
const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
{ return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");

DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskS3>(
name,
object_storage->getCommonKeyPrefix(),
std::move(metadata_storage),
std::move(object_storage),
config,
config_prefix);
config_prefix,
object_storage_creator);

disk->startup(context, skip_access_check);
return disk;
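The `object_storage_creator` lambda captured here acts as a deferred factory, so `GlutenDiskS3::createDiskObjectStorage()` can later rebuild the object storage without re-threading the original construction arguments. A plain standard-C++ sketch of that pattern with illustrative names (not the Gluten or ClickHouse API):

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>

struct ObjectStorage { std::string key_prefix; };
using ObjectStoragePtr = std::shared_ptr<ObjectStorage>;
using StorageCreator = std::function<ObjectStoragePtr(const std::string & config_prefix)>;

class Disk
{
public:
    Disk(ObjectStoragePtr storage_, StorageCreator creator_)
        : storage(std::move(storage_)), creator(std::move(creator_)) {}

    // Rebuilds a fresh disk from the captured factory instead of requiring the
    // caller to supply the original construction arguments again.
    Disk recreate(const std::string & config_prefix) const { return Disk(creator(config_prefix), creator); }

private:
    ObjectStoragePtr storage;
    StorageCreator creator;
};

int main()
{
    StorageCreator make = [](const std::string & prefix) { return std::make_shared<ObjectStorage>(ObjectStorage{prefix}); };
    Disk disk(make("storage_configuration.disks.s3_gluten"), make);
    Disk rebuilt = disk.recreate("storage_configuration.disks.s3_gluten");
    (void)rebuilt;
    return 0;
}
```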
40 changes: 11 additions & 29 deletions cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -231,42 +231,24 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto read_settings = context->getReadSettings();
auto write_settings = context->getWriteSettings();
Stopwatch watch;

// Temporary support for S3
bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
{
String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;

if (s3_disk)
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
local_relative_path,
dest_storage->getStoragePolicy()->getAnyDisk(),
remote_relative_path,
read_settings,
write_settings,
nullptr);
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
else
{
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
tx->commit();
}


tx->commit();
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
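The rewritten loop copies each part file through the destination disk's transaction, letting the compact S3 transaction pack the files while an ordinary disk simply writes them one by one. A minimal local-filesystem sketch of the underlying copy primitive, assuming ClickHouse's IO headers (paths and helper name are illustrative):

```cpp
#include <string>

#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>

// Streams one file into another in buffer-sized chunks; the loop above does the
// same between the source disk's read buffer and the transaction's write buffer.
void copyOneFile(const std::string & src_path, const std::string & dst_path)
{
    DB::ReadBufferFromFile in(src_path);
    DB::WriteBufferFromFile out(dst_path);
    DB::copyData(in, out);
    out.finalize();
}
```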
