Skip to content

Commit

Permalink
Add support for S3 bucket config (facebookincubator#11321)
Browse files Browse the repository at this point in the history
Summary:
Allow all `hive.s3` options to be set on a per-bucket basis.
The bucket-specific option is set by replacing the hive.s3. prefix on an option
with hive.s3.bucket.BUCKETNAME., where BUCKETNAME is the name of the bucket.
When connecting to a bucket, all options explicitly set will override the base hive.s3. values.
These semantics are similar to the Apache Hadoop-Aws module.
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
Spark uses this for ETL workloads between 2 buckets.

Pull Request resolved: facebookincubator#11321

Reviewed By: Yuhta

Differential Revision: D65365858

Pulled By: kgpai

fbshipit-source-id: 0a0cf401ef22d7270da7e932a99c0d8614597299
  • Loading branch information
majetideepak authored and facebook-github-bot committed Nov 7, 2024
1 parent 327fa89 commit d76c05c
Show file tree
Hide file tree
Showing 21 changed files with 685 additions and 424 deletions.
68 changes: 0 additions & 68 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,70 +71,6 @@ bool HiveConfig::immutablePartitions() const {
return config_->get<bool>(kImmutablePartitions, false);
}

bool HiveConfig::s3UseVirtualAddressing() const {
return !config_->get(kS3PathStyleAccess, false);
}

std::string HiveConfig::s3GetLogLevel() const {
return config_->get(kS3LogLevel, std::string("FATAL"));
}

bool HiveConfig::s3UseSSL() const {
return config_->get(kS3SSLEnabled, true);
}

bool HiveConfig::s3UseInstanceCredentials() const {
return config_->get(kS3UseInstanceCredentials, false);
}

std::string HiveConfig::s3Endpoint() const {
return config_->get(kS3Endpoint, std::string(""));
}

std::optional<std::string> HiveConfig::s3AccessKey() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3AwsAccessKey));
}

std::optional<std::string> HiveConfig::s3SecretKey() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3AwsSecretKey));
}

std::optional<std::string> HiveConfig::s3IAMRole() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3IamRole));
}

std::string HiveConfig::s3IAMRoleSessionName() const {
return config_->get(kS3IamRoleSessionName, std::string("velox-session"));
}

std::optional<std::string> HiveConfig::s3ConnectTimeout() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3ConnectTimeout));
}

std::optional<std::string> HiveConfig::s3SocketTimeout() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3SocketTimeout));
}

std::optional<uint32_t> HiveConfig::s3MaxConnections() const {
return static_cast<std::optional<std::uint32_t>>(
config_->get<uint32_t>(kS3MaxConnections));
}

std::optional<int32_t> HiveConfig::s3MaxAttempts() const {
return static_cast<std::optional<std::int32_t>>(
config_->get<int32_t>(kS3MaxAttempts));
}

std::optional<std::string> HiveConfig::s3RetryMode() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kS3RetryMode));
}

std::string HiveConfig::gcsEndpoint() const {
return config_->get<std::string>(kGCSEndpoint, std::string(""));
}
Expand Down Expand Up @@ -325,10 +261,6 @@ uint64_t HiveConfig::filePreloadThreshold() const {
return config_->get<uint64_t>(kFilePreloadThreshold, 8UL << 20);
}

bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::readTimestampUnit(const config::ConfigBase* session) const {
const auto unit = session->get<uint8_t>(
kReadTimestampUnitSession,
Expand Down
80 changes: 0 additions & 80 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,53 +57,6 @@ class HiveConfig {
static constexpr const char* kImmutablePartitions =
"hive.immutable-partitions";

/// Virtual addressing is used for AWS S3 and is the default
/// (path-style-access is false). Path access style is used for some on-prem
/// systems like Minio.
static constexpr const char* kS3PathStyleAccess = "hive.s3.path-style-access";

/// Log granularity of AWS C++ SDK.
static constexpr const char* kS3LogLevel = "hive.s3.log-level";

/// Use HTTPS to communicate with the S3 API.
static constexpr const char* kS3SSLEnabled = "hive.s3.ssl.enabled";

/// Use the EC2 metadata service to retrieve API credentials.
static constexpr const char* kS3UseInstanceCredentials =
"hive.s3.use-instance-credentials";

/// The S3 storage endpoint server. This can be used to connect to an
/// S3-compatible storage system instead of AWS.
static constexpr const char* kS3Endpoint = "hive.s3.endpoint";

/// Default AWS access key to use.
static constexpr const char* kS3AwsAccessKey = "hive.s3.aws-access-key";

/// Default AWS secret key to use.
static constexpr const char* kS3AwsSecretKey = "hive.s3.aws-secret-key";

/// IAM role to assume.
static constexpr const char* kS3IamRole = "hive.s3.iam-role";

/// Session name associated with the IAM role.
static constexpr const char* kS3IamRoleSessionName =
"hive.s3.iam-role-session-name";

/// Socket connect timeout.
static constexpr const char* kS3ConnectTimeout = "hive.s3.connect-timeout";

/// Socket read timeout.
static constexpr const char* kS3SocketTimeout = "hive.s3.socket-timeout";

/// Maximum concurrent TCP connections for a single http client.
static constexpr const char* kS3MaxConnections = "hive.s3.max-connections";

/// Maximum retry attempts for a single http client.
static constexpr const char* kS3MaxAttempts = "hive.s3.max-attempts";

/// Retry mode for a single http client.
static constexpr const char* kS3RetryMode = "hive.s3.retry-mode";

/// The GCS storage endpoint server.
static constexpr const char* kGCSEndpoint = "hive.gcs.endpoint";

Expand Down Expand Up @@ -244,9 +197,6 @@ class HiveConfig {
static constexpr const char* kSortWriterFinishTimeSliceLimitMsSession =
"sort_writer_finish_time_slice_limit_ms";

static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

// The unit for reading timestamps from files.
static constexpr const char* kReadTimestampUnit =
"hive.reader.timestamp-unit";
Expand All @@ -263,34 +213,6 @@ class HiveConfig {

bool immutablePartitions() const;

bool s3UseVirtualAddressing() const;

std::string s3GetLogLevel() const;

bool s3UseSSL() const;

bool s3UseInstanceCredentials() const;

std::string s3Endpoint() const;

std::optional<std::string> s3AccessKey() const;

std::optional<std::string> s3SecretKey() const;

std::optional<std::string> s3IAMRole() const;

std::string s3IAMRoleSessionName() const;

std::optional<std::string> s3ConnectTimeout() const;

std::optional<std::string> s3SocketTimeout() const;

std::optional<uint32_t> s3MaxConnections() const;

std::optional<int32_t> s3MaxAttempts() const;

std::optional<std::string> s3RetryMode() const;

std::string gcsEndpoint() const;

std::string gcsCredentialsPath() const;
Expand Down Expand Up @@ -364,8 +286,6 @@ class HiveConfig {

uint64_t filePreloadThreshold() const;

bool s3UseProxyFromEnv() const;

// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const config::ConfigBase* session) const;

Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

velox_add_library(velox_s3fs RegisterS3FileSystem.cpp)
if(VELOX_ENABLE_S3)
velox_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)
velox_sources(
velox_s3fs
PRIVATE
S3FileSystem.cpp
S3Util.cpp
S3Config.cpp)

velox_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
velox_link_libraries(velox_s3fs velox_dwio_common Folly::folly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
*/

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/HiveConfig.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" // @manual
#include "velox/dwio/common/FileSink.h"
#endif

#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" // @manual

namespace facebook::velox::filesystems {

#ifdef VELOX_ENABLE_S3
Expand All @@ -31,44 +29,53 @@ using FileSystemMap = folly::Synchronized<

/// Multiple S3 filesystems are supported.
/// Key is the endpoint value specified in the config using hive.s3.endpoint.
/// If the endpoint is empty, it will default to AWS S3.
/// If the endpoint is empty, it will default to AWS S3 Library.
/// Different S3 buckets can be accessed with different client configurations.
/// This allows for different endpoints, data read and write strategies.
/// The bucket specific option is set by replacing the hive.s3. prefix on an
/// option with hive.s3.bucket.BUCKETNAME., where BUCKETNAME is the name of the
/// bucket. When connecting to a bucket, all options explicitly set will
/// override the base hive.s3. values.

FileSystemMap& fileSystems() {
static FileSystemMap instances;
return instances;
}

std::string getS3Identity(const std::shared_ptr<config::ConfigBase>& config) {
HiveConfig hiveConfig = HiveConfig(config);
auto endpoint = hiveConfig.s3Endpoint();
if (!endpoint.empty()) {
// The identity is the endpoint.
return endpoint;
}
// Default key value.
return "aws-s3-key";
}

std::shared_ptr<FileSystem> fileSystemGenerator(
std::shared_ptr<const config::ConfigBase> properties,
std::string_view /*filePath*/) {
std::shared_ptr<config::ConfigBase> config =
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
if (properties) {
config = std::make_shared<config::ConfigBase>(properties->rawConfigsCopy());
std::string_view s3Path) {
std::string bucketName, key;
getBucketAndKeyFromPath(getPath(s3Path), bucketName, key);
auto identity = S3Config::identity(bucketName, properties);

// Check if an instance exists with a read lock (shared).
auto fs = fileSystems().withRLock(
[&](auto& instanceMap) -> std::shared_ptr<FileSystem> {
auto iterator = instanceMap.find(identity);
if (iterator != instanceMap.end()) {
return iterator->second;
}
return nullptr;
});
if (fs != nullptr) {
return fs;
}
const auto s3Identity = getS3Identity(config);

return fileSystems().withWLock(
[&](auto& instanceMap) -> std::shared_ptr<FileSystem> {
initializeS3(config.get());
auto iterator = instanceMap.find(s3Identity);
if (iterator == instanceMap.end()) {
auto fs = std::make_shared<S3FileSystem>(properties);
instanceMap.insert({s3Identity, fs});
return fs;
// Repeat the checks with a write lock.
auto iterator = instanceMap.find(identity);
if (iterator != instanceMap.end()) {
return iterator->second;
}
return iterator->second;

auto logLevel =
properties->get(S3Config::kS3LogLevel, std::string("FATAL"));
initializeS3(logLevel);
auto fs = std::make_shared<S3FileSystem>(bucketName, properties);
instanceMap.insert({identity, fs});
return fs;
});
}

Expand Down
77 changes: 77 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3Config.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"

#include "velox/common/config/Config.h"

namespace facebook::velox::filesystems {

std::string S3Config::identity(
std::string_view bucket,
std::shared_ptr<const config::ConfigBase> config) {
auto bucketEndpoint = bucketConfigKey(Keys::kEndpoint, bucket);
if (config->valueExists(bucketEndpoint)) {
auto value = config->get<std::string>(bucketEndpoint);
if (value.has_value()) {
return value.value();
}
}
auto baseEndpoint = baseConfigKey(Keys::kEndpoint);
if (config->valueExists(baseEndpoint)) {
auto value = config->get<std::string>(baseEndpoint);
if (value.has_value()) {
return value.value();
}
}
return kDefaultS3Identity;
}

S3Config::S3Config(
std::string_view bucket,
const std::shared_ptr<const config::ConfigBase> properties) {
for (int key = static_cast<int>(Keys::kBegin);
key < static_cast<int>(Keys::kEnd);
key++) {
auto s3Key = static_cast<Keys>(key);
auto value = S3Config::configTraits().find(s3Key)->second;
auto configSuffix = value.first;
auto configDefault = value.second;

// Set bucket S3 config "hive.s3.bucket.*" if present.
std::stringstream bucketConfig;
bucketConfig << kS3BucketPrefix << bucket << "." << configSuffix;
auto configVal = static_cast<std::optional<std::string>>(
properties->get<std::string>(bucketConfig.str()));
if (configVal.has_value()) {
config_[s3Key] = configVal.value();
} else {
// Set base config "hive.s3.*" if present.
std::stringstream baseConfig;
baseConfig << kS3Prefix << configSuffix;
configVal = static_cast<std::optional<std::string>>(
properties->get<std::string>(baseConfig.str()));
if (configVal.has_value()) {
config_[s3Key] = configVal.value();
} else {
// Set the default value.
config_[s3Key] = configDefault;
}
}
}
}

} // namespace facebook::velox::filesystems
Loading

0 comments on commit d76c05c

Please sign in to comment.