From db4a8fd0eb40bb8ddfdcd1bee53a628f3dfe6e43 Mon Sep 17 00:00:00 2001 From: yan ma Date: Thu, 9 May 2024 00:43:17 +0800 Subject: [PATCH 1/2] [GLUTEN-5659] Add more configs for AWS s3 --- cpp/velox/utils/ConfigExtractor.cc | 22 ++++++++++++++ docs/Configuration.md | 2 ++ .../org/apache/gluten/GlutenConfig.scala | 30 +++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index a71f143225b9..ec72879a35cf 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -34,6 +34,12 @@ const bool kVeloxFileHandleCacheEnabledDefault = false; // Log granularity of AWS C++ SDK const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel"; const std::string kVeloxAwsSdkLogLevelDefault = "FATAL"; +// Retry mode for AWS s3 +const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode"; +const std::string kVeloxS3RetryModeDefault = "legacy"; +// Connection timeout for AWS s3 +const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout"; +const std::string kVeloxS3ConnectTimeoutDefault = "1s"; } // namespace namespace gluten { @@ -64,6 +70,10 @@ std::shared_ptr getHiveConfig(std::shared_ptr< bool useInstanceCredentials = conf->get("spark.hadoop.fs.s3a.use.instance.credentials", false); std::string iamRole = conf->get("spark.hadoop.fs.s3a.iam.role", ""); std::string iamRoleSessionName = conf->get("spark.hadoop.fs.s3a.iam.role.session.name", ""); + std::string retryMaxAttempts = conf->get("spark.hadoop.fs.s3a.retry.limit", "3"); + std::string retryMode = conf->get(kVeloxS3RetryMode, kVeloxS3RetryModeDefault); + std::string maxConnections = conf->get("spark.hadoop.fs.s3a.connection.maximum", "96"); + std::string connectTimeout = conf->get(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault); std::string awsSdkLogLevel = conf->get(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault); @@ -79,6 +89,14 @@ std::shared_ptr getHiveConfig(std::shared_ptr< if (envAwsEndpoint != nullptr) { awsEndpoint = std::string(envAwsEndpoint); } + const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS"); + if (envRetryMaxAttempts != nullptr) { + retryMaxAttempts = std::string(envRetryMaxAttempts); + } + const char* envRetryMode = std::getenv("AWS_RETRY_MODE"); + if (envRetryMode != nullptr) { + retryMode = std::string(envRetryMode); + } if (useInstanceCredentials) { hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials] = "true"; @@ -98,6 +116,10 @@ std::shared_ptr getHiveConfig(std::shared_ptr< hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled] = sslEnabled ? "true" : "false"; hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess] = pathStyleAccess ? "true" : "false"; hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3LogLevel] = awsSdkLogLevel; + hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxAttempts] = retryMaxAttempts; + hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3RetryMode] = retryMode; + hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxConnections] = maxConnections; + hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3ConnectTimeout] = connectTimeout; #endif #ifdef ENABLE_GCS diff --git a/docs/Configuration.md b/docs/Configuration.md index 089675286f68..2c2bd4de11f2 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -89,6 +89,8 @@ The following configurations are related to Velox settings. | spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set the max coalesced bytes for velox file scan. | | | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set prefetch cache min pct for velox file scan. | | | spark.gluten.velox.awsSdkLogLevel | Log granularity of AWS C++ SDK in velox. | FATAL | +| spark.gluten.velox.fs.s3a.retry.mode | Retry mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive". | legacy | +| spark.gluten.velox.fs.s3a.connect.timeout | Timeout for AWS s3 connection. | 1s | | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | true | | spark.gluten.sql.complexType.scan.fallback.enabled | Force fallback for complex type scan, including struct, map, array. | true | diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 462032488548..eb52897c013b 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -435,6 +435,10 @@ class GlutenConfig(conf: SQLConf) extends Logging { def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL) + def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE) + + def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT) + def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED) def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED) @@ -487,6 +491,10 @@ object GlutenConfig { val SPARK_S3_IAM: String = HADOOP_PREFIX + S3_IAM_ROLE val S3_IAM_ROLE_SESSION_NAME = "fs.s3a.iam.role.session.name" val SPARK_S3_IAM_SESSION_NAME: String = HADOOP_PREFIX + S3_IAM_ROLE_SESSION_NAME + val S3_RETRY_MAX_ATTEMPTS = "fs.s3a.retry.limit" + val SPARK_S3_RETRY_MAX_ATTEMPTS: String = HADOOP_PREFIX + S3_RETRY_MAX_ATTEMPTS + val S3_CONNECTION_MAXIMUM = "fs.s3a.connection.maximum" + val SPARK_S3_CONNECTION_MAXIMUM: String = HADOOP_PREFIX + S3_CONNECTION_MAXIMUM // Hardware acceleraters backend val GLUTEN_SHUFFLE_CODEC_BACKEND = "spark.gluten.sql.columnar.shuffle.codecBackend" @@ -641,6 +649,10 @@ object GlutenConfig { SPARK_S3_USE_INSTANCE_CREDENTIALS, SPARK_S3_IAM, SPARK_S3_IAM_SESSION_NAME, + SPARK_S3_RETRY_MAX_ATTEMPTS, + SPARK_S3_CONNECTION_MAXIMUM, + AWS_S3_CONNECT_TIMEOUT.key, + AWS_S3_RETRY_MODE.key, AWS_SDK_LOG_LEVEL.key, // gcs config SPARK_GCS_STORAGE_ROOT_URL, @@ -692,6 +704,10 @@ object GlutenConfig { (SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"), (SPARK_S3_IAM, ""), (SPARK_S3_IAM_SESSION_NAME, ""), + (SPARK_S3_RETRY_MAX_ATTEMPTS, "3"), + (SPARK_S3_CONNECTION_MAXIMUM, "96"), + (AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString), + (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString), ( COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key, conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")), @@ -1932,6 +1948,20 @@ object GlutenConfig { .stringConf .createWithDefault("FATAL") + val AWS_S3_RETRY_MODE = + buildConf("spark.gluten.velox.fs.s3a.retry.mode") + .internal() + .doc("Retry mode for AWS s3 connection error.") + .stringConf + .createWithDefault("legacy") + + val AWS_S3_CONNECT_TIMEOUT = + buildConf("spark.gluten.velox.fs.s3a.connect.timeout") + .internal() + .doc("Timeout for AWS s3 connection.") + .stringConf + .createWithDefault("1s") + val VELOX_ORC_SCAN_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled") .internal() From 67f3a903c795ed12996a4f37592b171de479a805 Mon Sep 17 00:00:00 2001 From: yan ma Date: Wed, 19 Jun 2024 16:47:57 +0800 Subject: [PATCH 2/2] address comments --- cpp/velox/utils/ConfigExtractor.cc | 7 ++++--- .../src/main/scala/org/apache/gluten/GlutenConfig.scala | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index ec72879a35cf..816166351c0e 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -39,7 +39,8 @@ const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode"; const std::string kVeloxS3RetryModeDefault = "legacy"; // Connection timeout for AWS s3 const std::string kVeloxS3ConnectTimeout = "spark.gluten.velox.fs.s3a.connect.timeout"; -const std::string kVeloxS3ConnectTimeoutDefault = "1s"; +// Using default fs.s3a.connection.timeout value in hadoop +const std::string kVeloxS3ConnectTimeoutDefault = "200s"; } // namespace namespace gluten { @@ -70,9 +71,9 @@ std::shared_ptr getHiveConfig(std::shared_ptr< bool useInstanceCredentials = conf->get("spark.hadoop.fs.s3a.use.instance.credentials", false); std::string iamRole = conf->get("spark.hadoop.fs.s3a.iam.role", ""); std::string iamRoleSessionName = conf->get("spark.hadoop.fs.s3a.iam.role.session.name", ""); - std::string retryMaxAttempts = conf->get("spark.hadoop.fs.s3a.retry.limit", "3"); + std::string retryMaxAttempts = conf->get("spark.hadoop.fs.s3a.retry.limit", "20"); std::string retryMode = conf->get(kVeloxS3RetryMode, kVeloxS3RetryModeDefault); - std::string maxConnections = conf->get("spark.hadoop.fs.s3a.connection.maximum", "96"); + std::string maxConnections = conf->get("spark.hadoop.fs.s3a.connection.maximum", "15"); std::string connectTimeout = conf->get(kVeloxS3ConnectTimeout, kVeloxS3ConnectTimeoutDefault); std::string awsSdkLogLevel = conf->get(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault); diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index eb52897c013b..39892568154a 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -704,8 +704,8 @@ object GlutenConfig { (SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"), (SPARK_S3_IAM, ""), (SPARK_S3_IAM_SESSION_NAME, ""), - (SPARK_S3_RETRY_MAX_ATTEMPTS, "3"), - (SPARK_S3_CONNECTION_MAXIMUM, "96"), + (SPARK_S3_RETRY_MAX_ATTEMPTS, "20"), + (SPARK_S3_CONNECTION_MAXIMUM, "15"), (AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString), (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString), ( @@ -1951,7 +1951,7 @@ object GlutenConfig { val AWS_S3_RETRY_MODE = buildConf("spark.gluten.velox.fs.s3a.retry.mode") .internal() - .doc("Retry mode for AWS s3 connection error.") + .doc("Retry mode for AWS s3 connection error: legacy, standard and adaptive.") .stringConf .createWithDefault("legacy") @@ -1960,7 +1960,7 @@ object GlutenConfig { .internal() .doc("Timeout for AWS s3 connection.") .stringConf - .createWithDefault("1s") + .createWithDefault("200s") val VELOX_ORC_SCAN_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled")