Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7964][VL] Support S3 Bucket Config #8123

Merged
merged 2 commits into from
Jan 1, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 91 additions & 79 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,6 @@ namespace {

const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
// Retry mode for AWS s3
const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
const std::string kVeloxS3RetryModeDefault = "legacy";
// Connection timeout for AWS s3
const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout";
// Using default fs.s3a.connection.timeout value in hadoop
const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace

namespace gluten {
Expand All @@ -62,76 +51,99 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::unordered_map<std::string, std::string> hiveConfMap;

#ifdef ENABLE_S3
std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
std::string retryMaxAttempts = conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, kVeloxS3RetryModeDefault);
std::string maxConnections = conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault);

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}
const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
if (envRetryMaxAttempts != nullptr) {
retryMaxAttempts = std::string(envRetryMaxAttempts);
}
const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
if (envRetryMode != nullptr) {
retryMode = std::string(envRetryMode);
}

if (useInstanceCredentials) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kUseInstanceCredentials)] = "true";
} else if (!iamRole.empty()) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kIamRole)] = iamRole;
if (!iamRoleSessionName.empty()) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kIamRoleSessionName)] = iamRoleSessionName;
using namespace facebook::velox::filesystems;
std::string_view kSparkHadoopPrefix = "spark.hadoop.fs.s3a.";
std::string_view kSparkHadoopBucketPrefix = "spark.hadoop.fs.s3a.bucket.";

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";

const std::unordered_map<S3Config::Keys, std::pair<std::string, std::optional<std::string>>> sparkSuffixes = {
{S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
{S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
{S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
{S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", "false")},
{S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access", "false")},
{S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", std::nullopt)},
{S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
{S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum", "15")},
{S3Config::Keys::kConnectTimeout, std::make_pair("connection.timeout", "200s")},
{S3Config::Keys::kUseInstanceCredentials, std::make_pair("instance.credentials", "false")},
{S3Config::Keys::kIamRole, std::make_pair("iam.role", std::nullopt)},
{S3Config::Keys::kIamRoleSessionName, std::make_pair("iam.role.session.name", "gluten-session")},
};

// get Velox S3 config key from Spark Suffix.
auto getVeloxKey = [&](std::string_view suffix) {
for (const auto& [key, value] : sparkSuffixes) {
if (value.first == suffix) {
return std::optional<S3Config::Keys>(key);
}
}
return std::optional<S3Config::Keys>(std::nullopt);
};

auto sparkBaseConfigValue = [&](S3Config::Keys key) {
std::stringstream ss;
auto keyValue = sparkSuffixes.find(key)->second;
ss << kSparkHadoopPrefix << keyValue.first;
auto sparkKey = ss.str();
if (conf->valueExists(sparkKey)) {
return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
}
// Return default value.
return keyValue.second;
};

auto setConfigIfPresent = [&](S3Config::Keys key) {
auto sparkConfig = sparkBaseConfigValue(key);
if (sparkConfig.has_value()) {
hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
}
};

auto setFromEnvOrConfigIfPresent = [&](std::string_view envName, S3Config::Keys key) {
const char* envValue = std::getenv(envName.data());
if (envValue != nullptr) {
hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
} else {
setConfigIfPresent(key);
}
};

setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", S3Config::Keys::kMaxAttempts);
setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", S3Config::Keys::kSecretKey);
setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
setConfigIfPresent(S3Config::Keys::kIamRole);
setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
setConfigIfPresent(S3Config::Keys::kSSLEnabled);
setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
setConfigIfPresent(S3Config::Keys::kMaxConnections);
setConfigIfPresent(S3Config::Keys::kConnectTimeout);

hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
;

// Convert all Spark bucket configs to Velox bucket configs.
for (const auto& [key, value] : conf->rawConfigs()) {
if (key.find(kSparkHadoopBucketPrefix) == 0) {
std::string_view skey = key;
auto remaining = skey.substr(kSparkHadoopBucketPrefix.size());
int dot = remaining.find(".");
auto bucketName = remaining.substr(0, dot);
auto suffix = remaining.substr(dot + 1);
auto veloxKey = getVeloxKey(suffix);

if (veloxKey.has_value()) {
hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = value;
}
}
} else {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kAccessKey)] = awsAccessKey;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kSecretKey)] = awsSecretKey;
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kEndpoint)] = awsEndpoint;
}
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kSSLEnabled)] = sslEnabled ? "true" : "false";
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kPathStyleAccess)] = pathStyleAccess ? "true" : "false";
hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] = awsSdkLogLevel;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kMaxAttempts)] = retryMaxAttempts;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kRetryMode)] = retryMode;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kMaxConnections)] = maxConnections;
hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
facebook::velox::filesystems::S3Config::Keys::kConnectTimeout)] = connectTimeout;
#endif

#ifdef ENABLE_GCS
Expand Down
Loading