Skip to content

Commit

Permalink
[GLUTEN-5659][VL] Add more configs for AWS s3 (#5660)
Browse files Browse the repository at this point in the history
Add more configs for AWS s3

spark.gluten.velox.fs.s3a.retry.mode
spark.gluten.velox.fs.s3a.connect.timeout
spark.hadoop.fs.s3a.retry.limit
spark.hadoop.fs.s3a.connection.maximum
  • Loading branch information
yma11 authored Jun 25, 2024
1 parent 4c52976 commit cf04f0f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 0 deletions.
23 changes: 23 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;
// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
// Retry mode for AWS s3
const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
const std::string kVeloxS3RetryModeDefault = "legacy";
// Connection timeout for AWS s3
const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout";
// Using default fs.s3a.connection.timeout value in hadoop
const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace

namespace gluten {
Expand Down Expand Up @@ -64,6 +71,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
std::string retryMaxAttempts = conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, kVeloxS3RetryModeDefault);
std::string maxConnections = conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault);

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

Expand All @@ -79,6 +90,14 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}
const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
if (envRetryMaxAttempts != nullptr) {
retryMaxAttempts = std::string(envRetryMaxAttempts);
}
const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
if (envRetryMode != nullptr) {
retryMode = std::string(envRetryMode);
}

if (useInstanceCredentials) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials] = "true";
Expand All @@ -98,6 +117,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled] = sslEnabled ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess] = pathStyleAccess ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3LogLevel] = awsSdkLogLevel;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxAttempts] = retryMaxAttempts;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3RetryMode] = retryMode;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxConnections] = maxConnections;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3ConnectTimeout] = connectTimeout;
#endif

#ifdef ENABLE_GCS
Expand Down
2 changes: 2 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ The following configurations are related to Velox settings.
| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set the max coalesced bytes for velox file scan. | |
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set prefetch cache min pct for velox file scan. | |
| spark.gluten.velox.awsSdkLogLevel | Log granularity of AWS C++ SDK in velox. | FATAL |
| spark.gluten.velox.fs.s3a.retry.mode | Retry mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive". | legacy |
| spark.gluten.velox.fs.s3a.connect.timeout | Timeout for AWS s3 connection. | 1s |
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | true |
| spark.gluten.sql.complexType.scan.fallback.enabled | Force fallback for complex type scan, including struct, map, array. | true |

Expand Down
30 changes: 30 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)

def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)

def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)

def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)

def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED)
Expand Down Expand Up @@ -488,6 +492,10 @@ object GlutenConfig {
val SPARK_S3_IAM: String = HADOOP_PREFIX + S3_IAM_ROLE
val S3_IAM_ROLE_SESSION_NAME = "fs.s3a.iam.role.session.name"
val SPARK_S3_IAM_SESSION_NAME: String = HADOOP_PREFIX + S3_IAM_ROLE_SESSION_NAME
val S3_RETRY_MAX_ATTEMPTS = "fs.s3a.retry.limit"
val SPARK_S3_RETRY_MAX_ATTEMPTS: String = HADOOP_PREFIX + S3_RETRY_MAX_ATTEMPTS
val S3_CONNECTION_MAXIMUM = "fs.s3a.connection.maximum"
val SPARK_S3_CONNECTION_MAXIMUM: String = HADOOP_PREFIX + S3_CONNECTION_MAXIMUM

// Hardware acceleraters backend
val GLUTEN_SHUFFLE_CODEC_BACKEND = "spark.gluten.sql.columnar.shuffle.codecBackend"
Expand Down Expand Up @@ -642,6 +650,10 @@ object GlutenConfig {
SPARK_S3_USE_INSTANCE_CREDENTIALS,
SPARK_S3_IAM,
SPARK_S3_IAM_SESSION_NAME,
SPARK_S3_RETRY_MAX_ATTEMPTS,
SPARK_S3_CONNECTION_MAXIMUM,
AWS_S3_CONNECT_TIMEOUT.key,
AWS_S3_RETRY_MODE.key,
AWS_SDK_LOG_LEVEL.key,
// gcs config
SPARK_GCS_STORAGE_ROOT_URL,
Expand Down Expand Up @@ -693,6 +705,10 @@ object GlutenConfig {
(SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
(SPARK_S3_IAM, ""),
(SPARK_S3_IAM_SESSION_NAME, ""),
(SPARK_S3_RETRY_MAX_ATTEMPTS, "20"),
(SPARK_S3_CONNECTION_MAXIMUM, "15"),
(AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString),
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
Expand Down Expand Up @@ -1941,6 +1957,20 @@ object GlutenConfig {
.stringConf
.createWithDefault("FATAL")

val AWS_S3_RETRY_MODE =
buildConf("spark.gluten.velox.fs.s3a.retry.mode")
.internal()
.doc("Retry mode for AWS s3 connection error: legacy, standard and adaptive.")
.stringConf
.createWithDefault("legacy")

val AWS_S3_CONNECT_TIMEOUT =
buildConf("spark.gluten.velox.fs.s3a.connect.timeout")
.internal()
.doc("Timeout for AWS s3 connection.")
.stringConf
.createWithDefault("200s")

val VELOX_ORC_SCAN_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled")
.internal()
Expand Down

0 comments on commit cf04f0f

Please sign in to comment.