From 3e23ad40f49590b0d5c4d2cdda88ed5d8bf64102 Mon Sep 17 00:00:00 2001
From: Qian Sun
Date: Fri, 22 Dec 2023 20:06:42 +0800
Subject: [PATCH] [GLUTEN-4121][VL] Initialize s3 filesystem with hive
 configuration in native write (#4129)

* Initialize s3 filesystem with hive configuration in native write

* add configuration in native session conf

* use FileSink
---
 cpp/velox/compute/VeloxBackend.cc             | 104 +---------------
 .../writer/VeloxParquetDatasource.cc          |  11 +-
 cpp/velox/utils/ConfigExtractor.cc            | 115 ++++++++++++++++++
 cpp/velox/utils/ConfigExtractor.h             |   3 +
 .../scala/io/glutenproject/GlutenConfig.scala |  26 +++-
 5 files changed, 154 insertions(+), 105 deletions(-)

diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index a27563f5a877..c14b8b2eb7fc 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -22,6 +22,7 @@
 #include "operators/functions/RegistrationAllFunctions.h"
 #include "operators/plannodes/RowVectorStream.h"
+#include "utils/ConfigExtractor.h"

 #include "shuffle/VeloxShuffleReader.h"

@@ -31,9 +32,6 @@
 #ifdef GLUTEN_ENABLE_IAA
 #include "utils/qpl/qpl_codec.h"
 #endif
-#ifdef ENABLE_GCS
-#include <fstream>
-#endif
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
 #include "jni/JniFileSystem.h"
@@ -43,7 +41,6 @@
 #include "velox/common/caching/SsdCache.h"
 #include "velox/common/file/FileSystems.h"
 #include "velox/common/memory/MmapAllocator.h"
-#include "velox/connectors/hive/HiveConfig.h"
 #include "velox/connectors/hive/HiveConnector.h"
 #include "velox/connectors/hive/HiveDataSource.h"
 #include "velox/serializers/PrestoSerializer.h"
@@ -114,13 +111,6 @@
 const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";

 // VeloxShuffleReader print flag.
 const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";
-const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
-const bool kVeloxFileHandleCacheEnabledDefault = false;
-
-// Log granularity of AWS C++ SDK
-const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
-const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
-
 } // namespace

 namespace gluten {
@@ -265,51 +255,10 @@ void VeloxBackend::initConnector(const std::shared_ptr<facebook::velox::Config>& conf) {
   auto mutableConf = std::make_shared<facebook::velox::core::MemConfig>(conf->valuesCopy());

-#ifdef ENABLE_S3
-  std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
-  std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
-  std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
-  bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
-  bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
-  bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
-  std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
-  std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
-
-  std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
-
-  const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
-  if (envAwsAccessKey != nullptr) {
-    awsAccessKey = std::string(envAwsAccessKey);
+  auto hiveConf = getHiveConfig(conf);
+  for (auto& [k, v] : hiveConf->valuesCopy()) {
+    mutableConf->setValue(k, v);
   }
-  const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
-  if (envAwsSecretKey != nullptr) {
-    awsSecretKey = std::string(envAwsSecretKey);
-  }
-
-  const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
-  if (envAwsEndpoint != nullptr) {
-    awsEndpoint = std::string(envAwsEndpoint);
-  }
-
-  std::unordered_map<std::string, std::string> s3Config({});
-  if (useInstanceCredentials) {
-    mutableConf->setValue("hive.s3.use-instance-credentials", "true");
-  } else if (!iamRole.empty()) {
-    mutableConf->setValue("hive.s3.iam-role", iamRole);
-    if (!iamRoleSessionName.empty()) {
-      mutableConf->setValue("hive.s3.iam-role-session-name", iamRoleSessionName);
-    }
-  } else {
-    mutableConf->setValue("hive.s3.aws-access-key", awsAccessKey);
-    mutableConf->setValue("hive.s3.aws-secret-key", awsSecretKey);
-  }
-  // Only need to set s3 endpoint when not use instance credentials.
-  if (!useInstanceCredentials) {
-    mutableConf->setValue("hive.s3.endpoint", awsEndpoint);
-  }
-  mutableConf->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false");
-  mutableConf->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
-  mutableConf->setValue("hive.s3.log-level", awsSdkLogLevel);
-#endif

 #ifdef ENABLE_ABFS
   const auto& confValue = conf->valuesCopy();
@@ -323,51 +272,6 @@ void VeloxBackend::initConnector(const std::shared_ptr<facebook::velox::Config>& conf) {
   }
 #endif

-#ifdef ENABLE_GCS
-  // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
-  auto gsStorageRootUrl = conf->get<std::string>("spark.hadoop.fs.gs.storage.root.url");
-  if (gsStorageRootUrl.hasValue()) {
-    std::string url = gsStorageRootUrl.value();
-    std::string gcsScheme;
-    std::string gcsEndpoint;
-
-    const auto sep = std::string("://");
-    const auto pos = url.find_first_of(sep);
-    if (pos != std::string::npos) {
-      gcsScheme = url.substr(0, pos);
-      gcsEndpoint = url.substr(pos + sep.length());
-    }
-
-    if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
-      mutableConf->setValue("hive.gcs.scheme", gcsScheme);
-      mutableConf->setValue("hive.gcs.endpoint", gcsEndpoint);
-    }
-  }
-
-  // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
-  auto gsAuthType = conf->get<std::string>("spark.hadoop.fs.gs.auth.type");
-  if (gsAuthType.hasValue()) {
-    std::string type = gsAuthType.value();
-    if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
-      auto gsAuthServiceAccountJsonKeyfile = conf->get<std::string>("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
-      if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
-        auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
-        stream.exceptions(std::ios::badbit);
-        std::string gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
-        mutableConf->setValue("hive.gcs.credentials", gsAuthServiceAccountJson);
-      } else {
-        LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
-                        "however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
-        throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
-      }
-    }
-  }
-#endif
-
-  mutableConf->setValue(
-      velox::connector::hive::HiveConfig::kEnableFileHandleCache,
-      conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
-
   if (ioThreads > 0) {
     ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
   }
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
index 2fba5d05c4a5..3fd9a003ebac 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -25,6 +25,7 @@
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
+#include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 #include "velox/common/compression/Compression.h"
 #include "velox/core/QueryConfig.h"
@@ -49,10 +50,12 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::string>& sparkConfs) {
     sink_ = std::make_unique<dwio::common::WriteFileSink>(std::move(localWriteFile), path);
   } else if (isSupportedS3SdkPath(filePath_)) {
 #ifdef ENABLE_S3
-    auto fileSystem = getFileSystem(filePath_, nullptr);
-    auto* s3FileSystem = dynamic_cast<filesystems::S3FileSystem*>(fileSystem.get());
-    sink_ = std::make_unique<dwio::common::WriteFileSink>(
-        s3FileSystem->openFileForWrite(filePath_, {{}, s3SinkPool_.get()}), filePath_);
+    auto confs = std::make_shared<facebook::velox::core::MemConfig>(sparkConfs);
+    auto hiveConfs = getHiveConfig(confs);
+    sink_ = dwio::common::FileSink::create(
+        filePath_,
+        {.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
+         .pool = s3SinkPool_.get()});
 #else
     throw std::runtime_error(
         "The write path is S3 path but the S3 haven't been enabled when writing parquet data in velox runtime!");
diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc
index 59f309bb0c14..a4b5280e1640 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -19,6 +19,22 @@
 #include "ConfigExtractor.h"
 #include
+#ifdef ENABLE_GCS
+#include <fstream>
+#endif
+
+#include "utils/exception.h"
+#include "velox/connectors/hive/HiveConfig.h"
+
+namespace {
+
+const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
+const bool kVeloxFileHandleCacheEnabledDefault = false;
+
+// Log granularity of AWS C++ SDK
+const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
+const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
+} // namespace

 namespace gluten {
@@ -41,4 +57,103 @@ bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMap) {
       std::make_shared<facebook::velox::core::MemConfig>(confMap);
   return veloxCfg->get(kDebugModeEnabled, false);
 }
+
+std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(
+    const std::shared_ptr<facebook::velox::Config>& conf) {
+  auto hiveConf = std::make_shared<facebook::velox::core::MemConfig>();
+
+#ifdef ENABLE_S3
+  std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
+  std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
+  std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
+  bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
+  bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
+  bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
+  std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
+  std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
+
+  std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);
+
+  const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
+  if (envAwsAccessKey != nullptr) {
+    awsAccessKey = std::string(envAwsAccessKey);
+  }
+  const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
+  if (envAwsSecretKey != nullptr) {
+    awsSecretKey = std::string(envAwsSecretKey);
+  }
+  const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
+  if (envAwsEndpoint != nullptr) {
+    awsEndpoint = std::string(envAwsEndpoint);
+  }
+
+  if (useInstanceCredentials) {
+    hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials, "true");
+  } else if (!iamRole.empty()) {
+    hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3IamRole, iamRole);
+    if (!iamRoleSessionName.empty()) {
+      hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3IamRoleSessionName, iamRoleSessionName);
+    }
+  } else {
+    hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3AwsAccessKey, awsAccessKey);
+    hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3AwsSecretKey, awsSecretKey);
+  }
+  // Only need to set s3 endpoint when not use instance credentials.
+  if (!useInstanceCredentials) {
+    hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3Endpoint, awsEndpoint);
+  }
+  hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled, sslEnabled ? "true" : "false");
+  hiveConf->setValue(
+      facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess, pathStyleAccess ? "true" : "false");
+  hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3LogLevel, awsSdkLogLevel);
+#endif
+
+#ifdef ENABLE_GCS
+  // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
+  auto gsStorageRootUrl = conf->get<std::string>("spark.hadoop.fs.gs.storage.root.url");
+  if (gsStorageRootUrl.hasValue()) {
+    std::string url = gsStorageRootUrl.value();
+    std::string gcsScheme;
+    std::string gcsEndpoint;
+
+    const auto sep = std::string("://");
+    const auto pos = url.find_first_of(sep);
+    if (pos != std::string::npos) {
+      gcsScheme = url.substr(0, pos);
+      gcsEndpoint = url.substr(pos + sep.length());
+    }
+
+    if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
+      hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSScheme, gcsScheme);
+      hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSEndpoint, gcsEndpoint);
+    }
+  }
+
+  // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
+  auto gsAuthType = conf->get<std::string>("spark.hadoop.fs.gs.auth.type");
+  if (gsAuthType.hasValue()) {
+    std::string type = gsAuthType.value();
+    if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
+      auto gsAuthServiceAccountJsonKeyfile = conf->get<std::string>("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
+      if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
+        auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
+        stream.exceptions(std::ios::badbit);
+        std::string gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
+        hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSCredentials, gsAuthServiceAccountJson);
+      } else {
+        LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
+                        "however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
+        throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
+      }
+    }
+  }
+#endif
+
+  hiveConf->setValue(
+      facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache,
+      conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
+
+  return hiveConf;
+}
+
 } // namespace gluten
