Skip to content

Commit

Permalink
[VL] use velox config lib to extract the configurations from spark (a…
Browse files Browse the repository at this point in the history
…pache#3694)

The PR is to remove the hardcoded check, use velox config lib to get the real config.

conf.set('spark.hadoop.fs.s3a.use.instance.credentials',True)
==>
conf.set('spark.hadoop.fs.s3a.use.instance.credentials','true')

The root cause is that we use got=='true' to check if the flag is set.

The PR use velox's config lib to parse the configurations from Spark.

True can be set by 'true', 'True', 1, 'ON', 'Y', 'T', etc.

limitation: 'KB', 'MB', 'GB' is still not supported, only number is expected
  • Loading branch information
FelixYBW authored Nov 14, 2023
1 parent 6c38a76 commit bc25d07
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 139 deletions.
209 changes: 74 additions & 135 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,51 +52,51 @@ namespace {

const std::string kEnableUserExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace";
const std::string kEnableUserExceptionStacktraceDefault = "true";
const bool kEnableUserExceptionStacktraceDefault = true;

const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel";
const std::string kGlogVerboseLevelDefault = "0";
const uint32_t kGlogVerboseLevelDefault = 0;

const std::string kGlogSeverityLevel = "spark.gluten.sql.columnar.backend.velox.glogSeverityLevel";
const std::string kGlogSeverityLevelDefault = "0";
const uint32_t kGlogSeverityLevelDefault = 0;

const std::string kEnableSystemExceptionStacktrace =
"spark.gluten.sql.columnar.backend.velox.enableSystemExceptionStacktrace";
const std::string kEnableSystemExceptionStacktraceDefault = "true";
const bool kEnableSystemExceptionStacktraceDefault = true;

const std::string kMemoryUseHugePages = "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages";
const std::string kMemoryUseHugePagesDefault = "false";
const bool kMemoryUseHugePagesDefault = false;

const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";

// memory cache
const std::string kVeloxMemCacheSize = "spark.gluten.sql.columnar.backend.velox.memCacheSize";
const std::string kVeloxMemCacheSizeDefault = "1073741824";
const uint64_t kVeloxMemCacheSizeDefault = 1073741824; // 1G

// ssd cache
const std::string kVeloxSsdCacheSize = "spark.gluten.sql.columnar.backend.velox.ssdCacheSize";
const std::string kVeloxSsdCacheSizeDefault = "1073741824";
const uint64_t kVeloxSsdCacheSizeDefault = 1073741824; // 1G
const std::string kVeloxSsdCachePath = "spark.gluten.sql.columnar.backend.velox.ssdCachePath";
const std::string kVeloxSsdCachePathDefault = "/tmp/";
const std::string kVeloxSsdCacheShards = "spark.gluten.sql.columnar.backend.velox.ssdCacheShards";
const std::string kVeloxSsdCacheShardsDefault = "1";
const uint32_t kVeloxSsdCacheShardsDefault = 1;
const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads";
const std::string kVeloxSsdCacheIOThreadsDefault = "1";
const uint32_t kVeloxSsdCacheIOThreadsDefault = 1;
const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect";

const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads";
const std::string kVeloxIOThreadsDefault = "0";
const uint32_t kVeloxIOThreadsDefault = 0;

const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver";
const std::string kVeloxSplitPreloadPerDriverDefault = "2";
const uint32_t kVeloxSplitPreloadPerDriverDefault = 2;

// udf
const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.udfLibraryPaths";

// spill
const std::string kMaxSpillFileSize = "spark.gluten.sql.columnar.backend.velox.maxSpillFileSize";
const std::string kMaxSpillFileSizeDefault = std::to_string(20L * 1024 * 1024);
const uint64_t kMaxSpillFileSizeDefault = 20L * 1024 * 1024;

// backtrace allocation
const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";
Expand All @@ -120,79 +120,49 @@ void VeloxBackend::printConf(const std::unordered_map<std::string, std::string>&

void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
// Init glog and log level.
{
auto vlogLevel = stoi(getConfigValue(conf, kGlogVerboseLevel, kGlogVerboseLevelDefault));
auto severityLogLevel = stoi(getConfigValue(conf, kGlogSeverityLevel, kGlogSeverityLevelDefault));
FLAGS_v = vlogLevel;
FLAGS_minloglevel = severityLogLevel;
FLAGS_logtostderr = true;
google::InitGoogleLogging("gluten");
}
auto veloxmemcfg = std::make_shared<facebook::velox::core::MemConfigMutable>(conf);
const facebook::velox::Config* veloxcfg = veloxmemcfg.get();

uint32_t vlogLevel = veloxcfg->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
uint32_t severityLogLevel = veloxcfg->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
FLAGS_v = vlogLevel;
FLAGS_minloglevel = severityLogLevel;
FLAGS_logtostderr = true;
google::InitGoogleLogging("gluten");

// Avoid creating too many shared leaf pools.
FLAGS_velox_memory_num_shared_leaf_pools = 0;

// Set velox_exception_user_stacktrace_enabled.
{
auto got = conf.find(kEnableUserExceptionStacktrace);
std::string enableUserExceptionStacktrace = kEnableUserExceptionStacktraceDefault;
if (got != conf.end()) {
enableUserExceptionStacktrace = got->second;
}
FLAGS_velox_exception_user_stacktrace_enabled = (enableUserExceptionStacktrace == "true");
}
FLAGS_velox_exception_user_stacktrace_enabled =
veloxcfg->get<bool>(kEnableUserExceptionStacktrace, kEnableUserExceptionStacktraceDefault);

// Set velox_exception_system_stacktrace_enabled.
{
auto got = conf.find(kEnableSystemExceptionStacktrace);
std::string enableSystemExceptionStacktrace = kEnableSystemExceptionStacktraceDefault;
if (got != conf.end()) {
enableSystemExceptionStacktrace = got->second;
}
FLAGS_velox_exception_system_stacktrace_enabled = (enableSystemExceptionStacktrace == "true");
}
FLAGS_velox_exception_system_stacktrace_enabled =
veloxcfg->get<bool>(kEnableSystemExceptionStacktrace, kEnableSystemExceptionStacktraceDefault);

// Set velox_memory_use_hugepages.
{
auto got = conf.find(kMemoryUseHugePages);
std::string memoryUseHugePages = kMemoryUseHugePagesDefault;
if (got != conf.end()) {
memoryUseHugePages = got->second;
}
FLAGS_velox_memory_use_hugepages = (memoryUseHugePages == "true");
}
FLAGS_velox_memory_use_hugepages = veloxcfg->get<bool>(kMemoryUseHugePages, kMemoryUseHugePagesDefault);

// Set backtrace_allocation
{
auto got = conf.find(kBacktraceAllocation);
if (got != conf.end()) {
gluten::backtrace_allocation = (got->second == "true");
}
}
gluten::backtrace_allocation = veloxcfg->get<bool>(kBacktraceAllocation, false);

// Set veloxShuffleReaderPrintFlag
{
auto got = conf.find(kVeloxShuffleReaderPrintFlag);
if (got != conf.end()) {
gluten::veloxShuffleReaderPrintFlag = (got->second == "true");
}
}
gluten::veloxShuffleReaderPrintFlag = veloxcfg->get<bool>(kVeloxShuffleReaderPrintFlag, false);

// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem(conf);

std::unordered_map<std::string, std::string> configurationValues;
initJolFilesystem(veloxcfg);

#ifdef ENABLE_S3
std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key");
std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key");
std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint");
std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled");
std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access");
std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials");
std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role");
std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name");
std::string awsAccessKey = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = veloxcfg->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = veloxcfg->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = veloxcfg->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
Expand All @@ -208,50 +178,35 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
}

std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials == "true") {
s3Config.insert({
{"hive.s3.use-instance-credentials", useInstanceCredentials},
});
if (useInstanceCredentials) {
veloxmemcfg->setValue("hive.s3.use-instance-credentials", "true");
} else if (!iamRole.empty()) {
s3Config.insert({
{"hive.s3.iam-role", iamRole},
});
veloxmemcfg->setValue("hive.s3.iam-role", iamRole);
if (!iamRoleSessionName.empty()) {
s3Config.insert({
{"hive.s3.iam-role-session-name", iamRoleSessionName},
});
veloxmemcfg->setValue("hive.s3.iam-role-session-name", iamRoleSessionName);
}
} else {
s3Config.insert({
{"hive.s3.aws-access-key", awsAccessKey},
{"hive.s3.aws-secret-key", awsSecretKey},
});
veloxmemcfg->setValue("hive.s3.aws-access-key", awsAccessKey);
veloxmemcfg->setValue("hive.s3.aws-secret-key", awsSecretKey);
}
// Only need to set s3 endpoint when not use instance credentials.
if (useInstanceCredentials != "true") {
s3Config.insert({
{"hive.s3.endpoint", awsEndpoint},
});
if (!useInstanceCredentials) {
veloxmemcfg->setValue("hive.s3.endpoint", awsEndpoint);
}
s3Config.insert({
{"hive.s3.ssl.enabled", sslEnabled},
{"hive.s3.path-style-access", pathStyleAccess},
});

configurationValues.merge(s3Config);
veloxmemcfg->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false");
veloxmemcfg->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
#endif

initCache(conf);
initIOExecutor(conf);
initCache(veloxcfg);
initIOExecutor(veloxcfg);

#ifdef GLUTEN_PRINT_DEBUG
printConf(conf);
printConf(veloxcfg);
#endif

auto properties = std::make_shared<const velox::core::MemConfig>(configurationValues);
auto hiveConnector =
velox::connector::getConnectorFactory(velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(kHiveConnectorId, properties, ioExecutor_.get());
->newConnector(kHiveConnectorId, veloxmemcfg, ioExecutor_.get());

registerConnector(hiveConnector);

Expand All @@ -263,51 +218,36 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
}
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());

initUdf(conf);
initUdf(veloxcfg);
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
return asyncDataCache_.get();
}

// JNI-or-local filesystem, for spilling-to-heap if we have extra JVM heap spaces
void VeloxBackend::initJolFilesystem(const std::unordered_map<std::string, std::string>& conf) {
int64_t maxSpillFileSize = std::stol(kMaxSpillFileSizeDefault);
auto got = conf.find(kMaxSpillFileSize);
if (got != conf.end()) {
maxSpillFileSize = std::stol(got->second);
}
void VeloxBackend::initJolFilesystem(const facebook::velox::Config* conf) {
int64_t maxSpillFileSize = conf->get<int64_t>(kMaxSpillFileSize, kMaxSpillFileSizeDefault);

// FIXME It's known that if spill compression is disabled, the actual spill file size may
// in crease beyond this limit a little (maximum 64 rows which is by default
// one compression page)
gluten::registerJolFileSystem(maxSpillFileSize);
}

void VeloxBackend::initCache(const std::unordered_map<std::string, std::string>& conf) {
auto key = conf.find(kVeloxCacheEnabled);
if (key != conf.end() && boost::algorithm::to_lower_copy(conf.at(kVeloxCacheEnabled)) == "true") {
void VeloxBackend::initCache(const facebook::velox::Config* conf) {
bool veloxCacheEnabled = conf->get<bool>(kVeloxCacheEnabled, false);
if (veloxCacheEnabled) {
FLAGS_ssd_odirect = true;
if (conf.find(kVeloxSsdODirectEnabled) != conf.end() &&
boost::algorithm::to_lower_copy(conf.at(kVeloxSsdODirectEnabled)) == "false") {
FLAGS_ssd_odirect = false;
}
uint64_t memCacheSize = std::stol(kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = std::stol(kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = std::stoi(kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = std::stoi(kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = kVeloxSsdCachePathDefault;
for (auto& [k, v] : conf) {
if (k == kVeloxMemCacheSize)
memCacheSize = std::stol(v);
if (k == kVeloxSsdCacheSize)
ssdCacheSize = std::stol(v);
if (k == kVeloxSsdCacheShards)
ssdCacheShards = std::stoi(v);
if (k == kVeloxSsdCachePath)
ssdCachePathPrefix = v;
if (k == kVeloxSsdCacheIOThreads)
ssdCacheIOThreads = std::stoi(v);
}

FLAGS_ssd_odirect = conf->get<bool>(kVeloxSsdODirectEnabled, false);

uint64_t memCacheSize = conf->get<uint64_t>(kVeloxMemCacheSize, kVeloxMemCacheSizeDefault);
uint64_t ssdCacheSize = conf->get<uint64_t>(kVeloxSsdCacheSize, kVeloxSsdCacheSizeDefault);
int32_t ssdCacheShards = conf->get<int32_t>(kVeloxSsdCacheShards, kVeloxSsdCacheShardsDefault);
int32_t ssdCacheIOThreads = conf->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = conf->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);

cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
Expand Down Expand Up @@ -342,10 +282,9 @@ void VeloxBackend::initCache(const std::unordered_map<std::string, std::string>&
}
}

void VeloxBackend::initIOExecutor(const std::unordered_map<std::string, std::string>& conf) {
int32_t ioThreads = std::stoi(getConfigValue(conf, kVeloxIOThreads, kVeloxIOThreadsDefault));
int32_t splitPreloadPerDriver =
std::stoi(getConfigValue(conf, kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault));
void VeloxBackend::initIOExecutor(const facebook::velox::Config* conf) {
int32_t ioThreads = conf->get<int32_t>(kVeloxIOThreads, kVeloxIOThreadsDefault);
int32_t splitPreloadPerDriver = conf->get<int32_t>(kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault);
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
FLAGS_split_preload_per_driver = splitPreloadPerDriver;
Expand All @@ -357,11 +296,11 @@ void VeloxBackend::initIOExecutor(const std::unordered_map<std::string, std::str
}
}

void VeloxBackend::initUdf(const std::unordered_map<std::string, std::string>& conf) {
auto got = conf.find(kVeloxUdfLibraryPaths);
if (got != conf.end() && !got->second.empty()) {
void VeloxBackend::initUdf(const facebook::velox::Config* conf) {
auto got = conf->get<std::string>(kVeloxUdfLibraryPaths, "");
if (!got.empty()) {
auto udfLoader = gluten::UdfLoader::getInstance();
udfLoader->loadUdfLibraries(got->second);
udfLoader->loadUdfLibraries(got);
udfLoader->registerUdf();
}
}
Expand Down
9 changes: 5 additions & 4 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/core/Config.h"

namespace gluten {
/// As a static instance in per executor, initialized at executor startup.
Expand Down Expand Up @@ -56,11 +57,11 @@ class VeloxBackend {
}

void init(const std::unordered_map<std::string, std::string>& conf);
void initCache(const std::unordered_map<std::string, std::string>& conf);
void initIOExecutor(const std::unordered_map<std::string, std::string>& conf);
void initUdf(const std::unordered_map<std::string, std::string>& conf);
void initCache(const facebook::velox::Config* conf);
void initIOExecutor(const facebook::velox::Config* conf);
void initUdf(const facebook::velox::Config* conf);

void initJolFilesystem(const std::unordered_map<std::string, std::string>& conf);
void initJolFilesystem(const facebook::velox::Config* conf);

void printConf(const std::unordered_map<std::string, std::string>& conf);

Expand Down

0 comments on commit bc25d07

Please sign in to comment.