Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-5659][VL] Add more configs for AWS s3 #5660

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;
// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
// Retry mode for AWS s3
const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
const std::string kVeloxS3RetryModeDefault = "legacy";
// Connection timeout for AWS s3
const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout";
// Using default fs.s3a.connection.timeout value in hadoop
const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace

namespace gluten {
Expand Down Expand Up @@ -64,6 +71,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
std::string retryMaxAttempts = conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, kVeloxS3RetryModeDefault);
std::string maxConnections = conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault);

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

Expand All @@ -79,6 +90,14 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}
const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
if (envRetryMaxAttempts != nullptr) {
retryMaxAttempts = std::string(envRetryMaxAttempts);
}
const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
if (envRetryMode != nullptr) {
retryMode = std::string(envRetryMode);
}

if (useInstanceCredentials) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials] = "true";
Expand All @@ -98,6 +117,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled] = sslEnabled ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess] = pathStyleAccess ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3LogLevel] = awsSdkLogLevel;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxAttempts] = retryMaxAttempts;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3RetryMode] = retryMode;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxConnections] = maxConnections;
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3ConnectTimeout] = connectTimeout;
#endif

#ifdef ENABLE_GCS
Expand Down
2 changes: 2 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ The following configurations are related to Velox settings.
| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set the max coalesced bytes for velox file scan. | |
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set prefetch cache min pct for velox file scan. | |
| spark.gluten.velox.awsSdkLogLevel | Log granularity of AWS C++ SDK in velox. | FATAL |
| spark.gluten.velox.fs.s3a.retry.mode | Retry mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive". | legacy |
| spark.gluten.velox.fs.s3a.connect.timeout | Timeout for AWS s3 connection. | 1s |
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | true |
| spark.gluten.sql.complexType.scan.fallback.enabled | Force fallback for complex type scan, including struct, map, array. | true |

Expand Down
30 changes: 30 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)

def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)

def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)

def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)

def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED)
Expand Down Expand Up @@ -488,6 +492,10 @@ object GlutenConfig {
val SPARK_S3_IAM: String = HADOOP_PREFIX + S3_IAM_ROLE
val S3_IAM_ROLE_SESSION_NAME = "fs.s3a.iam.role.session.name"
val SPARK_S3_IAM_SESSION_NAME: String = HADOOP_PREFIX + S3_IAM_ROLE_SESSION_NAME
val S3_RETRY_MAX_ATTEMPTS = "fs.s3a.retry.limit"
val SPARK_S3_RETRY_MAX_ATTEMPTS: String = HADOOP_PREFIX + S3_RETRY_MAX_ATTEMPTS
val S3_CONNECTION_MAXIMUM = "fs.s3a.connection.maximum"
val SPARK_S3_CONNECTION_MAXIMUM: String = HADOOP_PREFIX + S3_CONNECTION_MAXIMUM

// Hardware acceleraters backend
val GLUTEN_SHUFFLE_CODEC_BACKEND = "spark.gluten.sql.columnar.shuffle.codecBackend"
Expand Down Expand Up @@ -642,6 +650,10 @@ object GlutenConfig {
SPARK_S3_USE_INSTANCE_CREDENTIALS,
SPARK_S3_IAM,
SPARK_S3_IAM_SESSION_NAME,
SPARK_S3_RETRY_MAX_ATTEMPTS,
SPARK_S3_CONNECTION_MAXIMUM,
AWS_S3_CONNECT_TIMEOUT.key,
AWS_S3_RETRY_MODE.key,
AWS_SDK_LOG_LEVEL.key,
// gcs config
SPARK_GCS_STORAGE_ROOT_URL,
Expand Down Expand Up @@ -693,6 +705,10 @@ object GlutenConfig {
(SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
(SPARK_S3_IAM, ""),
(SPARK_S3_IAM_SESSION_NAME, ""),
(SPARK_S3_RETRY_MAX_ATTEMPTS, "20"),
(SPARK_S3_CONNECTION_MAXIMUM, "15"),
(AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString),
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
Expand Down Expand Up @@ -1943,6 +1959,20 @@ object GlutenConfig {
.stringConf
.createWithDefault("FATAL")

val AWS_S3_RETRY_MODE =
buildConf("spark.gluten.velox.fs.s3a.retry.mode")
.internal()
.doc("Retry mode for AWS s3 connection error: legacy, standard and adaptive.")
.stringConf
.createWithDefault("legacy")

val AWS_S3_CONNECT_TIMEOUT =
buildConf("spark.gluten.velox.fs.s3a.connect.timeout")
.internal()
.doc("Timeout for AWS s3 connection.")
.stringConf
.createWithDefault("200s")

val VELOX_ORC_SCAN_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled")
.internal()
Expand Down
Loading