diff --git a/cpp/velox/utils/ConfigExtractor.h b/cpp/velox/utils/ConfigExtractor.h
index 1df2d2dbf39e..e50d058d97c6 100644
--- a/cpp/velox/utils/ConfigExtractor.h
+++ b/cpp/velox/utils/ConfigExtractor.h
@@ -35,4 +35,7 @@ std::string getConfigValue(

 bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMap);

+std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(
+    const std::shared_ptr<facebook::velox::Config>& conf);
+
 } // namespace gluten
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 28edb2e55bf2..6a70d8193af6 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -356,6 +356,16 @@ object GlutenConfig {
   val ABFS_ACCOUNT_KEY = "hadoop.fs.azure.account.key"
   val SPARK_ABFS_ACCOUNT_KEY: String = "spark." + ABFS_ACCOUNT_KEY

+  // GCS config
+  val GCS_PREFIX = "fs.gs."
+  val GCS_STORAGE_ROOT_URL = "fs.gs.storage.root.url"
+  val SPARK_GCS_STORAGE_ROOT_URL: String = HADOOP_PREFIX + GCS_STORAGE_ROOT_URL
+  val GCS_AUTH_TYPE = "fs.gs.auth.type"
+  val SPARK_GCS_AUTH_TYPE: String = HADOOP_PREFIX + GCS_AUTH_TYPE
+  val GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE = "fs.gs.auth.service.account.json.keyfile"
+  val SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE: String =
+    HADOOP_PREFIX + GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE
+
   // QAT config
   val GLUTEN_QAT_BACKEND_NAME = "qat"
   val GLUTEN_QAT_SUPPORTED_CODEC: Set[String] = Set("gzip", "zstd")
@@ -461,7 +471,21 @@ object GlutenConfig {
       "spark.io.compression.codec",
       COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
       COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key,
-      COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key
+      COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key,
+      // s3 config
+      SPARK_S3_ACCESS_KEY,
+      SPARK_S3_SECRET_KEY,
+      SPARK_S3_ENDPOINT,
+      SPARK_S3_CONNECTION_SSL_ENABLED,
+      SPARK_S3_PATH_STYLE_ACCESS,
+      SPARK_S3_USE_INSTANCE_CREDENTIALS,
+      SPARK_S3_IAM,
+      SPARK_S3_IAM_SESSION_NAME,
+      AWS_SDK_LOG_LEVEL.key,
+      // gcs config
+      SPARK_GCS_STORAGE_ROOT_URL,
+      SPARK_GCS_AUTH_TYPE,
+      SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE
     )
     keys.forEach(
       k => {