Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Dec 3, 2024
1 parent 228ab9c commit 8740586
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 70 deletions.
18 changes: 9 additions & 9 deletions cpp/velox/tests/HiveConfigTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
namespace gluten {

// Checks the values reported by an S3Config that is constructed from
// getHiveConfig({}), i.e. from an empty input configuration.
//
// NOTE(review): this text comes from a commit diff view, so the nine body
// statements appear twice below (the removed lines and the re-formatted added
// lines of the hunk). They are the same statements; only one copy exists in
// the real file.
// NOTE(review): the expected session name here is "spark-session", while the
// sparkSuffixes table in the ConfigExtractor.cc hunk on this page lists
// "gluten-session" as the default for iam-role-session-name. Confirm which
// value this test should expect.
TEST(S3ConfigTest, defaultConfig) {
auto s3Config = facebook::velox::filesystems::S3Config("", getHiveConfig({}));
ASSERT_EQ(s3Config.useVirtualAddressing(), true);
ASSERT_EQ(s3Config.useSSL(), false);
ASSERT_EQ(s3Config.useInstanceCredentials(), false);
ASSERT_EQ(s3Config.endpoint(), "");
ASSERT_EQ(s3Config.accessKey(), std::nullopt);
ASSERT_EQ(s3Config.secretKey(), std::nullopt);
ASSERT_EQ(s3Config.iamRole(), std::nullopt);
ASSERT_EQ(s3Config.iamRoleSessionName(), "spark-session");
auto s3Config = facebook::velox::filesystems::S3Config("", getHiveConfig({}));
ASSERT_EQ(s3Config.useVirtualAddressing(), true);
ASSERT_EQ(s3Config.useSSL(), false);
ASSERT_EQ(s3Config.useInstanceCredentials(), false);
ASSERT_EQ(s3Config.endpoint(), "");
ASSERT_EQ(s3Config.accessKey(), std::nullopt);
ASSERT_EQ(s3Config.secretKey(), std::nullopt);
ASSERT_EQ(s3Config.iamRole(), std::nullopt);
ASSERT_EQ(s3Config.iamRoleSessionName(), "spark-session");
}

} // namespace gluten
123 changes: 62 additions & 61 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,86 +60,87 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
// Fallback passed to conf->get<std::string>() when the AWS SDK log level is
// looked up further down in this function.
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";

// Lookup table keyed by Velox S3Config::Keys. Each entry holds a pair of
//   first  - the Spark-side key suffix (appended to kSparkHadoopPrefix for
//            base configs, and matched against the tail of bucket configs),
//   second - the value used when the Spark config does not set the key;
//            std::nullopt means the key is simply left unset.
// NOTE(review): this text comes from a commit diff view, so the entry list
// and its closing "};" appear twice below (removed lines, then the
// re-formatted added lines). The real file contains a single copy.
const std::unordered_map<S3Config::Keys, std::pair<std::string, std::optional<std::string>>> sparkSuffixes = {
{S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
{S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
{S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
{S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", "false")},
{S3Config::Keys::kPathStyleAccess,std::make_pair("path.style.access", "false")},
{S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", std::nullopt)},
{S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
{S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum" , "15")},
{S3Config::Keys::kConnectTimeout, std::make_pair("connect-timeout", "200s")},
{S3Config::Keys::kUseInstanceCredentials, std::make_pair("use-instance-credentials", "false")},
{S3Config::Keys::kIamRole, std::make_pair("iam-role", std::nullopt)},
{S3Config::Keys::kIamRoleSessionName,std::make_pair("iam-role-session-name", "gluten-session")},
};
{S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
{S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
{S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
{S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", "false")},
{S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access", "false")},
{S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", std::nullopt)},
{S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
{S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum", "15")},
{S3Config::Keys::kConnectTimeout, std::make_pair("connect-timeout", "200s")},
{S3Config::Keys::kUseInstanceCredentials, std::make_pair("use-instance-credentials", "false")},
{S3Config::Keys::kIamRole, std::make_pair("iam-role", std::nullopt)},
{S3Config::Keys::kIamRoleSessionName, std::make_pair("iam-role-session-name", "gluten-session")},
};

// get Velox S3 config key from Spark Suffix.
// Reverse lookup over sparkSuffixes: scans the entries linearly and returns
// the S3Config::Keys whose Spark suffix (pair.first) equals `suffix`, or an
// empty optional when no entry matches. Both return statements spell out
// std::optional<S3Config::Keys> so the lambda's return type deduces
// consistently.
// NOTE(review): the lines below interleave removed and added diff lines of a
// formatting-only change; the real lambda is a single for-loop followed by
// the "not found" return.
auto getVeloxKey = [&](std::string_view suffix) {
for (const auto& [key, value] : sparkSuffixes) {
if (value.first == suffix) {
return std::optional<S3Config::Keys>(key);
}
for (const auto& [key, value] : sparkSuffixes) {
if (value.first == suffix) {
return std::optional<S3Config::Keys>(key);
}
return std::optional<S3Config::Keys>(std::nullopt);
}
return std::optional<S3Config::Keys>(std::nullopt);
};

// Resolves the value for a base (non bucket-specific) S3 setting.
// Builds the Spark config name as kSparkHadoopPrefix + the suffix stored in
// sparkSuffixes for `key`. If `conf` contains that name, its string value is
// returned; otherwise the table's default is returned, which may be
// std::nullopt.
// NOTE(review): sparkSuffixes.find(key) is dereferenced without comparing it
// to end(). Every call site visible in this hunk passes a key that is listed
// in the table, but a key missing from the table would be undefined
// behaviour.
// NOTE(review): the body appears twice below because this text comes from a
// commit diff view (removed lines, then re-formatted added lines).
auto sparkBaseConfigValue = [&](S3Config::Keys key) {
std::stringstream ss;
auto keyValue = sparkSuffixes.find(key)->second;
ss << kSparkHadoopPrefix << keyValue.first;
auto sparkKey = ss.str();
if (conf->valueExists(sparkKey)) {
return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
}
// Return default value.
return keyValue.second;
std::stringstream ss;
auto keyValue = sparkSuffixes.find(key)->second;
ss << kSparkHadoopPrefix << keyValue.first;
auto sparkKey = ss.str();
if (conf->valueExists(sparkKey)) {
return static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
}
// Return default value.
return keyValue.second;
};

// Writes the resolved value for `key` into hiveConfMap under
// S3Config::baseConfigKey(key). Nothing is written when
// sparkBaseConfigValue(key) yields no value, i.e. the Spark config does not
// set the key and the table has no default for it.
// NOTE(review): the body appears twice below because this text comes from a
// commit diff view (removed lines, then re-formatted added lines).
auto setConfigIfPresent = [&](S3Config::Keys key) {
auto sparkConfig = sparkBaseConfigValue(key);
if (sparkConfig.has_value()) {
hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
}
auto sparkConfig = sparkBaseConfigValue(key);
if (sparkConfig.has_value()) {
hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
}
};

// Like setConfigIfPresent, but the environment variable `envName` takes
// precedence: when it is set, its value is stored under
// S3Config::baseConfigKey(key) and the Spark config / table default is not
// consulted. Otherwise the call falls back to setConfigIfPresent(key).
// NOTE(review): std::getenv expects a null-terminated string, which
// std::string_view::data() does not guarantee in general. The call sites
// visible in this hunk all pass string literals, so it holds for them.
// NOTE(review): the body appears twice below because this text comes from a
// commit diff view (removed lines, then re-formatted added lines).
auto setFromEnvOrConfigIfPresent = [&](std::string_view envName, S3Config::Keys key) {
const char* envValue = std::getenv(envName.data());
if (envValue != nullptr) {
hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
} else {
setConfigIfPresent(key);
}
const char* envValue = std::getenv(envName.data());
if (envValue != nullptr) {
hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
} else {
setConfigIfPresent(key);
}
};

// Populate the base S3 settings in hiveConfMap.
// The first five keys can be overridden by an AWS_* environment variable;
// the remaining keys come only from the Spark config or the table default.
// NOTE(review): the statements appear twice below because this text comes
// from a commit diff view (removed lines, then re-formatted added lines).
setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", S3Config::Keys::kMaxAttempts);
setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", S3Config::Keys::kSecretKey);
setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
setConfigIfPresent(S3Config::Keys::kIamRole);
setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
setConfigIfPresent(S3Config::Keys::kSSLEnabled);
setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
setConfigIfPresent(S3Config::Keys::kMaxConnections);
setConfigIfPresent(S3Config::Keys::kConnectTimeout);

// The S3 log level is always written; kVeloxAwsSdkLogLevelDefault is
// presumably the fallback used when kVeloxAwsSdkLogLevel is not configured.
// NOTE(review): the statement ends in ";;" - the second semicolon is an empty
// statement and can be dropped.
hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);;
setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", S3Config::Keys::kMaxAttempts);
setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", S3Config::Keys::kSecretKey);
setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
setConfigIfPresent(S3Config::Keys::kIamRole);
setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
setConfigIfPresent(S3Config::Keys::kSSLEnabled);
setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
setConfigIfPresent(S3Config::Keys::kMaxConnections);
setConfigIfPresent(S3Config::Keys::kConnectTimeout);

hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
;

// Convert all Spark bucket configs to Velox bucket configs.
// For every raw config whose name starts with kSparkHadoopBucketPrefix, the
// remainder is split at its first "." into a bucket name and a suffix. When
// the suffix matches an entry in sparkSuffixes (via getVeloxKey), the value
// is stored under S3Config::bucketConfigKey(<velox key>, <bucket name>).
// Unrecognised suffixes are ignored.
// NOTE(review): `dot` is an int and is not checked for "not found". If
// `remaining` has no "." (presumably a std::string, so find() returns npos),
// the bucket name and the suffix would both end up as the whole remainder
// instead of the entry being skipped - confirm and consider an explicit
// npos check with a size_t index.
// NOTE(review): the loop body appears twice below because this text comes
// from a commit diff view (removed lines, then re-formatted added lines).
for (const auto& [key, value] : conf->rawConfigs()) {
if (key.find(kSparkHadoopBucketPrefix) == 0) {
auto remaining = key.substr(kSparkHadoopBucketPrefix.size());
int dot = remaining.find(".");
auto bucketName = remaining.substr(0, dot);
auto suffix = remaining.substr(dot + 1);
auto veloxKey = getVeloxKey(suffix);

if (veloxKey.has_value()) {
hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = value;
}
auto remaining = key.substr(kSparkHadoopBucketPrefix.size());
int dot = remaining.find(".");
auto bucketName = remaining.substr(0, dot);
auto suffix = remaining.substr(dot + 1);
auto veloxKey = getVeloxKey(suffix);

if (veloxKey.has_value()) {
hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = value;
}
}
}
#endif
Expand Down

0 comments on commit 8740586

Please sign in to comment.