[GLUTEN-3364][VL] Fix: SplitPreloadPerDriver (SplitPreloadPerDriver = true, IOThreads > 0) causes unexpected crashes (#3419)

Switching to a global memory pool doesn't help since the async executor holds the task's memory pool anyway. Added a wait strategy (up to 1.5s) in the memory manager's destructor until async allocations are released. Also includes some minor cleanups to the connector initialization procedure.

This fixes issue #3364.
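
For context, the sketch below illustrates the destructor's wait strategy in isolation: retry a "safe to destruct" check with exponential backoff, sleeping 50 + 100 + 200 + 400 + 800 = 1550 ms in total before giving up. The releasedWithBackoff helper and the main driver here are hypothetical, added only for illustration; the real logic is VeloxMemoryManager::~VeloxMemoryManager in the diff below.

#include <unistd.h>   // usleep
#include <cmath>
#include <cstdint>
#include <functional>
#include <iostream>

// Retry `tryDestructSafe` with exponential backoff (50, 100, 200, 400, 800 ms).
// Returns true once the check passes, or the result of a final check after
// the last wait. This mirrors the commit's wait strategy in a standalone form.
bool releasedWithBackoff(const std::function<bool()>& tryDestructSafe) {
  for (int32_t tryCount = 0; tryCount < 5; ++tryCount) {
    if (tryDestructSafe()) {
      return true; // All outstanding allocations are released.
    }
    const uint32_t waitMs = 50 * static_cast<uint32_t>(std::pow(2, tryCount));
    std::cout << "Outstanding allocations remain; waiting " << waitMs << " ms...\n";
    usleep(waitMs * 1000); // Give async tasks (e.g. split preloading) time to finish.
  }
  return tryDestructSafe(); // Final check after the last wait.
}

int main() {
  // Pretend three async allocations drain away, one per check.
  int remaining = 3;
  const bool released = releasedWithBackoff([&remaining]() { return --remaining <= 0; });
  std::cout << (released ? "released" : "still held") << "\n";
  return 0;
}

In the actual commit the check is VeloxMemoryManager::tryDestructSafe(), which only frees the Velox pools, the Velox memory manager, and the Arrow pool once each reports zero outstanding bytes.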
zhztheplayer authored Nov 24, 2023
1 parent e29a44e commit 0088054
Showing 13 changed files with 307 additions and 184 deletions.
224 changes: 109 additions & 115 deletions cpp/velox/compute/VeloxBackend.cc
@@ -45,6 +45,9 @@
#include "velox/common/memory/MmapAllocator.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/serializers/PrestoSerializer.h"

DECLARE_int32(split_preload_per_driver);
@@ -122,6 +125,10 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
auto veloxmemcfg = std::make_shared<facebook::velox::core::MemConfigMutable>(conf);
const facebook::velox::Config* veloxcfg = veloxmemcfg.get();

if (veloxcfg->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy());
}

uint32_t vlogLevel = veloxcfg->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
uint32_t severityLogLevel = veloxcfg->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
FLAGS_v = vlogLevel;
@@ -152,111 +159,8 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem(veloxcfg);

#ifdef ENABLE_S3
std::string awsAccessKey = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = veloxcfg->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = veloxcfg->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = veloxcfg->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = veloxcfg->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials) {
veloxmemcfg->setValue("hive.s3.use-instance-credentials", "true");
} else if (!iamRole.empty()) {
veloxmemcfg->setValue("hive.s3.iam-role", iamRole);
if (!iamRoleSessionName.empty()) {
veloxmemcfg->setValue("hive.s3.iam-role-session-name", iamRoleSessionName);
}
} else {
veloxmemcfg->setValue("hive.s3.aws-access-key", awsAccessKey);
veloxmemcfg->setValue("hive.s3.aws-secret-key", awsSecretKey);
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
veloxmemcfg->setValue("hive.s3.endpoint", awsEndpoint);
}
veloxmemcfg->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false");
veloxmemcfg->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
#endif
#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
std::string gsStorageRootUrl;
if (auto got = conf.find("spark.hadoop.fs.gs.storage.root.url"); got != conf.end()) {
gsStorageRootUrl = got->second;
}
if (!gsStorageRootUrl.empty()) {
std::string gcsScheme;
std::string gcsEndpoint;

const auto sep = std::string("://");
const auto pos = gsStorageRootUrl.find_first_of(sep);
if (pos != std::string::npos) {
gcsScheme = gsStorageRootUrl.substr(0, pos);
gcsEndpoint = gsStorageRootUrl.substr(pos + sep.length());
}

if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
veloxmemcfg->setValue("hive.gcs.scheme", gcsScheme);
veloxmemcfg->setValue("hive.gcs.endpoint", gcsEndpoint);
}
}

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
std::string gsAuthType;
if (auto got = conf.find("spark.hadoop.fs.gs.auth.type"); got != conf.end()) {
gsAuthType = got->second;
}
if (gsAuthType == "SERVICE_ACCOUNT_JSON_KEYFILE") {
std::string gsAuthServiceAccountJsonKeyfile;
if (auto got = conf.find("spark.hadoop.fs.gs.auth.service.account.json.keyfile"); got != conf.end()) {
gsAuthServiceAccountJsonKeyfile = got->second;
}

std::string gsAuthServiceAccountJson;
if (!gsAuthServiceAccountJsonKeyfile.empty()) {
auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile);
stream.exceptions(std::ios::badbit);
gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
} else {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
"however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
}

if (!gsAuthServiceAccountJson.empty()) {
veloxmemcfg->setValue("hive.gcs.credentials", gsAuthServiceAccountJson);
}
}
#endif

initCache(veloxcfg);
initIOExecutor(veloxcfg);

veloxmemcfg->setValue(
velox::connector::hive::HiveConfig::kEnableFileHandleCache,
veloxcfg->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
auto hiveConnector =
velox::connector::getConnectorFactory(velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(kHiveConnectorId, veloxmemcfg, ioExecutor_.get());

registerConnector(hiveConnector);
initConnector(veloxcfg);

// Register Velox functions
registerAllFunctions();
@@ -267,10 +171,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());

initUdf(veloxcfg);

if (veloxcfg->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy());
}
registerSparkTokenizer();
}

@@ -335,18 +235,112 @@ void VeloxBackend::initCache(const facebook::velox::Config* conf) {
}
}

void VeloxBackend::initIOExecutor(const facebook::velox::Config* conf) {
void VeloxBackend::initConnector(const facebook::velox::Config* conf) {
int32_t ioThreads = conf->get<int32_t>(kVeloxIOThreads, kVeloxIOThreadsDefault);
int32_t splitPreloadPerDriver = conf->get<int32_t>(kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault);
if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
FLAGS_split_preload_per_driver = splitPreloadPerDriver;

auto mutableConf = std::make_shared<facebook::velox::core::MemConfigMutable>(conf->valuesCopy());

#ifdef ENABLE_S3
std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

if (splitPreloadPerDriver > 0 && ioThreads > 0) {
LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver
<< ", IO threads: " << ioThreads;
std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials) {
mutableConf->setValue("hive.s3.use-instance-credentials", "true");
} else if (!iamRole.empty()) {
mutableConf->setValue("hive.s3.iam-role", iamRole);
if (!iamRoleSessionName.empty()) {
mutableConf->setValue("hive.s3.iam-role-session-name", iamRoleSessionName);
}
} else {
mutableConf->setValue("hive.s3.aws-access-key", awsAccessKey);
mutableConf->setValue("hive.s3.aws-secret-key", awsSecretKey);
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
mutableConf->setValue("hive.s3.endpoint", awsEndpoint);
}
mutableConf->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false");
mutableConf->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
#endif

#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get("spark.hadoop.fs.gs.storage.root.url");
if (gsStorageRootUrl.hasValue()) {
std::string url = gsStorageRootUrl.value();
std::string gcsScheme;
std::string gcsEndpoint;

const auto sep = std::string("://");
const auto pos = url.find_first_of(sep);
if (pos != std::string::npos) {
gcsScheme = url.substr(0, pos);
gcsEndpoint = url.substr(pos + sep.length());
}

if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
mutableConf->setValue("hive.gcs.scheme", gcsScheme);
mutableConf->setValue("hive.gcs.endpoint", gcsEndpoint);
}
}

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
auto gsAuthType = conf->get("spark.hadoop.fs.gs.auth.type");
if (gsAuthType.hasValue()) {
std::string type = gsAuthType.value();
if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
auto gsAuthServiceAccountJsonKeyfile = conf->get("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
stream.exceptions(std::ios::badbit);
std::string gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
mutableConf->setValue("hive.gcs.credentials", gsAuthServiceAccountJson);
} else {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
"however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
}
}
}
#endif

mutableConf->setValue(
velox::connector::hive::HiveConfig::kEnableFileHandleCache,
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");

if (ioThreads > 0) {
if (splitPreloadPerDriver > 0) {
// spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if
// spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0
LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver
<< ", IO threads: " << ioThreads;
FLAGS_split_preload_per_driver = splitPreloadPerDriver;
}
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, mutableConf, ioExecutor_.get()));
}

void VeloxBackend::initUdf(const facebook::velox::Config* conf) {
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxBackend.h
@@ -58,7 +58,7 @@ class VeloxBackend {

void init(const std::unordered_map<std::string, std::string>& conf);
void initCache(const facebook::velox::Config* conf);
void initIOExecutor(const facebook::velox::Config* conf);
void initConnector(const facebook::velox::Config* conf);
void initUdf(const facebook::velox::Config* conf);

void initJolFilesystem(const facebook::velox::Config* conf);
49 changes: 49 additions & 0 deletions cpp/velox/memory/VeloxMemoryManager.cc
@@ -247,6 +247,55 @@ void VeloxMemoryManager::hold() {
holdInternal(heldVeloxPools_, veloxAggregatePool_.get());
}

bool VeloxMemoryManager::tryDestructSafe() {
// Velox memory pools considered safe to destruct when no alive allocations.
for (const auto& pool : heldVeloxPools_) {
if (pool && pool->currentBytes() != 0) {
return false;
}
}
if (veloxLeafPool_ && veloxLeafPool_->currentBytes() != 0) {
return false;
}
if (veloxAggregatePool_ && veloxAggregatePool_->currentBytes() != 0) {
return false;
}
heldVeloxPools_.clear();
veloxLeafPool_.reset();
veloxAggregatePool_.reset();

// Velox memory manager considered safe to destruct when no alive pools.
if (veloxMemoryManager_ && veloxMemoryManager_->numPools() != 0) {
return false;
}
veloxMemoryManager_.reset();

// Applies similar rule for Arrow memory pool.
if (arrowPool_ && arrowPool_->bytes_allocated() != 0) {
return false;
}
arrowPool_.reset();

// Successfully destructed.
return true;
}

VeloxMemoryManager::~VeloxMemoryManager() {
// Wait (50 + 100 + 200 + 400 + 800)ms = 1550ms to let possible async tasks (e.g. preload split) complete.
for (int32_t tryCount = 0; tryCount < 5; tryCount++) {
if (tryDestructSafe()) {
if (tryCount > 0) {
LOG(INFO) << "All the outstanding memory resources successfully released. ";
}
break;
}
uint32_t waitMs = 50 * static_cast<uint32_t>(pow(2, tryCount));
LOG(INFO) << "There are still outstanding Velox memory allocations. Waiting for " << waitMs
<< " ms to let possible async tasks done... ";
usleep(waitMs * 1000);
}
}

velox::memory::MemoryManager* getDefaultVeloxMemoryManager() {
return &(facebook::velox::memory::defaultMemoryManager());
}
5 changes: 4 additions & 1 deletion cpp/velox/memory/VeloxMemoryManager.h
@@ -27,11 +27,12 @@ namespace gluten {

class VeloxMemoryManager final : public MemoryManager {
public:
explicit VeloxMemoryManager(
VeloxMemoryManager(
const std::string& name,
std::shared_ptr<MemoryAllocator> allocator,
std::unique_ptr<AllocationListener> listener);

~VeloxMemoryManager() override;
VeloxMemoryManager(const VeloxMemoryManager&) = delete;
VeloxMemoryManager(VeloxMemoryManager&&) = delete;
VeloxMemoryManager& operator=(const VeloxMemoryManager&) = delete;
@@ -60,6 +61,8 @@ class VeloxMemoryManager final : public MemoryManager {
void hold() override;

private:
bool tryDestructSafe();

std::string name_;

#ifdef GLUTEN_ENABLE_HBM
(Diffs for the remaining changed files are not shown here.)
