From d76c05c7dd731b4e6269afca7e7b82498941131a Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Thu, 7 Nov 2024 12:16:48 -0800 Subject: [PATCH] Add support for S3 bucket config (#11321) Summary: Allow all `hive.s3` options to be set on a per-bucket basis. The bucket-specific option is set by replacing the hive.s3. prefix on an option with hive.s3.bucket.BUCKETNAME., where BUCKETNAME is the name of the bucket. When connecting to a bucket, all options explicitly set will override the base hive.s3. values. These semantics are similar to the Apache Hadoop-Aws module. https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html Spark uses this for ETL workloads between 2 buckets. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11321 Reviewed By: Yuhta Differential Revision: D65365858 Pulled By: kgpai fbshipit-source-id: 0a0cf401ef22d7270da7e932a99c0d8614597299 --- velox/connectors/hive/HiveConfig.cpp | 68 ------ velox/connectors/hive/HiveConfig.h | 80 ------- .../hive/storage_adapters/s3fs/CMakeLists.txt | 7 +- .../s3fs/RegisterS3FileSystem.cpp | 65 +++--- .../hive/storage_adapters/s3fs/S3Config.cpp | 77 ++++++ .../hive/storage_adapters/s3fs/S3Config.h | 221 ++++++++++++++++++ .../storage_adapters/s3fs/S3FileSystem.cpp | 116 +++++---- .../hive/storage_adapters/s3fs/S3FileSystem.h | 12 +- .../hive/storage_adapters/s3fs/S3Util.cpp | 4 +- .../hive/storage_adapters/s3fs/S3Util.h | 53 +++-- .../hive/storage_adapters/s3fs/S3WriteFile.h | 2 +- .../s3fs/tests/CMakeLists.txt | 3 +- .../s3fs/tests/S3ConfigTest.cpp | 93 ++++++++ .../s3fs/tests/S3FileSystemFinalizeTest.cpp | 8 +- .../tests/S3FileSystemRegistrationTest.cpp | 4 +- .../s3fs/tests/S3FileSystemTest.cpp | 42 ++-- .../s3fs/tests/S3InsertTest.cpp | 4 +- .../s3fs/tests/S3MultipleEndpointsTest.cpp | 185 +++++++++------ .../s3fs/tests/S3ReadTest.cpp | 8 +- .../s3fs/tests/S3UtilTest.cpp | 21 +- .../connectors/hive/tests/HiveConfigTest.cpp | 36 --- 21 files changed, 685 insertions(+), 
424 deletions(-) create mode 100644 velox/connectors/hive/storage_adapters/s3fs/S3Config.cpp create mode 100644 velox/connectors/hive/storage_adapters/s3fs/S3Config.h create mode 100644 velox/connectors/hive/storage_adapters/s3fs/tests/S3ConfigTest.cpp diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 837d18aeeb4b..12aa51015a20 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -71,70 +71,6 @@ bool HiveConfig::immutablePartitions() const { return config_->get(kImmutablePartitions, false); } -bool HiveConfig::s3UseVirtualAddressing() const { - return !config_->get(kS3PathStyleAccess, false); -} - -std::string HiveConfig::s3GetLogLevel() const { - return config_->get(kS3LogLevel, std::string("FATAL")); -} - -bool HiveConfig::s3UseSSL() const { - return config_->get(kS3SSLEnabled, true); -} - -bool HiveConfig::s3UseInstanceCredentials() const { - return config_->get(kS3UseInstanceCredentials, false); -} - -std::string HiveConfig::s3Endpoint() const { - return config_->get(kS3Endpoint, std::string("")); -} - -std::optional HiveConfig::s3AccessKey() const { - return static_cast>( - config_->get(kS3AwsAccessKey)); -} - -std::optional HiveConfig::s3SecretKey() const { - return static_cast>( - config_->get(kS3AwsSecretKey)); -} - -std::optional HiveConfig::s3IAMRole() const { - return static_cast>( - config_->get(kS3IamRole)); -} - -std::string HiveConfig::s3IAMRoleSessionName() const { - return config_->get(kS3IamRoleSessionName, std::string("velox-session")); -} - -std::optional HiveConfig::s3ConnectTimeout() const { - return static_cast>( - config_->get(kS3ConnectTimeout)); -} - -std::optional HiveConfig::s3SocketTimeout() const { - return static_cast>( - config_->get(kS3SocketTimeout)); -} - -std::optional HiveConfig::s3MaxConnections() const { - return static_cast>( - config_->get(kS3MaxConnections)); -} - -std::optional HiveConfig::s3MaxAttempts() const { - return static_cast>( 
- config_->get(kS3MaxAttempts)); -} - -std::optional HiveConfig::s3RetryMode() const { - return static_cast>( - config_->get(kS3RetryMode)); -} - std::string HiveConfig::gcsEndpoint() const { return config_->get(kGCSEndpoint, std::string("")); } @@ -325,10 +261,6 @@ uint64_t HiveConfig::filePreloadThreshold() const { return config_->get(kFilePreloadThreshold, 8UL << 20); } -bool HiveConfig::s3UseProxyFromEnv() const { - return config_->get(kS3UseProxyFromEnv, false); -} - uint8_t HiveConfig::readTimestampUnit(const config::ConfigBase* session) const { const auto unit = session->get( kReadTimestampUnitSession, diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 800bf498ac0b..e57f05e22d08 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -57,53 +57,6 @@ class HiveConfig { static constexpr const char* kImmutablePartitions = "hive.immutable-partitions"; - /// Virtual addressing is used for AWS S3 and is the default - /// (path-style-access is false). Path access style is used for some on-prem - /// systems like Minio. - static constexpr const char* kS3PathStyleAccess = "hive.s3.path-style-access"; - - /// Log granularity of AWS C++ SDK. - static constexpr const char* kS3LogLevel = "hive.s3.log-level"; - - /// Use HTTPS to communicate with the S3 API. - static constexpr const char* kS3SSLEnabled = "hive.s3.ssl.enabled"; - - /// Use the EC2 metadata service to retrieve API credentials. - static constexpr const char* kS3UseInstanceCredentials = - "hive.s3.use-instance-credentials"; - - /// The S3 storage endpoint server. This can be used to connect to an - /// S3-compatible storage system instead of AWS. - static constexpr const char* kS3Endpoint = "hive.s3.endpoint"; - - /// Default AWS access key to use. - static constexpr const char* kS3AwsAccessKey = "hive.s3.aws-access-key"; - - /// Default AWS secret key to use. 
- static constexpr const char* kS3AwsSecretKey = "hive.s3.aws-secret-key"; - - /// IAM role to assume. - static constexpr const char* kS3IamRole = "hive.s3.iam-role"; - - /// Session name associated with the IAM role. - static constexpr const char* kS3IamRoleSessionName = - "hive.s3.iam-role-session-name"; - - /// Socket connect timeout. - static constexpr const char* kS3ConnectTimeout = "hive.s3.connect-timeout"; - - /// Socket read timeout. - static constexpr const char* kS3SocketTimeout = "hive.s3.socket-timeout"; - - /// Maximum concurrent TCP connections for a single http client. - static constexpr const char* kS3MaxConnections = "hive.s3.max-connections"; - - /// Maximum retry attempts for a single http client. - static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts"; - - /// Retry mode for a single http client. - static constexpr const char* kS3RetryMode = "hive.s3.retry-mode"; - /// The GCS storage endpoint server. static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint"; @@ -244,9 +197,6 @@ class HiveConfig { static constexpr const char* kSortWriterFinishTimeSliceLimitMsSession = "sort_writer_finish_time_slice_limit_ms"; - static constexpr const char* kS3UseProxyFromEnv = - "hive.s3.use-proxy-from-env"; - // The unit for reading timestamps from files. 
static constexpr const char* kReadTimestampUnit = "hive.reader.timestamp-unit"; @@ -263,34 +213,6 @@ class HiveConfig { bool immutablePartitions() const; - bool s3UseVirtualAddressing() const; - - std::string s3GetLogLevel() const; - - bool s3UseSSL() const; - - bool s3UseInstanceCredentials() const; - - std::string s3Endpoint() const; - - std::optional s3AccessKey() const; - - std::optional s3SecretKey() const; - - std::optional s3IAMRole() const; - - std::string s3IAMRoleSessionName() const; - - std::optional s3ConnectTimeout() const; - - std::optional s3SocketTimeout() const; - - std::optional s3MaxConnections() const; - - std::optional s3MaxAttempts() const; - - std::optional s3RetryMode() const; - std::string gcsEndpoint() const; std::string gcsCredentialsPath() const; @@ -364,8 +286,6 @@ class HiveConfig { uint64_t filePreloadThreshold() const; - bool s3UseProxyFromEnv() const; - // Returns the timestamp unit used when reading timestamps from files. uint8_t readTimestampUnit(const config::ConfigBase* session) const; diff --git a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt index 96d6031c1728..4a35df8ef475 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt @@ -16,7 +16,12 @@ velox_add_library(velox_s3fs RegisterS3FileSystem.cpp) if(VELOX_ENABLE_S3) - velox_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp) + velox_sources( + velox_s3fs + PRIVATE + S3FileSystem.cpp + S3Util.cpp + S3Config.cpp) velox_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS}) velox_link_libraries(velox_s3fs velox_dwio_common Folly::folly diff --git a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp index 6f746b1c1bc4..eca9a4f10a34 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp +++ 
b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp @@ -15,14 +15,12 @@ */ #ifdef VELOX_ENABLE_S3 -#include "velox/connectors/hive/HiveConfig.h" // @manual +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" // @manual #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" // @manual #include "velox/dwio/common/FileSink.h" #endif -#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" // @manual - namespace facebook::velox::filesystems { #ifdef VELOX_ENABLE_S3 @@ -31,44 +29,53 @@ using FileSystemMap = folly::Synchronized< /// Multiple S3 filesystems are supported. /// Key is the endpoint value specified in the config using hive.s3.endpoint. -/// If the endpoint is empty, it will default to AWS S3. +/// If the endpoint is empty, it will default to AWS S3 Library. +/// Different S3 buckets can be accessed with different client configurations. +/// This allows for different endpoints, data read and write strategies. +/// The bucket specific option is set by replacing the hive.s3. prefix on an +/// option with hive.s3.bucket.BUCKETNAME., where BUCKETNAME is the name of the +/// bucket. When connecting to a bucket, all options explicitly set will +/// override the base hive.s3. values. + FileSystemMap& fileSystems() { static FileSystemMap instances; return instances; } -std::string getS3Identity(const std::shared_ptr& config) { - HiveConfig hiveConfig = HiveConfig(config); - auto endpoint = hiveConfig.s3Endpoint(); - if (!endpoint.empty()) { - // The identity is the endpoint. - return endpoint; - } - // Default key value. 
- return "aws-s3-key"; -} - std::shared_ptr fileSystemGenerator( std::shared_ptr properties, - std::string_view /*filePath*/) { - std::shared_ptr config = - std::make_shared( - std::unordered_map()); - if (properties) { - config = std::make_shared(properties->rawConfigsCopy()); + std::string_view s3Path) { + std::string bucketName, key; + getBucketAndKeyFromPath(getPath(s3Path), bucketName, key); + auto identity = S3Config::identity(bucketName, properties); + + // Check if an instance exists with a read lock (shared). + auto fs = fileSystems().withRLock( + [&](auto& instanceMap) -> std::shared_ptr { + auto iterator = instanceMap.find(identity); + if (iterator != instanceMap.end()) { + return iterator->second; + } + return nullptr; + }); + if (fs != nullptr) { + return fs; } - const auto s3Identity = getS3Identity(config); return fileSystems().withWLock( [&](auto& instanceMap) -> std::shared_ptr { - initializeS3(config.get()); - auto iterator = instanceMap.find(s3Identity); - if (iterator == instanceMap.end()) { - auto fs = std::make_shared(properties); - instanceMap.insert({s3Identity, fs}); - return fs; + // Repeat the checks with a write lock. + auto iterator = instanceMap.find(identity); + if (iterator != instanceMap.end()) { + return iterator->second; } - return iterator->second; + + auto logLevel = + properties->get(S3Config::kS3LogLevel, std::string("FATAL")); + initializeS3(logLevel); + auto fs = std::make_shared(bucketName, properties); + instanceMap.insert({identity, fs}); + return fs; }); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3Config.cpp new file mode 100644 index 000000000000..9f3f17a127e3 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.cpp @@ -0,0 +1,77 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" + +#include "velox/common/config/Config.h" + +namespace facebook::velox::filesystems { + +std::string S3Config::identity( + std::string_view bucket, + std::shared_ptr config) { + auto bucketEndpoint = bucketConfigKey(Keys::kEndpoint, bucket); + if (config->valueExists(bucketEndpoint)) { + auto value = config->get(bucketEndpoint); + if (value.has_value()) { + return value.value(); + } + } + auto baseEndpoint = baseConfigKey(Keys::kEndpoint); + if (config->valueExists(baseEndpoint)) { + auto value = config->get(baseEndpoint); + if (value.has_value()) { + return value.value(); + } + } + return kDefaultS3Identity; +} + +S3Config::S3Config( + std::string_view bucket, + const std::shared_ptr properties) { + for (int key = static_cast(Keys::kBegin); + key < static_cast(Keys::kEnd); + key++) { + auto s3Key = static_cast(key); + auto value = S3Config::configTraits().find(s3Key)->second; + auto configSuffix = value.first; + auto configDefault = value.second; + + // Set bucket S3 config "hive.s3.bucket.*" if present. + std::stringstream bucketConfig; + bucketConfig << kS3BucketPrefix << bucket << "." << configSuffix; + auto configVal = static_cast>( + properties->get(bucketConfig.str())); + if (configVal.has_value()) { + config_[s3Key] = configVal.value(); + } else { + // Set base config "hive.s3.*" if present. 
+ std::stringstream baseConfig; + baseConfig << kS3Prefix << configSuffix; + configVal = static_cast>( + properties->get(baseConfig.str())); + if (configVal.has_value()) { + config_[s3Key] = configVal.value(); + } else { + // Set the default value. + config_[s3Key] = configDefault; + } + } + } +} + +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Config.h b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h new file mode 100644 index 000000000000..6f17a9be6420 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Config.h @@ -0,0 +1,221 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::config { +class ConfigBase; +} + +namespace facebook::velox::filesystems { + +/// Build config required to initialize an S3FileSystem instance. +/// All hive.s3 options can be set on a per-bucket basis. +/// The bucket-specific option is set by replacing the hive.s3. prefix on an +/// option with hive.s3.bucket.BUCKETNAME., where BUCKETNAME is the name of the +/// bucket. +/// When connecting to a bucket, all options explicitly set will override the +/// base hive.s3. values. +/// These semantics are similar to the Apache Hadoop-Aws module. 
+/// https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html +class S3Config { + public: + S3Config() = delete; + + /// S3 config prefix. + static constexpr const char* kS3Prefix = "hive.s3."; + + /// S3 bucket config prefix + static constexpr const char* kS3BucketPrefix = "hive.s3.bucket."; + + /// Log granularity of AWS C++ SDK. + static constexpr const char* kS3LogLevel = "hive.s3.log-level"; + + /// S3FileSystem default identity. + static constexpr const char* kDefaultS3Identity = "s3-default-identity"; + + /// Keys to identify the config. + enum class Keys { + kBegin, + kEndpoint = kBegin, + kAccessKey, + kSecretKey, + kPathStyleAccess, + kSSLEnabled, + kUseInstanceCredentials, + kIamRole, + kIamRoleSessionName, + kConnectTimeout, + kSocketTimeout, + kMaxConnections, + kMaxAttempts, + kRetryMode, + kUseProxyFromEnv, + kEnd + }; + + /// Map of keys -> . + /// New config must be added here along with a getter function below. + static const std::unordered_map< + Keys, + std::pair>>& + configTraits() { + static const std::unordered_map< + Keys, + std::pair>> + config = { + {Keys::kEndpoint, std::make_pair("endpoint", "")}, + {Keys::kAccessKey, std::make_pair("aws-access-key", std::nullopt)}, + {Keys::kSecretKey, std::make_pair("aws-secret-key", std::nullopt)}, + {Keys::kPathStyleAccess, + std::make_pair("path-style-access", "false")}, + {Keys::kSSLEnabled, std::make_pair("ssl.enabled", "true")}, + {Keys::kUseInstanceCredentials, + std::make_pair("use-instance-credentials", "false")}, + {Keys::kIamRole, std::make_pair("iam-role", std::nullopt)}, + {Keys::kIamRoleSessionName, + std::make_pair("iam-role-session-name", "velox-session")}, + {Keys::kConnectTimeout, + std::make_pair("connect-timeout", std::nullopt)}, + {Keys::kSocketTimeout, + std::make_pair("socket-timeout", std::nullopt)}, + {Keys::kMaxConnections, + std::make_pair("max-connections", std::nullopt)}, + {Keys::kMaxAttempts, std::make_pair("max-attempts", std::nullopt)}, + 
{Keys::kRetryMode, std::make_pair("retry-mode", std::nullopt)}, + {Keys::kUseProxyFromEnv, + std::make_pair("use-proxy-from-env", "false")}}; + return config; + } + + S3Config( + std::string_view bucket, + std::shared_ptr config); + + /// Identity is used as a key for the S3FileSystem instance map. + /// This will be the bucket endpoint or the base endpoint or the + /// default identity in that order. + static std::string identity( + std::string_view bucket, + std::shared_ptr config); + + /// Return the base config for the input Key. + static std::string baseConfigKey(Keys key) { + std::stringstream buffer; + buffer << kS3Prefix << configTraits().find(key)->second.first; + return buffer.str(); + } + + /// Return the bucket config for the input key. + static std::string bucketConfigKey(Keys key, std::string_view bucket) { + std::stringstream buffer; + buffer << kS3BucketPrefix << bucket << "." + << configTraits().find(key)->second.first; + return buffer.str(); + } + + /// The S3 storage endpoint server. This can be used to connect to an + /// S3-compatible storage system instead of AWS. + std::string endpoint() const { + return config_.find(Keys::kEndpoint)->second.value(); + } + + /// Access key to use. + std::optional accessKey() const { + return config_.find(Keys::kAccessKey)->second; + } + + /// Secret key to use + std::optional secretKey() const { + return config_.find(Keys::kSecretKey)->second; + } + + /// Virtual addressing is used for AWS S3 and is the default + /// (path-style-access is false). Path access style is used for some on-prem + /// systems like Minio. + bool useVirtualAddressing() const { + auto value = config_.find(Keys::kPathStyleAccess)->second.value(); + return !folly::to(value); + } + + /// Use HTTPS to communicate with the S3 API. + bool useSSL() const { + auto value = config_.find(Keys::kSSLEnabled)->second.value(); + return folly::to(value); + } + + /// Use the EC2 metadata service to retrieve API credentials. 
+ bool useInstanceCredentials() const { + auto value = config_.find(Keys::kUseInstanceCredentials)->second.value(); + return folly::to(value); + } + + /// IAM role to assume. + std::optional iamRole() const { + return config_.find(Keys::kIamRole)->second; + } + + /// Session name associated with the IAM role. + std::string iamRoleSessionName() const { + return config_.find(Keys::kIamRoleSessionName)->second.value(); + } + + /// Socket connect timeout. + std::optional connectTimeout() const { + return config_.find(Keys::kConnectTimeout)->second; + } + + /// Socket read timeout. + std::optional socketTimeout() const { + return config_.find(Keys::kSocketTimeout)->second; + } + + /// Maximum concurrent TCP connections for a single http client. + std::optional maxConnections() const { + auto val = config_.find(Keys::kMaxConnections)->second; + if (val.has_value()) { + return folly::to(val.value()); + } + return std::optional(); + } + + /// Maximum retry attempts for a single http client. + std::optional maxAttempts() const { + auto val = config_.find(Keys::kMaxAttempts)->second; + if (val.has_value()) { + return folly::to(val.value()); + } + return std::optional(); + } + + /// Retry mode for a single http client. 
+ std::optional retryMode() const { + return config_.find(Keys::kRetryMode)->second; + } + + bool useProxyFromEnv() const { + auto value = config_.find(Keys::kUseProxyFromEnv)->second.value(); + return folly::to(value); + } + + private: + std::unordered_map> config_; +}; + +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index 7126774cea1f..10f71b9f201a 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -17,10 +17,9 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h" -#include "velox/core/QueryConfig.h" #include "velox/dwio/common/DataBuffer.h" #include @@ -47,7 +46,7 @@ #include #include -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { // Reference: https://issues.apache.org/jira/browse/ARROW-8692 // https://github.com/apache/arrow/blob/master/cpp/src/arrow/filesystem/s3fs.cc#L843 @@ -74,9 +73,9 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) { class S3ReadFile final : public ReadFile { public: - S3ReadFile(const std::string& path, Aws::S3::S3Client* client) + S3ReadFile(std::string_view path, Aws::S3::S3Client* client) : client_(client) { - getBucketAndKeyFromS3Path(path, bucket_, key_); + getBucketAndKeyFromPath(path, bucket_, key_); } // Gets the length of the file. 
@@ -190,7 +189,8 @@ class S3ReadFile final : public ReadFile { int64_t length_ = -1; }; -Aws::Utils::Logging::LogLevel inferS3LogLevel(std::string level) { +Aws::Utils::Logging::LogLevel inferS3LogLevel(std::string_view logLevel) { + std::string level = std::string(logLevel); // Convert to upper case. std::transform( level.begin(), level.end(), level.begin(), [](unsigned char c) { @@ -215,18 +215,16 @@ Aws::Utils::Logging::LogLevel inferS3LogLevel(std::string level) { } } // namespace -namespace filesystems { - class S3WriteFile::Impl { public: explicit Impl( - const std::string& path, + std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool) : client_(client), pool_(pool) { VELOX_CHECK_NOT_NULL(client); VELOX_CHECK_NOT_NULL(pool); - getBucketAndKeyFromS3Path(path, bucket_, key_); + getBucketAndKeyFromPath(path, bucket_, key_); currentPart_ = std::make_unique>(*pool_); currentPart_->reserve(kPartUploadSize); // Check that the object doesn't exist, if it does throw an error. @@ -412,7 +410,7 @@ class S3WriteFile::Impl { }; S3WriteFile::S3WriteFile( - const std::string& path, + std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool) { impl_ = std::make_shared(path, client, pool); @@ -438,8 +436,6 @@ int S3WriteFile::numPartsUploaded() const { return impl_->numPartsUploaded(); } -using namespace connector::hive; - // Initialize and Finalize the AWS SDK C++ library. // Initialization must be done before creating a S3FileSystem. // Finalization must be done after all S3FileSystem instances have been deleted. @@ -451,13 +447,13 @@ struct AwsInstance { } // Returns true iff the instance was newly initialized with config. - bool initialize(const config::ConfigBase* config) { + bool initialize(std::string_view logLevel) { if (isFinalized_.load()) { VELOX_FAIL("Attempt to initialize S3 after it has been finalized."); } if (!isInitialized_.exchange(true)) { // Not already initialized. 
- doInitialize(config); + doInitialize(logLevel); return true; } return false; @@ -489,11 +485,8 @@ struct AwsInstance { } private: - void doInitialize(const config::ConfigBase* config) { - std::shared_ptr hiveConfig = std::make_shared( - std::make_shared(config->rawConfigsCopy())); - awsOptions_.loggingOptions.logLevel = - inferS3LogLevel(hiveConfig->s3GetLogLevel()); + void doInitialize(std::string_view logLevel) { + awsOptions_.loggingOptions.logLevel = inferS3LogLevel(logLevel); // In some situations, curl triggers a SIGPIPE signal causing the entire // process to be terminated without any notification. // This behavior is seen via Prestissimo on AmazonLinux2 on AWS EC2. @@ -516,8 +509,8 @@ AwsInstance* getAwsInstance() { return instance.get(); } -bool initializeS3(const config::ConfigBase* config) { - return getAwsInstance()->initialize(config); +bool initializeS3(std::string_view logLevel) { + return getAwsInstance()->initialize(logLevel); } static std::atomic fileSystemCount = 0; @@ -529,16 +522,14 @@ void finalizeS3() { class S3FileSystem::Impl { public: - Impl(const config::ConfigBase* config) { - hiveConfig_ = std::make_shared( - std::make_shared(config->rawConfigsCopy())); + Impl(const S3Config& s3Config) { VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized"); Aws::Client::ClientConfiguration clientConfig; - clientConfig.endpointOverride = hiveConfig_->s3Endpoint(); + clientConfig.endpointOverride = s3Config.endpoint(); - if (hiveConfig_->s3UseProxyFromEnv()) { - auto proxyConfig = S3ProxyConfigurationBuilder(hiveConfig_->s3Endpoint()) - .useSsl(hiveConfig_->s3UseSSL()) + if (s3Config.useProxyFromEnv()) { + auto proxyConfig = S3ProxyConfigurationBuilder(s3Config.endpoint()) + .useSsl(s3Config.useSSL()) .build(); if (proxyConfig.has_value()) { clientConfig.proxyScheme = Aws::Http::SchemeMapper::FromString( @@ -550,44 +541,44 @@ class S3FileSystem::Impl { } } - if (hiveConfig_->s3UseSSL()) { + if (s3Config.useSSL()) { clientConfig.scheme 
= Aws::Http::Scheme::HTTPS; } else { clientConfig.scheme = Aws::Http::Scheme::HTTP; } - if (hiveConfig_->s3ConnectTimeout().has_value()) { + if (s3Config.connectTimeout().has_value()) { clientConfig.connectTimeoutMs = std::chrono::duration_cast( facebook::velox::config::toDuration( - hiveConfig_->s3ConnectTimeout().value())) + s3Config.connectTimeout().value())) .count(); } - if (hiveConfig_->s3SocketTimeout().has_value()) { + if (s3Config.socketTimeout().has_value()) { clientConfig.requestTimeoutMs = std::chrono::duration_cast( facebook::velox::config::toDuration( - hiveConfig_->s3SocketTimeout().value())) + s3Config.socketTimeout().value())) .count(); } - if (hiveConfig_->s3MaxConnections().has_value()) { - clientConfig.maxConnections = hiveConfig_->s3MaxConnections().value(); + if (s3Config.maxConnections().has_value()) { + clientConfig.maxConnections = s3Config.maxConnections().value(); } - auto retryStrategy = getRetryStrategy(); + auto retryStrategy = getRetryStrategy(s3Config); if (retryStrategy.has_value()) { clientConfig.retryStrategy = retryStrategy.value(); } - auto credentialsProvider = getCredentialsProvider(); + auto credentialsProvider = getCredentialsProvider(s3Config); client_ = std::make_shared( credentialsProvider, clientConfig, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - hiveConfig_->s3UseVirtualAddressing()); + s3Config.useVirtualAddressing()); ++fileSystemCount; } @@ -622,11 +613,11 @@ class S3FileSystem::Impl { } // Return an AWSCredentialsProvider based on the config. 
- std::shared_ptr getCredentialsProvider() - const { - auto accessKey = hiveConfig_->s3AccessKey(); - auto secretKey = hiveConfig_->s3SecretKey(); - const auto iamRole = hiveConfig_->s3IAMRole(); + std::shared_ptr getCredentialsProvider( + const S3Config& s3Config) const { + auto accessKey = s3Config.accessKey(); + auto secretKey = s3Config.secretKey(); + const auto iamRole = s3Config.iamRole(); int keyCount = accessKey.has_value() + secretKey.has_value(); // keyCount=0 means both are not specified @@ -637,7 +628,7 @@ class S3FileSystem::Impl { "Invalid configuration: both access key and secret key must be specified"); int configCount = (accessKey.has_value() && secretKey.has_value()) + - iamRole.has_value() + hiveConfig_->s3UseInstanceCredentials(); + iamRole.has_value() + s3Config.useInstanceCredentials(); VELOX_USER_CHECK( (configCount <= 1), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); @@ -647,23 +638,23 @@ class S3FileSystem::Impl { accessKey.value(), secretKey.value()); } - if (hiveConfig_->s3UseInstanceCredentials()) { + if (s3Config.useInstanceCredentials()) { return getDefaultCredentialsProvider(); } if (iamRole.has_value()) { return getIAMRoleCredentialsProvider( - iamRole.value(), hiveConfig_->s3IAMRoleSessionName()); + iamRole.value(), s3Config.iamRoleSessionName()); } return getDefaultCredentialsProvider(); } // Return a client RetryStrategy based on the config. 
- std::optional> getRetryStrategy() - const { - auto retryMode = hiveConfig_->s3RetryMode(); - auto maxAttempts = hiveConfig_->s3MaxAttempts(); + std::optional> getRetryStrategy( + const S3Config& s3Config) const { + auto retryMode = s3Config.retryMode(); + auto maxAttempts = s3Config.maxAttempts(); if (retryMode.has_value()) { if (retryMode.value() == "standard") { if (maxAttempts.has_value()) { @@ -723,13 +714,15 @@ class S3FileSystem::Impl { } private: - std::shared_ptr hiveConfig_; std::shared_ptr client_; }; -S3FileSystem::S3FileSystem(std::shared_ptr config) +S3FileSystem::S3FileSystem( + std::string_view bucketName, + const std::shared_ptr config) : FileSystem(config) { - impl_ = std::make_shared(config.get()); + S3Config s3Config(bucketName, config); + impl_ = std::make_shared(s3Config); } std::string S3FileSystem::getLogLevelName() const { @@ -737,20 +730,20 @@ std::string S3FileSystem::getLogLevelName() const { } std::unique_ptr S3FileSystem::openFileForRead( - std::string_view path, + std::string_view s3Path, const FileOptions& options) { - const auto file = s3Path(path); - auto s3file = std::make_unique(file, impl_->s3Client()); + const auto path = getPath(s3Path); + auto s3file = std::make_unique(path, impl_->s3Client()); s3file->initialize(options); return s3file; } std::unique_ptr S3FileSystem::openFileForWrite( - std::string_view path, + std::string_view s3Path, const FileOptions& options) { - const auto file = s3Path(path); + const auto path = getPath(s3Path); auto s3file = - std::make_unique(file, impl_->s3Client(), options.pool); + std::make_unique(path, impl_->s3Client(), options.pool); return s3file; } @@ -758,5 +751,4 @@ std::string S3FileSystem::name() const { return "S3"; } -} // namespace filesystems -} // namespace facebook::velox +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index 088575760f99..c624fc6ec5e8 
100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -17,12 +17,10 @@ #pragma once #include "velox/common/file/FileSystems.h" -#include "velox/connectors/hive/HiveConfig.h" namespace facebook::velox::filesystems { -using namespace facebook::velox::connector::hive; -bool initializeS3(const config::ConfigBase* config); +bool initializeS3(std::string_view logLevel = "FATAL"); void finalizeS3(); @@ -31,16 +29,18 @@ void finalizeS3(); /// type of file can be constructed based on a filename. class S3FileSystem : public FileSystem { public: - explicit S3FileSystem(std::shared_ptr config); + S3FileSystem( + std::string_view bucketName, + const std::shared_ptr config); std::string name() const override; std::unique_ptr openFileForRead( - std::string_view path, + std::string_view s3Path, const FileOptions& options = {}) override; std::unique_ptr openFileForWrite( - std::string_view path, + std::string_view s3Path, const FileOptions& options) override; void remove(std::string_view path) override { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp index e312ca30ca51..66e38ec4b396 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp @@ -23,7 +23,7 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" -namespace facebook::velox { +namespace facebook::velox::filesystems { std::string getErrorStringFromS3Error( const Aws::Client::AWSError& error) { @@ -148,4 +148,4 @@ std::optional S3ProxyConfigurationBuilder::build() { return proxyUri; } -} // namespace facebook::velox +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3Util.h b/velox/connectors/hive/storage_adapters/s3fs/S3Util.h index 399b5c2740e6..0835c9ee419a 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3Util.h +++ 
b/velox/connectors/hive/storage_adapters/s3fs/S3Util.h @@ -23,29 +23,29 @@ #include #include +#include #include #include "velox/common/base/Exceptions.h" -#include - -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { -constexpr std::string_view kSep{"/"}; +static std::string_view kSep{"/"}; // AWS S3 EMRFS, Hadoop block storage filesystem on-top of Amazon S3 buckets. -constexpr std::string_view kS3Scheme{"s3://"}; +static std::string_view kS3Scheme{"s3://"}; // This should not be mixed with s3 nor the s3a. // S3A Hadoop 3.x (previous connectors "s3" and "s3n" are deprecated). -constexpr std::string_view kS3aScheme{"s3a://"}; +static std::string_view kS3aScheme{"s3a://"}; // DEPRECATED: s3n are deprecated in Hadoop 3.x but we are supporting s3n for // data that hasn't been migrated yet. -constexpr std::string_view kS3nScheme{"s3n://"}; +static std::string_view kS3nScheme{"s3n://"}; // OSS Alibaba support S3 format, usage only with SSL. -constexpr std::string_view kOssScheme{"oss://"}; +static std::string_view kOssScheme{"oss://"}; // Tencent COS support S3 format. -constexpr std::string_view kCosScheme{"cos://"}; -constexpr std::string_view kCosNScheme{"cosn://"}; +static std::string_view kCosScheme{"cos://"}; +static std::string_view kCosNScheme{"cosn://"}; + // From AWS documentation constexpr int kS3MaxKeySize{1024}; } // namespace @@ -80,8 +80,9 @@ inline bool isS3File(const std::string_view filename) { isOssFile(filename) || isCosFile(filename) || isCosNFile(filename); } -inline void getBucketAndKeyFromS3Path( - const std::string& path, +// The input `path` must not have the S3 prefix. +inline void getBucketAndKeyFromPath( + std::string_view path, std::string& bucket, std::string& key) { auto firstSep = path.find_first_of(kSep); @@ -96,32 +97,30 @@ inline void getBucketAndKeyFromS3Path( // regexp="(^[a-z0-9])([.-]?[a-z0-9]+){2,62}([/]?$)" // 3. 
Disallowed IPv4 notation - regexp: // regexp="^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}[/]?$" -inline std::string s3URI(const std::string& bucket) { - return std::string(kS3Scheme) + bucket; -} - -inline std::string s3URI(const std::string& bucket, const std::string& key) { - return s3URI(bucket) + "/" + key; +inline std::string s3URI(std::string_view bucket, std::string_view key) { + std::stringstream ss; + ss << kS3Scheme << bucket << kSep << key; + return ss.str(); } -inline std::string s3Path(const std::string_view& path) { +inline std::string_view getPath(std::string_view path) { // Remove one of the prefixes 's3://', 'oss://', 's3a://' if any from the // given path. // TODO: Each prefix should be implemented as its own filesystem. if (isS3AwsFile(path)) { - return std::string(path.substr(kS3Scheme.length())); + return path.substr(kS3Scheme.length()); } else if (isS3aFile(path)) { - return std::string(path.substr(kS3aScheme.length())); + return path.substr(kS3aScheme.length()); } else if (isS3nFile(path)) { - return std::string(path.substr(kS3nScheme.length())); + return path.substr(kS3nScheme.length()); } else if (isOssFile(path)) { - return std::string(path.substr(kOssScheme.length())); + return path.substr(kOssScheme.length()); } else if (isCosFile(path)) { - return std::string(path.substr(kCosScheme.length())); + return path.substr(kCosScheme.length()); } else if (isCosNFile(path)) { - return std::string(path.substr(kCosNScheme.length())); + return path.substr(kCosNScheme.length()); } - return std::string(path); + return path; } inline Aws::String awsString(const std::string& s) { @@ -209,7 +208,7 @@ class S3ProxyConfigurationBuilder { bool useSsl_; }; -} // namespace facebook::velox +} // namespace facebook::velox::filesystems template <> struct fmt::formatter : formatter { diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h index dd2661fad362..929eed20c370 100644 --- 
a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h @@ -48,7 +48,7 @@ namespace facebook::velox::filesystems { class S3WriteFile : public WriteFile { public: S3WriteFile( - const std::string& path, + std::string_view path, Aws::S3::S3Client* client, memory::MemoryPool* pool); diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt index 852038c483b2..0cc023848b30 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_s3file_test S3FileSystemTest.cpp S3UtilTest.cpp) +add_executable(velox_s3file_test S3FileSystemTest.cpp S3UtilTest.cpp + S3ConfigTest.cpp) add_test(velox_s3file_test velox_s3file_test) target_link_libraries( velox_s3file_test diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3ConfigTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3ConfigTest.cpp new file mode 100644 index 000000000000..62c397de3160 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3ConfigTest.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" +#include "velox/common/config/Config.h" + +#include + +namespace facebook::velox::filesystems { +namespace { +TEST(S3ConfigTest, defaultConfig) { + auto config = std::make_shared( + std::unordered_map()); + auto s3Config = S3Config("", config); + ASSERT_EQ(s3Config.useVirtualAddressing(), true); + ASSERT_EQ(s3Config.useSSL(), true); + ASSERT_EQ(s3Config.useInstanceCredentials(), false); + ASSERT_EQ(s3Config.endpoint(), ""); + ASSERT_EQ(s3Config.accessKey(), std::nullopt); + ASSERT_EQ(s3Config.secretKey(), std::nullopt); + ASSERT_EQ(s3Config.iamRole(), std::nullopt); + ASSERT_EQ(s3Config.iamRoleSessionName(), "velox-session"); +} + +TEST(S3ConfigTest, overrideConfig) { + std::unordered_map configFromFile = { + {S3Config::baseConfigKey(S3Config::Keys::kPathStyleAccess), "true"}, + {S3Config::baseConfigKey(S3Config::Keys::kSSLEnabled), "false"}, + {S3Config::baseConfigKey(S3Config::Keys::kUseInstanceCredentials), + "true"}, + {S3Config::baseConfigKey(S3Config::Keys::kEndpoint), "hey"}, + {S3Config::baseConfigKey(S3Config::Keys::kAccessKey), "hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kSecretKey), "hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kIamRole), "hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kIamRoleSessionName), "velox"}}; + auto s3Config = S3Config( + "", std::make_shared(std::move(configFromFile))); + ASSERT_EQ(s3Config.useVirtualAddressing(), false); + ASSERT_EQ(s3Config.useSSL(), false); + ASSERT_EQ(s3Config.useInstanceCredentials(), true); + ASSERT_EQ(s3Config.endpoint(), "hey"); + ASSERT_EQ(s3Config.accessKey(), std::optional("hello")); + ASSERT_EQ(s3Config.secretKey(), std::optional("hello")); + ASSERT_EQ(s3Config.iamRole(), std::optional("hello")); + ASSERT_EQ(s3Config.iamRoleSessionName(), "velox"); +} + +TEST(S3ConfigTest, overrideBucketConfig) { + std::string_view bucket = "bucket"; + std::unordered_map bucketConfigFromFile = { +
{S3Config::baseConfigKey(S3Config::Keys::kPathStyleAccess), "true"}, + {S3Config::baseConfigKey(S3Config::Keys::kSSLEnabled), "false"}, + {S3Config::baseConfigKey(S3Config::Keys::kUseInstanceCredentials), + "true"}, + {S3Config::baseConfigKey(S3Config::Keys::kEndpoint), "hey"}, + {S3Config::bucketConfigKey(S3Config::Keys::kEndpoint, bucket), + "bucket-hey"}, + {S3Config::baseConfigKey(S3Config::Keys::kAccessKey), "hello"}, + {S3Config::bucketConfigKey(S3Config::Keys::kAccessKey, bucket), + "bucket-hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kSecretKey), "secret-hello"}, + {S3Config::bucketConfigKey(S3Config::Keys::kSecretKey, bucket), + "bucket-secret-hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kIamRole), "hello"}, + {S3Config::baseConfigKey(S3Config::Keys::kIamRoleSessionName), "velox"}}; + auto s3Config = S3Config( + bucket, + std::make_shared(std::move(bucketConfigFromFile))); + ASSERT_EQ(s3Config.useVirtualAddressing(), false); + ASSERT_EQ(s3Config.useSSL(), false); + ASSERT_EQ(s3Config.useInstanceCredentials(), true); + ASSERT_EQ(s3Config.endpoint(), "bucket-hey"); + ASSERT_EQ(s3Config.accessKey(), std::optional("bucket-hello")); + ASSERT_EQ(s3Config.secretKey(), std::optional("bucket-secret-hello")); + ASSERT_EQ(s3Config.iamRole(), std::optional("hello")); + ASSERT_EQ(s3Config.iamRoleSessionName(), "velox"); +} + +} // namespace +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp index 1ee0387faa0e..9bebc4162490 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemFinalizeTest.cpp @@ -26,16 +26,16 @@ namespace { TEST(S3FileSystemFinalizeTest, finalize) { auto s3Config = std::make_shared( std::unordered_map()); - ASSERT_TRUE(filesystems::initializeS3(s3Config.get())); - 
ASSERT_FALSE(filesystems::initializeS3(s3Config.get())); + ASSERT_TRUE(filesystems::initializeS3()); + ASSERT_FALSE(filesystems::initializeS3()); { - filesystems::S3FileSystem s3fs(s3Config); + filesystems::S3FileSystem s3fs("", s3Config); VELOX_ASSERT_THROW( filesystems::finalizeS3(), "Cannot finalize S3 while in use"); } filesystems::finalizeS3(); VELOX_ASSERT_THROW( - filesystems::initializeS3(s3Config.get()), + filesystems::initializeS3(), "Attempt to initialize S3 after it has been finalized."); } diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp index c247a7430943..b115bb151234 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp @@ -17,7 +17,7 @@ #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { class S3FileSystemRegistrationTest : public S3Test { @@ -77,4 +77,4 @@ TEST_F(S3FileSystemRegistrationTest, finalize) { filesystems::finalizeS3FileSystem(), "Cannot finalize S3FileSystem while in use"); } -} // namespace facebook::velox +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 1b92147d5b88..c97bff4ee894 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -20,7 +20,7 @@ #include -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { class S3FileSystemTest : public S3Test { @@ -31,8 +31,8 @@ class S3FileSystemTest : public S3Test { 
void SetUp() override { S3Test::SetUp(); - auto hiveConfig = minioServer_->hiveConfig({{"hive.s3.log-level", "Info"}}); - filesystems::initializeS3(hiveConfig.get()); + auto hiveConfig = minioServer_->hiveConfig({}); + filesystems::initializeS3("Info"); } static void TearDownTestSuite() { @@ -47,15 +47,15 @@ TEST_F(S3FileSystemTest, writeAndRead) { setenv("HTTP_PROXY", "http://test:test@127.0.0.1:8888", 1); const char* bucketName = "data"; const char* file = "test.txt"; - const std::string filename = localPath(bucketName) + "/" + file; - const std::string s3File = s3URI(bucketName, file); + const auto filename = localPath(bucketName) + "/" + file; + const auto s3File = s3URI(bucketName, file); addBucket(bucketName); { LocalWriteFile writeFile(filename); writeData(&writeFile); } auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto readFile = s3fs.openFileForRead(s3File); readData(readFile.get()); } @@ -70,7 +70,7 @@ TEST_F(S3FileSystemTest, invalidCredentialsConfig) { // Both instance credentials and iam-role cannot be specified VELOX_ASSERT_THROW( - filesystems::S3FileSystem(hiveConfig), + filesystems::S3FileSystem("", hiveConfig), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { @@ -82,7 +82,7 @@ TEST_F(S3FileSystemTest, invalidCredentialsConfig) { std::make_shared(std::move(config)); // Both access/secret keys and iam-role cannot be specified VELOX_ASSERT_THROW( - filesystems::S3FileSystem(hiveConfig), + filesystems::S3FileSystem("", hiveConfig), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { @@ -94,7 +94,7 @@ TEST_F(S3FileSystemTest, invalidCredentialsConfig) { std::make_shared(std::move(config)); // Both access/secret keys and instance credentials cannot be specified VELOX_ASSERT_THROW( - filesystems::S3FileSystem(hiveConfig), + 
filesystems::S3FileSystem("", hiveConfig), "Invalid configuration: specify only one among 'access/secret keys', 'use instance credentials', 'IAM role'"); } { @@ -104,7 +104,7 @@ TEST_F(S3FileSystemTest, invalidCredentialsConfig) { std::make_shared(std::move(config)); // Both access key and secret key must be specified VELOX_ASSERT_THROW( - filesystems::S3FileSystem(hiveConfig), + filesystems::S3FileSystem("", hiveConfig), "Invalid configuration: both access key and secret key must be specified"); } } @@ -115,7 +115,7 @@ TEST_F(S3FileSystemTest, missingFile) { const std::string s3File = s3URI(bucketName, file); addBucket(bucketName); auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); VELOX_ASSERT_RUNTIME_THROW_CODE( s3fs.openFileForRead(s3File), error_code::kFileNotFound, @@ -124,7 +124,7 @@ TEST_F(S3FileSystemTest, missingFile) { TEST_F(S3FileSystemTest, missingBucket) { auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs("", hiveConfig); VELOX_ASSERT_RUNTIME_THROW_CODE( s3fs.openFileForRead(kDummyPath), error_code::kFileNotFound, @@ -134,7 +134,7 @@ TEST_F(S3FileSystemTest, missingBucket) { TEST_F(S3FileSystemTest, invalidAccessKey) { auto hiveConfig = minioServer_->hiveConfig({{"hive.s3.aws-access-key", "dummy-key"}}); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs("", hiveConfig); // Minio credentials are wrong and this should throw VELOX_ASSERT_THROW( s3fs.openFileForRead(kDummyPath), @@ -144,7 +144,7 @@ TEST_F(S3FileSystemTest, invalidAccessKey) { TEST_F(S3FileSystemTest, invalidSecretKey) { auto hiveConfig = minioServer_->hiveConfig({{"hive.s3.aws-secret-key", "dummy-key"}}); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs("", hiveConfig); // Minio credentials are wrong and this should throw. 
VELOX_ASSERT_THROW( s3fs.openFileForRead("s3://dummy/foo.txt"), @@ -154,7 +154,7 @@ TEST_F(S3FileSystemTest, invalidSecretKey) { TEST_F(S3FileSystemTest, noBackendServer) { auto hiveConfig = minioServer_->hiveConfig({{"hive.s3.aws-secret-key", "dummy-key"}}); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs("", hiveConfig); // Stop Minio and check error. minioServer_->stop(); VELOX_ASSERT_THROW( @@ -169,7 +169,7 @@ TEST_F(S3FileSystemTest, logLevel) { auto checkLogLevelName = [&config](std::string_view expected) { auto s3Config = std::make_shared(std::move(config)); - filesystems::S3FileSystem s3fs(s3Config); + filesystems::S3FileSystem s3fs("", s3Config); EXPECT_EQ(s3fs.getLogLevelName(), expected); }; @@ -189,7 +189,7 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { const auto s3File = s3URI(bucketName, file); auto hiveConfig = minioServer_->hiveConfig(); - filesystems::S3FileSystem s3fs(hiveConfig); + filesystems::S3FileSystem s3fs(bucketName, hiveConfig); auto pool = memory::memoryManager()->addLeafPool("S3FileSystemTest"); auto writeFile = s3fs.openFileForWrite(s3File, {{}, pool.get(), std::nullopt}); @@ -258,9 +258,11 @@ TEST_F(S3FileSystemTest, writeFileAndRead) { TEST_F(S3FileSystemTest, invalidConnectionSettings) { auto hiveConfig = minioServer_->hiveConfig({{"hive.s3.connect-timeout", "400"}}); - VELOX_ASSERT_THROW(filesystems::S3FileSystem(hiveConfig), "Invalid duration"); + VELOX_ASSERT_THROW( + filesystems::S3FileSystem("", hiveConfig), "Invalid duration"); hiveConfig = minioServer_->hiveConfig({{"hive.s3.socket-timeout", "abc"}}); - VELOX_ASSERT_THROW(filesystems::S3FileSystem(hiveConfig), "Invalid duration"); + VELOX_ASSERT_THROW( + filesystems::S3FileSystem("", hiveConfig), "Invalid duration"); } -} // namespace facebook::velox +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp 
index bff4eea34e68..e5369511931b 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3InsertTest.cpp @@ -28,7 +28,7 @@ using namespace facebook::velox::exec::test; -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { class S3InsertTest : public S3Test { @@ -125,7 +125,7 @@ TEST_F(S3InsertTest, s3InsertTest) { auto copy = AssertQueryBuilder(plan).split(split).copyResults(pool()); assertEqualResults({input}, {copy}); } -} // namespace facebook::velox +} // namespace facebook::velox::filesystems int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp index c0ce3af68404..9b85ec7bb58e 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3MultipleEndpointsTest.cpp @@ -18,6 +18,7 @@ #include "gtest/gtest.h" #include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" @@ -27,6 +28,7 @@ static const std::string_view kConnectorId1 = "test-hive1"; static const std::string_view kConnectorId2 = "test-hive2"; +static const std::string_view kBucketName = "writedata"; using namespace facebook::velox::exec::test; @@ -38,43 +40,53 @@ class S3MultipleEndpoints : public S3Test { static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); } + static void TearDownTestCase() { + filesystems::finalizeS3FileSystem(); + } void SetUp() override { S3Test::SetUp(); + minioSecondServer_ = std::make_unique(); + 
minioSecondServer_->start(); + minioServer_->addBucket(kBucketName.data()); + minioSecondServer_->addBucket(kBucketName.data()); + filesystems::registerS3FileSystem(); connector::registerConnectorFactory( std::make_shared()); + parquet::registerParquetReaderFactory(); + parquet::registerParquetWriterFactory(); + } + + void registerConnectors( + std::string_view connectorId1, + std::string_view connectorId2, + const std::unordered_map config1Override = {}, + const std::unordered_map config2Override = {}) { auto hiveConnector1 = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - std::string(kConnectorId1), - minioServer_->hiveConfig(), + std::string(connectorId1), + minioServer_->hiveConfig(config1Override), ioExecutor_.get()); - connector::registerConnector(hiveConnector1); - minioSecondServer_ = std::make_unique(); - minioSecondServer_->start(); auto hiveConnector2 = connector::getConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName) ->newConnector( - std::string(kConnectorId2), - minioSecondServer_->hiveConfig(), + std::string(connectorId2), + minioSecondServer_->hiveConfig(config2Override), ioExecutor_.get()); + connector::registerConnector(hiveConnector1); connector::registerConnector(hiveConnector2); - parquet::registerParquetReaderFactory(); - parquet::registerParquetWriterFactory(); } void TearDown() override { parquet::unregisterParquetReaderFactory(); parquet::unregisterParquetWriterFactory(); - connector::unregisterConnector(std::string(kConnectorId1)); - connector::unregisterConnector(std::string(kConnectorId2)); connector::unregisterConnectorFactory( connector::hive::HiveConnectorFactory::kHiveConnectorName); S3Test::TearDown(); - filesystems::finalizeS3FileSystem(); } folly::dynamic writeData( @@ -117,67 +129,106 @@ class S3MultipleEndpoints : public S3Test { .build(); } + void testJoin( + int numRows, + std::string_view outputDirectory, + std::string_view connectorId1, + 
std::string_view connectorId2) { + auto rowType1 = ROW( + {"a0", "a1", "a2", "a3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()}); + auto rowType2 = ROW( + {"b0", "b1", "b2", "b3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()}); + + auto input1 = makeRowVector( + rowType1->names(), + {makeFlatVector(numRows, [](auto row) { return row; }), + makeFlatVector(numRows, [](auto row) { return row; }), + makeFlatVector(numRows, [](auto row) { return row; }), + makeFlatVector(numRows, [](auto row) { return row; })}); + auto input2 = makeRowVector(rowType2->names(), input1->children()); + + // Insert input data into both tables. + auto table1WriteInfo = + writeData(input1, outputDirectory.data(), std::string(connectorId1)); + auto table2WriteInfo = + writeData(input2, outputDirectory.data(), std::string(connectorId2)); + + // Inner Join both the tables. + core::PlanNodeId scan1, scan2; + auto planNodeIdGenerator = std::make_shared(); + auto table1Scan = PlanBuilder(planNodeIdGenerator, pool()) + .startTableScan() + .tableName("hive_table1") + .outputType(rowType1) + .connectorId(std::string(connectorId1)) + .endTableScan() + .capturePlanNodeId(scan1) + .planNode(); + auto join = + PlanBuilder(planNodeIdGenerator, pool()) + .startTableScan() + .tableName("hive_table1") + .outputType(rowType2) + .connectorId(std::string(connectorId2)) + .endTableScan() + .capturePlanNodeId(scan2) + .hashJoin({"b0"}, {"a0"}, table1Scan, "", {"a0", "a1", "a2", "a3"}) + .planNode(); + + auto split1 = createSplit( + table1WriteInfo, outputDirectory.data(), std::string(connectorId1)); + auto split2 = createSplit( + table2WriteInfo, outputDirectory.data(), std::string(connectorId2)); + auto results = AssertQueryBuilder(join) + .split(scan1, split1) + .split(scan2, split2) + .copyResults(pool()); + assertEqualResults({input1}, {results}); + } + std::unique_ptr minioSecondServer_; }; } // namespace -TEST_F(S3MultipleEndpoints, s3Join) { +TEST_F(S3MultipleEndpoints, baseEndpoints) { + const int64_t 
kExpectedRows = 1'000; + const auto outputDirectory{filesystems::s3URI(kBucketName, "")}; + + registerConnectors(kConnectorId1, kConnectorId2); + + testJoin(kExpectedRows, outputDirectory, kConnectorId1, kConnectorId2); + + connector::unregisterConnector(std::string(kConnectorId1)); + connector::unregisterConnector(std::string(kConnectorId2)); +} + +TEST_F(S3MultipleEndpoints, bucketEndpoints) { const int64_t kExpectedRows = 1'000; - const std::string_view kOutputDirectory{"s3://writedata/"}; - - auto rowType1 = ROW( - {"a0", "a1", "a2", "a3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()}); - auto rowType2 = ROW( - {"b0", "b1", "b2", "b3"}, {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()}); - - auto input1 = makeRowVector( - rowType1->names(), - {makeFlatVector(kExpectedRows, [](auto row) { return row; }), - makeFlatVector(kExpectedRows, [](auto row) { return row; }), - makeFlatVector(kExpectedRows, [](auto row) { return row; }), - makeFlatVector(kExpectedRows, [](auto row) { return row; })}); - auto input2 = makeRowVector(rowType2->names(), input1->children()); - minioServer_->addBucket("writedata"); - minioSecondServer_->addBucket("writedata"); - - // Insert input data into both tables. - auto table1WriteInfo = - writeData(input1, kOutputDirectory.data(), std::string(kConnectorId1)); - auto table2WriteInfo = - writeData(input2, kOutputDirectory.data(), std::string(kConnectorId2)); - - // Inner Join both the tables. 
- core::PlanNodeId scan1, scan2; - auto planNodeIdGenerator = std::make_shared(); - auto table1Scan = PlanBuilder(planNodeIdGenerator, pool()) - .startTableScan() - .tableName("hive_table1") - .outputType(rowType1) - .connectorId(std::string(kConnectorId1)) - .endTableScan() - .capturePlanNodeId(scan1) - .planNode(); - auto join = - PlanBuilder(planNodeIdGenerator, pool()) - .startTableScan() - .tableName("hive_table1") - .outputType(rowType2) - .connectorId(std::string(kConnectorId2)) - .endTableScan() - .capturePlanNodeId(scan2) - .hashJoin({"b0"}, {"a0"}, table1Scan, "", {"a0", "a1", "a2", "a3"}) - .planNode(); - - auto split1 = createSplit( - table1WriteInfo, kOutputDirectory.data(), std::string(kConnectorId1)); - auto split2 = createSplit( - table2WriteInfo, kOutputDirectory.data(), std::string(kConnectorId2)); - auto results = AssertQueryBuilder(join) - .split(scan1, split1) - .split(scan2, split2) - .copyResults(pool()); - assertEqualResults({input1}, {results}); + const auto outputDirectory{filesystems::s3URI(kBucketName, "")}; + + auto configOverride = [](std::shared_ptr config) { + return std::unordered_map{ + {"hive.s3.bucket.writedata.endpoint", + config->get("hive.s3.endpoint").value()}, + {"hive.s3.bucket.writedata.aws-access-key", + config->get("hive.s3.aws-access-key").value()}, + {"hive.s3.bucket.writedata.aws-secret-key", + config->get("hive.s3.aws-secret-key").value()}, + {"hive.s3.endpoint", "fail"}, + {"hive.s3.aws-access-key", "fail"}, + {"hive.s3.aws-secret-key", "fail"}, + }; + }; + auto config1 = configOverride(minioServer_->hiveConfig()); + auto config2 = configOverride(minioSecondServer_->hiveConfig()); + registerConnectors(kConnectorId1, kConnectorId2, config1, config2); + + testJoin(kExpectedRows, outputDirectory, kConnectorId1, kConnectorId2); + + connector::unregisterConnector(std::string(kConnectorId1)); + connector::unregisterConnector(std::string(kConnectorId2)); } + } // namespace facebook::velox int main(int argc, char** argv) { 
diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3ReadTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3ReadTest.cpp index 4fd42a561301..08f6c255348a 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3ReadTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3ReadTest.cpp @@ -27,7 +27,7 @@ using namespace facebook::velox::exec::test; -namespace facebook::velox { +namespace facebook::velox::filesystems { namespace { class S3ReadTest : public S3Test { @@ -75,11 +75,9 @@ TEST_F(S3ReadTest, s3ReadTest) { dest.close(); // Read the parquet file via the S3 bucket. - const auto readDirectory{s3URI(bucketName)}; auto rowType = ROW({"int", "bigint"}, {INTEGER(), BIGINT()}); auto plan = PlanBuilder().tableScan(rowType).planNode(); - auto split = HiveConnectorSplitBuilder( - fmt::format("{}/{}", readDirectory, "int.parquet")) + auto split = HiveConnectorSplitBuilder(s3URI(bucketName, "int.parquet")) .fileFormat(dwio::common::FileFormat::PARQUET) .build(); auto copy = AssertQueryBuilder(plan).split(split).copyResults(pool()); @@ -93,7 +91,7 @@ TEST_F(S3ReadTest, s3ReadTest) { kExpectedRows, [](auto row) { return row + 1000; })}); assertEqualResults({expectedResults}, {copy}); } -} // namespace facebook::velox +} // namespace facebook::velox::filesystems int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp index eb3b5dbff450..0f70a6f1be72 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp @@ -18,7 +18,7 @@ #include "gtest/gtest.h" -namespace facebook::velox { +namespace facebook::velox::filesystems { // TODO: Each prefix should be implemented as its own filesystem. 
TEST(S3UtilTest, isS3File) { @@ -85,14 +85,13 @@ TEST(S3UtilTest, isCosNFile) { EXPECT_TRUE(isCosNFile("cosn://bucket/file.txt")); } -// TODO: Each prefix should be implemented as its own filesystem. TEST(S3UtilTest, s3Path) { - auto path_0 = s3Path("s3://bucket/file.txt"); - auto path_1 = s3Path("oss://bucket-name/file.txt"); - auto path_2 = s3Path("S3A://bucket-NAME/sub-PATH/my-file.txt"); - auto path_3 = s3Path("s3N://bucket-NAME/sub-PATH/my-file.txt"); - auto path_4 = s3Path("cos://bucket-name/file.txt"); - auto path_5 = s3Path("cosn://bucket-name/file.txt"); + auto path_0 = getPath("s3://bucket/file.txt"); + auto path_1 = getPath("oss://bucket-name/file.txt"); + auto path_2 = getPath("S3A://bucket-NAME/sub-PATH/my-file.txt"); + auto path_3 = getPath("s3N://bucket-NAME/sub-PATH/my-file.txt"); + auto path_4 = getPath("cos://bucket-name/file.txt"); + auto path_5 = getPath("cosn://bucket-name/file.txt"); EXPECT_EQ(path_0, "bucket/file.txt"); EXPECT_EQ(path_1, "bucket-name/file.txt"); EXPECT_NE(path_2, "bucket-NAME/sub-PATH/my-file.txt"); @@ -101,10 +100,10 @@ TEST(S3UtilTest, s3Path) { EXPECT_EQ(path_5, "bucket-name/file.txt"); } -TEST(S3UtilTest, bucketAndKeyFromS3Path) { +TEST(S3UtilTest, bucketAndKeyFromPath) { std::string bucket, key; auto path = "bucket/file.txt"; - getBucketAndKeyFromS3Path(path, bucket, key); + getBucketAndKeyFromPath(path, bucket, key); EXPECT_EQ(bucket, "bucket"); EXPECT_EQ(key, "file.txt"); } @@ -246,4 +245,4 @@ INSTANTIATE_TEST_SUITE_P( S3UtilProxyTest, ::testing::Values(true, false)); -} // namespace facebook::velox +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index 875002dbd0b1..5463e4578218 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -33,15 +33,6 @@ TEST(HiveConfigTest, defaultConfig) { InsertExistingPartitionsBehavior::kError);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 100); ASSERT_EQ(hiveConfig.immutablePartitions(), false); - ASSERT_EQ(hiveConfig.s3UseVirtualAddressing(), true); - ASSERT_EQ(hiveConfig.s3GetLogLevel(), "FATAL"); - ASSERT_EQ(hiveConfig.s3UseSSL(), true); - ASSERT_EQ(hiveConfig.s3UseInstanceCredentials(), false); - ASSERT_EQ(hiveConfig.s3Endpoint(), ""); - ASSERT_EQ(hiveConfig.s3AccessKey(), std::nullopt); - ASSERT_EQ(hiveConfig.s3SecretKey(), std::nullopt); - ASSERT_EQ(hiveConfig.s3IAMRole(), std::nullopt); - ASSERT_EQ(hiveConfig.s3IAMRoleSessionName(), "velox-session"); ASSERT_EQ(hiveConfig.gcsEndpoint(), ""); ASSERT_EQ(hiveConfig.gcsCredentialsPath(), ""); ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), false); @@ -85,15 +76,6 @@ TEST(HiveConfigTest, overrideConfig) { {HiveConfig::kInsertExistingPartitionsBehavior, "OVERWRITE"}, {HiveConfig::kMaxPartitionsPerWriters, "120"}, {HiveConfig::kImmutablePartitions, "true"}, - {HiveConfig::kS3PathStyleAccess, "true"}, - {HiveConfig::kS3LogLevel, "Warning"}, - {HiveConfig::kS3SSLEnabled, "false"}, - {HiveConfig::kS3UseInstanceCredentials, "true"}, - {HiveConfig::kS3Endpoint, "hey"}, - {HiveConfig::kS3AwsAccessKey, "hello"}, - {HiveConfig::kS3AwsSecretKey, "hello"}, - {HiveConfig::kS3IamRole, "hello"}, - {HiveConfig::kS3IamRoleSessionName, "velox"}, {HiveConfig::kGCSEndpoint, "hey"}, {HiveConfig::kGCSCredentialsPath, "hey"}, {HiveConfig::kOrcUseColumnNames, "true"}, @@ -124,15 +106,6 @@ TEST(HiveConfigTest, overrideConfig) { InsertExistingPartitionsBehavior::kOverwrite); ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 120); ASSERT_EQ(hiveConfig.immutablePartitions(), true); - ASSERT_EQ(hiveConfig.s3UseVirtualAddressing(), false); - ASSERT_EQ(hiveConfig.s3GetLogLevel(), "Warning"); - ASSERT_EQ(hiveConfig.s3UseSSL(), false); - ASSERT_EQ(hiveConfig.s3UseInstanceCredentials(), true); - ASSERT_EQ(hiveConfig.s3Endpoint(), "hey"); - ASSERT_EQ(hiveConfig.s3AccessKey(), 
std::optional("hello")); - ASSERT_EQ(hiveConfig.s3SecretKey(), std::optional("hello")); - ASSERT_EQ(hiveConfig.s3IAMRole(), std::optional("hello")); - ASSERT_EQ(hiveConfig.s3IAMRoleSessionName(), "velox"); ASSERT_EQ(hiveConfig.gcsEndpoint(), "hey"); ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "hey"); ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), true); @@ -198,15 +171,6 @@ TEST(HiveConfigTest, overrideSession) { InsertExistingPartitionsBehavior::kOverwrite); ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 100); ASSERT_EQ(hiveConfig.immutablePartitions(), false); - ASSERT_EQ(hiveConfig.s3UseVirtualAddressing(), true); - ASSERT_EQ(hiveConfig.s3GetLogLevel(), "FATAL"); - ASSERT_EQ(hiveConfig.s3UseSSL(), true); - ASSERT_EQ(hiveConfig.s3UseInstanceCredentials(), false); - ASSERT_EQ(hiveConfig.s3Endpoint(), ""); - ASSERT_EQ(hiveConfig.s3AccessKey(), std::nullopt); - ASSERT_EQ(hiveConfig.s3SecretKey(), std::nullopt); - ASSERT_EQ(hiveConfig.s3IAMRole(), std::nullopt); - ASSERT_EQ(hiveConfig.s3IAMRoleSessionName(), "velox-session"); ASSERT_EQ(hiveConfig.gcsEndpoint(), ""); ASSERT_EQ(hiveConfig.gcsCredentialsPath(), ""); ASSERT_EQ(hiveConfig.isOrcUseColumnNames(session.get()), true);