From bc25d074e299753aa5464367e1af266e2c5dad20 Mon Sep 17 00:00:00 2001 From: BInwei Yang Date: Tue, 14 Nov 2023 01:16:01 -0800 Subject: [PATCH] [VL] use velox config lib to extract the configurations from spark (#3694) The PR is to remove the hardcoded check, use velox config lib to get the real config. conf.set('spark.hadoop.fs.s3a.use.instance.credentials',True) ==> conf.set('spark.hadoop.fs.s3a.use.instance.credentials','true') The root cause is that we use got=='true' to check if the flag is set. The PR use velox's config lib to parse the configurations from Spark. True can be set by 'true', 'True', 1, 'ON', 'Y', 'T', etc. limitation: 'KB', 'MB', 'GB' is still not supported, only number is expected --- cpp/velox/compute/VeloxBackend.cc | 209 +++++++++++------------------- cpp/velox/compute/VeloxBackend.h | 9 +- 2 files changed, 79 insertions(+), 139 deletions(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 0921cd8e0977..e6f3629298a3 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -52,51 +52,51 @@ namespace { const std::string kEnableUserExceptionStacktrace = "spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace"; -const std::string kEnableUserExceptionStacktraceDefault = "true"; +const bool kEnableUserExceptionStacktraceDefault = true; const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel"; -const std::string kGlogVerboseLevelDefault = "0"; +const uint32_t kGlogVerboseLevelDefault = 0; const std::string kGlogSeverityLevel = "spark.gluten.sql.columnar.backend.velox.glogSeverityLevel"; -const std::string kGlogSeverityLevelDefault = "0"; +const uint32_t kGlogSeverityLevelDefault = 0; const std::string kEnableSystemExceptionStacktrace = "spark.gluten.sql.columnar.backend.velox.enableSystemExceptionStacktrace"; -const std::string kEnableSystemExceptionStacktraceDefault = "true"; +const bool kEnableSystemExceptionStacktraceDefault = true; const std::string kMemoryUseHugePages = "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages"; -const std::string kMemoryUseHugePagesDefault = "false"; +const bool kMemoryUseHugePagesDefault = false; const std::string kHiveConnectorId = "test-hive"; const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled"; // memory cache const std::string kVeloxMemCacheSize = "spark.gluten.sql.columnar.backend.velox.memCacheSize"; -const std::string kVeloxMemCacheSizeDefault = "1073741824"; +const uint64_t kVeloxMemCacheSizeDefault = 1073741824; // 1G // ssd cache const std::string kVeloxSsdCacheSize = "spark.gluten.sql.columnar.backend.velox.ssdCacheSize"; -const std::string kVeloxSsdCacheSizeDefault = "1073741824"; +const uint64_t kVeloxSsdCacheSizeDefault = 1073741824; // 1G const std::string kVeloxSsdCachePath = "spark.gluten.sql.columnar.backend.velox.ssdCachePath"; const std::string kVeloxSsdCachePathDefault = "/tmp/"; const std::string kVeloxSsdCacheShards = "spark.gluten.sql.columnar.backend.velox.ssdCacheShards"; -const std::string kVeloxSsdCacheShardsDefault = "1"; +const uint32_t kVeloxSsdCacheShardsDefault = 1; const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads"; -const std::string kVeloxSsdCacheIOThreadsDefault = "1"; +const uint32_t kVeloxSsdCacheIOThreadsDefault = 1; const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect"; const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads"; -const std::string kVeloxIOThreadsDefault = "0"; +const uint32_t kVeloxIOThreadsDefault = 0; const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver"; -const std::string kVeloxSplitPreloadPerDriverDefault = "2"; +const uint32_t kVeloxSplitPreloadPerDriverDefault = 2; // udf const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.udfLibraryPaths"; // spill const std::string kMaxSpillFileSize = "spark.gluten.sql.columnar.backend.velox.maxSpillFileSize"; -const std::string kMaxSpillFileSizeDefault = std::to_string(20L * 1024 * 1024); +const uint64_t kMaxSpillFileSizeDefault = 20L * 1024 * 1024; // backtrace allocation const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation"; @@ -120,79 +120,49 @@ void VeloxBackend::printConf(const std::unordered_map& void VeloxBackend::init(const std::unordered_map& conf) { // Init glog and log level. - { - auto vlogLevel = stoi(getConfigValue(conf, kGlogVerboseLevel, kGlogVerboseLevelDefault)); - auto severityLogLevel = stoi(getConfigValue(conf, kGlogSeverityLevel, kGlogSeverityLevelDefault)); - FLAGS_v = vlogLevel; - FLAGS_minloglevel = severityLogLevel; - FLAGS_logtostderr = true; - google::InitGoogleLogging("gluten"); - } + auto veloxmemcfg = std::make_shared(conf); + const facebook::velox::Config* veloxcfg = veloxmemcfg.get(); + + uint32_t vlogLevel = veloxcfg->get(kGlogVerboseLevel, kGlogVerboseLevelDefault); + uint32_t severityLogLevel = veloxcfg->get(kGlogSeverityLevel, kGlogSeverityLevelDefault); + FLAGS_v = vlogLevel; + FLAGS_minloglevel = severityLogLevel; + FLAGS_logtostderr = true; + google::InitGoogleLogging("gluten"); // Avoid creating too many shared leaf pools. FLAGS_velox_memory_num_shared_leaf_pools = 0; // Set velox_exception_user_stacktrace_enabled. - { - auto got = conf.find(kEnableUserExceptionStacktrace); - std::string enableUserExceptionStacktrace = kEnableUserExceptionStacktraceDefault; - if (got != conf.end()) { - enableUserExceptionStacktrace = got->second; - } - FLAGS_velox_exception_user_stacktrace_enabled = (enableUserExceptionStacktrace == "true"); - } + FLAGS_velox_exception_user_stacktrace_enabled = + veloxcfg->get(kEnableUserExceptionStacktrace, kEnableUserExceptionStacktraceDefault); // Set velox_exception_system_stacktrace_enabled. - { - auto got = conf.find(kEnableSystemExceptionStacktrace); - std::string enableSystemExceptionStacktrace = kEnableSystemExceptionStacktraceDefault; - if (got != conf.end()) { - enableSystemExceptionStacktrace = got->second; - } - FLAGS_velox_exception_system_stacktrace_enabled = (enableSystemExceptionStacktrace == "true"); - } + FLAGS_velox_exception_system_stacktrace_enabled = + veloxcfg->get(kEnableSystemExceptionStacktrace, kEnableSystemExceptionStacktraceDefault); // Set velox_memory_use_hugepages. - { - auto got = conf.find(kMemoryUseHugePages); - std::string memoryUseHugePages = kMemoryUseHugePagesDefault; - if (got != conf.end()) { - memoryUseHugePages = got->second; - } - FLAGS_velox_memory_use_hugepages = (memoryUseHugePages == "true"); - } + FLAGS_velox_memory_use_hugepages = veloxcfg->get(kMemoryUseHugePages, kMemoryUseHugePagesDefault); // Set backtrace_allocation - { - auto got = conf.find(kBacktraceAllocation); - if (got != conf.end()) { - gluten::backtrace_allocation = (got->second == "true"); - } - } + gluten::backtrace_allocation = veloxcfg->get(kBacktraceAllocation, false); // Set veloxShuffleReaderPrintFlag - { - auto got = conf.find(kVeloxShuffleReaderPrintFlag); - if (got != conf.end()) { - gluten::veloxShuffleReaderPrintFlag = (got->second == "true"); - } - } + gluten::veloxShuffleReaderPrintFlag = veloxcfg->get(kVeloxShuffleReaderPrintFlag, false); // Setup and register. velox::filesystems::registerLocalFileSystem(); - initJolFilesystem(conf); - - std::unordered_map configurationValues; + initJolFilesystem(veloxcfg); #ifdef ENABLE_S3 - std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key"); - std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key"); - std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint"); - std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled"); - std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access"); - std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials"); - std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role"); - std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name"); + std::string awsAccessKey = veloxcfg->get("spark.hadoop.fs.s3a.access.key", ""); + std::string awsSecretKey = veloxcfg->get("spark.hadoop.fs.s3a.secret.key", ""); + std::string awsEndpoint = veloxcfg->get("spark.hadoop.fs.s3a.endpoint", ""); + bool sslEnabled = veloxcfg->get("spark.hadoop.fs.s3a.connection.ssl.enabled", false); + bool pathStyleAccess = veloxcfg->get("spark.hadoop.fs.s3a.path.style.access", false); + bool useInstanceCredentials = veloxcfg->get("spark.hadoop.fs.s3a.use.instance.credentials", false); + std::string iamRole = veloxcfg->get("spark.hadoop.fs.s3a.iam.role", ""); + std::string iamRoleSessionName = veloxcfg->get("spark.hadoop.fs.s3a.iam.role.session.name", ""); const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID"); if (envAwsAccessKey != nullptr) { @@ -208,50 +178,35 @@ void VeloxBackend::init(const std::unordered_map& conf } std::unordered_map s3Config({}); - if (useInstanceCredentials == "true") { - s3Config.insert({ - {"hive.s3.use-instance-credentials", useInstanceCredentials}, - }); + if (useInstanceCredentials) { + veloxmemcfg->setValue("hive.s3.use-instance-credentials", "true"); } else if (!iamRole.empty()) { - s3Config.insert({ - {"hive.s3.iam-role", iamRole}, - }); + veloxmemcfg->setValue("hive.s3.iam-role", iamRole); if (!iamRoleSessionName.empty()) { - s3Config.insert({ - {"hive.s3.iam-role-session-name", iamRoleSessionName}, - }); + veloxmemcfg->setValue("hive.s3.iam-role-session-name", iamRoleSessionName); } } else { - s3Config.insert({ - {"hive.s3.aws-access-key", awsAccessKey}, - {"hive.s3.aws-secret-key", awsSecretKey}, - }); + veloxmemcfg->setValue("hive.s3.aws-access-key", awsAccessKey); + veloxmemcfg->setValue("hive.s3.aws-secret-key", awsSecretKey); } // Only need to set s3 endpoint when not use instance credentials. - if (useInstanceCredentials != "true") { - s3Config.insert({ - {"hive.s3.endpoint", awsEndpoint}, - }); + if (!useInstanceCredentials) { + veloxmemcfg->setValue("hive.s3.endpoint", awsEndpoint); } - s3Config.insert({ - {"hive.s3.ssl.enabled", sslEnabled}, - {"hive.s3.path-style-access", pathStyleAccess}, - }); - - configurationValues.merge(s3Config); + veloxmemcfg->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false"); + veloxmemcfg->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false"); #endif - initCache(conf); - initIOExecutor(conf); + initCache(veloxcfg); + initIOExecutor(veloxcfg); #ifdef GLUTEN_PRINT_DEBUG - printConf(conf); + printConf(veloxcfg); #endif - auto properties = std::make_shared(configurationValues); auto hiveConnector = velox::connector::getConnectorFactory(velox::connector::hive::HiveConnectorFactory::kHiveConnectorName) - ->newConnector(kHiveConnectorId, properties, ioExecutor_.get()); + ->newConnector(kHiveConnectorId, veloxmemcfg, ioExecutor_.get()); registerConnector(hiveConnector); @@ -263,7 +218,7 @@ void VeloxBackend::init(const std::unordered_map& conf } velox::exec::Operator::registerOperator(std::make_unique()); - initUdf(conf); + initUdf(veloxcfg); } facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { @@ -271,43 +226,28 @@ facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const } // JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces -void VeloxBackend::initJolFilesystem(const std::unordered_map& conf) { - int64_t maxSpillFileSize = std::stol(kMaxSpillFileSizeDefault); - auto got = conf.find(kMaxSpillFileSize); - if (got != conf.end()) { - maxSpillFileSize = std::stol(got->second); - } +void VeloxBackend::initJolFilesystem(const facebook::velox::Config* conf) { + int64_t maxSpillFileSize = conf->get(kMaxSpillFileSize, kMaxSpillFileSizeDefault); + // FIXME It's known that if spill compression is disabled, the actual spill file size may // in crease beyond this limit a little (maximum 64 rows which is by default // one compression page) gluten::registerJolFileSystem(maxSpillFileSize); } -void VeloxBackend::initCache(const std::unordered_map& conf) { - auto key = conf.find(kVeloxCacheEnabled); - if (key != conf.end() && boost::algorithm::to_lower_copy(conf.at(kVeloxCacheEnabled)) == "true") { +void VeloxBackend::initCache(const facebook::velox::Config* conf) { + bool veloxCacheEnabled = conf->get(kVeloxCacheEnabled, false); + if (veloxCacheEnabled) { FLAGS_ssd_odirect = true; - if (conf.find(kVeloxSsdODirectEnabled) != conf.end() && - boost::algorithm::to_lower_copy(conf.at(kVeloxSsdODirectEnabled)) == "false") { - FLAGS_ssd_odirect = false; - } - uint64_t memCacheSize = std::stol(kVeloxMemCacheSizeDefault); - uint64_t ssdCacheSize = std::stol(kVeloxSsdCacheSizeDefault); - int32_t ssdCacheShards = std::stoi(kVeloxSsdCacheShardsDefault); - int32_t ssdCacheIOThreads = std::stoi(kVeloxSsdCacheIOThreadsDefault); - std::string ssdCachePathPrefix = kVeloxSsdCachePathDefault; - for (auto& [k, v] : conf) { - if (k == kVeloxMemCacheSize) - memCacheSize = std::stol(v); - if (k == kVeloxSsdCacheSize) - ssdCacheSize = std::stol(v); - if (k == kVeloxSsdCacheShards) - ssdCacheShards = std::stoi(v); - if (k == kVeloxSsdCachePath) - ssdCachePathPrefix = v; - if (k == kVeloxSsdCacheIOThreads) - ssdCacheIOThreads = std::stoi(v); - } + + FLAGS_ssd_odirect = conf->get(kVeloxSsdODirectEnabled, false); + + uint64_t memCacheSize = conf->get(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault); + uint64_t ssdCacheSize = conf->get(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault); + int32_t ssdCacheShards = conf->get(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault); + int32_t ssdCacheIOThreads = conf->get(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault); + std::string ssdCachePathPrefix = conf->get(kVeloxSsdCachePath, kVeloxSsdCachePathDefault); + cachePathPrefix_ = ssdCachePathPrefix; cacheFilePrefix_ = getCacheFilePrefix(); std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_; @@ -342,10 +282,9 @@ void VeloxBackend::initCache(const std::unordered_map& } } -void VeloxBackend::initIOExecutor(const std::unordered_map& conf) { - int32_t ioThreads = std::stoi(getConfigValue(conf, kVeloxIOThreads, kVeloxIOThreadsDefault)); - int32_t splitPreloadPerDriver = - std::stoi(getConfigValue(conf, kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault)); +void VeloxBackend::initIOExecutor(const facebook::velox::Config* conf) { + int32_t ioThreads = conf->get(kVeloxIOThreads, kVeloxIOThreadsDefault); + int32_t splitPreloadPerDriver = conf->get(kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault); if (ioThreads > 0) { ioExecutor_ = std::make_unique(ioThreads); FLAGS_split_preload_per_driver = splitPreloadPerDriver; @@ -357,11 +296,11 @@ void VeloxBackend::initIOExecutor(const std::unordered_map& conf) { - auto got = conf.find(kVeloxUdfLibraryPaths); - if (got != conf.end() && !got->second.empty()) { +void VeloxBackend::initUdf(const facebook::velox::Config* conf) { + auto got = conf->get(kVeloxUdfLibraryPaths, ""); + if (!got.empty()) { auto udfLoader = gluten::UdfLoader::getInstance(); - udfLoader->loadUdfLibraries(got->second); + udfLoader->loadUdfLibraries(got); udfLoader->registerUdf(); } } diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 1c85b953c460..64709463da61 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -26,6 +26,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/memory/MemoryPool.h" +#include "velox/core/Config.h" namespace gluten { /// As a static instance in per executor, initialized at executor startup. @@ -56,11 +57,11 @@ class VeloxBackend { } void init(const std::unordered_map& conf); - void initCache(const std::unordered_map& conf); - void initIOExecutor(const std::unordered_map& conf); - void initUdf(const std::unordered_map& conf); + void initCache(const facebook::velox::Config* conf); + void initIOExecutor(const facebook::velox::Config* conf); + void initUdf(const facebook::velox::Config* conf); - void initJolFilesystem(const std::unordered_map& conf); + void initJolFilesystem(const facebook::velox::Config* conf); void printConf(const std::unordered_map& conf);