diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 30f443265caeb..c95b788583229 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -30,6 +30,8 @@ import io.minio.messages.DeleteObject import java.io.File import java.util +import scala.concurrent.duration.DurationInt + // Some sqls' line length exceeds 100 // scalastyle:off line.size.limit @@ -43,6 +45,12 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" override protected val queriesResults: String = rootPath + "mergetree-queries-output" + private val client = MinioClient + .builder() + .endpoint(MINIO_ENDPOINT) + .credentials(S3_ACCESS_KEY, S3_SECRET_KEY) + .build() + override protected def createTPCHNotNullTables(): Unit = { createNotNullTPCHTablesInParquet(tablesPath) } @@ -60,11 +68,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite override protected def beforeEach(): Unit = { super.beforeEach() - val client = MinioClient - .builder() - .endpoint(MINIO_ENDPOINT) - .credentials(S3_ACCESS_KEY, S3_SECRET_KEY) - .build() if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) { val results = client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build()) @@ -168,9 +171,42 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite assertResult(1)(addFiles.size) assertResult(600572)(addFiles.head.rows) } + eventually(timeout(10.seconds), interval(2.seconds)) { + verifyS3CompactFileExist("lineitem_mergetree_s3") + } spark.sql("drop table lineitem_mergetree_s3") // clean up } + private def verifyS3CompactFileExist(table: String): Unit = { + val args = ListObjectsArgs + .builder() + .bucket(BUCKET_NAME) + .recursive(true) + .prefix(table) + .build() + var objectCount: Int = 0 + var metadataGlutenExist: Boolean = false + var metadataBinExist: Boolean = false + var dataBinExist: Boolean = false + client + .listObjects(args) + .forEach( + obj => { + objectCount += 1 + if (obj.get().objectName().contains("metadata.gluten")) { + metadataGlutenExist = true + } else if (obj.get().objectName().contains("meta.bin")) { + metadataBinExist = true + } else if (obj.get().objectName().contains("data.bin")) { + dataBinExist = true + } + }) + assertResult(5)(objectCount) + assert(metadataGlutenExist) + assert(metadataBinExist) + assert(dataBinExist) + } + test("test mergetree write with orderby keys / primary keys") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3; @@ -635,6 +671,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite withSQLConf( "spark.databricks.delta.optimize.minFileSize" -> "200000000", + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true", "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true" ) { spark.sql(s""" diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index b74c18dd14af1..13c6fa46927ba 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -556,7 +556,7 @@ std::vector BackendInitializerUtil::wrapDiskPathConfig( if (path_prefix.empty() && path_suffix.empty()) return changed_paths; Poco::Util::AbstractConfiguration::Keys disks; - std::unordered_set disk_types = {"s3", "hdfs_gluten", "cache"}; + std::unordered_set disk_types = {"s3_gluten", "hdfs_gluten", "cache"}; config.keys("storage_configuration.disks", disks); std::ranges::for_each( @@ -577,7 +577,7 @@ std::vector BackendInitializerUtil::wrapDiskPathConfig( changed_paths.emplace_back(final_path); } } - else if (disk_type == "s3" || disk_type == "hdfs_gluten") + else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten") { String metadata_path = config.getString(disk_prefix + ".metadata_path", ""); if (!metadata_path.empty()) @@ -825,7 +825,7 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) }; global_context->setTemporaryStoragePath(config->getString("tmp_path", getDefaultPath()), 0); - global_context->setPath(config->getString("path", "/")); + global_context->setPath(config->getString("path", "/tmp/ch_default")); String uncompressed_cache_policy = config->getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY); size_t uncompressed_cache_size = config->getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp index 7a3ba4bed2449..66c4470101a19 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp @@ -94,6 +94,7 @@ void CompactObjectStorageDiskTransaction::commit() whole_meta.addObject(key, 0, offset); metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString()); out.sync(); + out.finalize(); }; merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp new file mode 100644 index 0000000000000..ff65c648ed21e --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp @@ -0,0 +1,61 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#pragma once + + +#include "GlutenDiskS3.h" +#include +#include +#include "CompactObjectStorageDiskTransaction.h" + +#if USE_AWS_S3 +namespace local_engine +{ + + DB::DiskTransactionPtr GlutenDiskS3::createTransaction() + { + return std::make_shared(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk()); + } + + std::unique_ptr GlutenDiskS3::readFile( + const String & path, + const ReadSettings & settings, + std::optional read_hint, + std::optional file_size) const + { + ReadSettings copy_settings = settings; + // Threadpool read is not supported for s3 compact version currently + copy_settings.remote_fs_method = RemoteFSReadMethod::read; + return DiskObjectStorage::readFile(path, copy_settings, read_hint, file_size); + } + + DiskObjectStoragePtr GlutenDiskS3::createDiskObjectStorage() + { + const auto config_prefix = "storage_configuration.disks." + name; + return std::make_shared( + getName(), + object_key_prefix, + getMetadataStorage(), + getObjectStorage(), + SerializedPlanParser::global_context->getConfigRef(), + config_prefix, + object_storage_creator); + } + +} + +#endif diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h new file mode 100644 index 0000000000000..4ff18e12eb82e --- /dev/null +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#pragma once + + +#include +#include +#include "CompactObjectStorageDiskTransaction.h" + +#if USE_AWS_S3 +namespace local_engine +{ +class GlutenDiskS3 : public DB::DiskObjectStorage +{ +public: + GlutenDiskS3( + const String & name_, + const String & object_key_prefix_, + DB::MetadataStoragePtr metadata_storage_, + DB::ObjectStoragePtr object_storage_, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + std::function creator) + : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix), + object_storage_creator(creator) {} + + DB::DiskTransactionPtr createTransaction() override; + + std::unique_ptr readFile( + const String & path, + const ReadSettings & settings, + std::optional read_hint, + std::optional file_size) const override; + + DiskObjectStoragePtr createDiskObjectStorage() override; + +private: + std::function object_storage_creator; +}; +} + +#endif diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp index 8a920edcce77a..52398b5f2fa74 100644 --- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp +++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp @@ -25,6 +25,10 @@ #include #endif +#if USE_AWS_S3 +#include +#endif + #include "registerGlutenDisks.h" namespace local_engine @@ -52,16 +56,20 @@ void registerGlutenDisks(bool global_skip_access_check) bool) -> DB::DiskPtr { bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false); + auto object_storage_creator = [name, skip_access_check, config_prefix]( + const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr + { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); }; auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check); auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local"); - DB::DiskObjectStoragePtr disk = std::make_shared( + DB::DiskObjectStoragePtr disk = std::make_shared( name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, - config_prefix); + config_prefix, + object_storage_creator); disk->startup(context, skip_access_check); return disk; diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index 403b845147fa4..ed18a7b9304c0 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -231,42 +231,24 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded() auto read_settings = context->getReadSettings(); auto write_settings = context->getWriteSettings(); Stopwatch watch; - - // Temporary support for S3 - bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3"); for (const auto & merge_tree_data_part : new_parts.unsafeGet()) { String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name; String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name; - if (s3_disk) + std::vector files; + storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files); + auto src_disk = storage->getStoragePolicy()->getAnyDisk(); + auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk(); + auto tx = dest_disk->createTransaction(); + for (const auto & file : files) { - storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent( - local_relative_path, - dest_storage->getStoragePolicy()->getAnyDisk(), - remote_relative_path, - read_settings, - write_settings, - nullptr); + auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings); + auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings); + copyData(*read_buffer, *write_buffer); + write_buffer->finalize(); } - else - { - std::vector files; - storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files); - auto src_disk = storage->getStoragePolicy()->getAnyDisk(); - auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk(); - auto tx = dest_disk->createTransaction(); - for (const auto & file : files) - { - auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings); - auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings); - copyData(*read_buffer, *write_buffer); - write_buffer->finalize(); - } - tx->commit(); - } - - + tx->commit(); LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Upload part {} to disk {} success.",