Skip to content

Commit

Permalink
Fix velox config refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Aug 15, 2024
1 parent b4ab1fc commit 0729874
Show file tree
Hide file tree
Showing 15 changed files with 59 additions and 47 deletions.
24 changes: 7 additions & 17 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,22 @@ gluten::Runtime* veloxRuntimeFactory(
} // namespace

void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
backendConf_ = std::make_shared<facebook::velox::core::MemConfig>(conf);
backendConf_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(conf));

// Register Velox runtime factory
gluten::Runtime::registerFactory(gluten::kVeloxRuntimeKind, veloxRuntimeFactory);

if (backendConf_->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->values());
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs());
}

// Init glog and log level.
if (!backendConf_->get<bool>(kDebugModeEnabled, false)) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
FLAGS_minloglevel = backendConf_->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
} else {
if (backendConf_->isValueExists(kGlogVerboseLevel)) {
if (backendConf_->valueExists(kGlogVerboseLevel)) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
} else {
FLAGS_v = kGlogVerboseLevelMaximum;
Expand Down Expand Up @@ -187,24 +188,13 @@ void VeloxBackend::initCache() {

void VeloxBackend::initConnector() {
// The configs below are used at process level.
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->values();
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->rawConfigsCopy();

auto hiveConf = getHiveConfig(backendConf_);
for (auto& [k, v] : hiveConf->valuesCopy()) {
for (auto& [k, v] : hiveConf->rawConfigsCopy()) {
connectorConfMap[k] = v;
}

#ifdef ENABLE_ABFS
const auto& confValue = backendConf_->values();
for (auto& [k, v] : confValue) {
if (k.find("fs.azure.account.key") == 0) {
connectorConfMap[k] = v;
} else if (k.find("spark.hadoop.fs.azure.account.key") == 0) {
constexpr int32_t accountKeyPrefixLength = 13;
connectorConfMap[k.substr(accountKeyPrefixLength)] = v;
}
}
#endif
connectorConfMap[velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
backendConf_->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";

Expand Down Expand Up @@ -233,7 +223,7 @@ void VeloxBackend::initConnector() {
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
kHiveConnectorId,
std::make_shared<facebook::velox::core::MemConfig>(std::move(connectorConfMap)),
std::make_shared<facebook::velox::config::ConfigBase>(std::move(connectorConfMap)),
ioExecutor_.get()));
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <filesystem>

#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/config/Config.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/core/Config.h"

namespace gluten {
/// As a static instance in per executor, initialized at executor startup.
Expand All @@ -53,7 +53,7 @@ class VeloxBackend {

facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;

std::shared_ptr<facebook::velox::Config> getBackendConf() const {
std::shared_ptr<facebook::velox::config::ConfigBase> getBackendConf() const {
return backendConf_;
}

Expand Down Expand Up @@ -92,7 +92,7 @@ class VeloxBackend {
std::string cachePathPrefix_;
std::string cacheFilePrefix_;

std::shared_ptr<facebook::velox::Config> backendConf_;
std::shared_ptr<facebook::velox::config::ConfigBase> backendConf_;
};

} // namespace gluten
5 changes: 3 additions & 2 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ VeloxRuntime::VeloxRuntime(
: Runtime(std::make_shared<VeloxMemoryManager>(std::move(listener)), confMap) {
// Refresh session config.
vmm_ = dynamic_cast<VeloxMemoryManager*>(memoryManager_.get());
veloxCfg_ = std::make_shared<facebook::velox::core::MemConfig>(confMap_);
veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
debugModeEnabled_ = veloxCfg_->get<bool>(kDebugModeEnabled, false);
FLAGS_minloglevel = veloxCfg_->get<uint32_t>(kGlogSeverityLevel, FLAGS_minloglevel);
FLAGS_v = veloxCfg_->get<uint32_t>(kGlogVerboseLevel, FLAGS_v);
Expand Down Expand Up @@ -270,7 +271,7 @@ std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerial
}

void VeloxRuntime::dumpConf(const std::string& path) {
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->values();
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs();
auto allConfMap = backendConfMap;

for (const auto& pair : confMap_) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class VeloxRuntime final : public Runtime {
private:
VeloxMemoryManager* vmm_;
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
std::shared_ptr<facebook::velox::Config> veloxCfg_;
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_;
bool debugModeEnabled_{false};

std::unordered_map<int32_t, std::shared_ptr<ColumnarBatch>> emptySchemaBatchLoopUp_;
Expand Down
13 changes: 7 additions & 6 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ WholeStageResultIterator::WholeStageResultIterator(
const std::unordered_map<std::string, std::string>& confMap,
const SparkTaskInfo& taskInfo)
: memoryManager_(memoryManager),
veloxCfg_(std::make_shared<facebook::velox::core::MemConfig>(confMap)),
veloxCfg_(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap))),
taskInfo_(taskInfo),
veloxPlan_(planNode),
scanNodeIds_(scanNodeIds),
Expand Down Expand Up @@ -175,7 +176,7 @@ WholeStageResultIterator::WholeStageResultIterator(
}

std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQueryCtx() {
std::unordered_map<std::string, std::shared_ptr<velox::Config>> connectorConfigs;
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> connectorConfigs;
connectorConfigs[kHiveConnectorId] = createConnectorConfig();

std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
Expand Down Expand Up @@ -437,10 +438,10 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Find offheap size from Spark confs. If found, set the max memory usage of partial aggregation.
// FIXME this uses process-wise off-heap memory which is not for task
try {
if (veloxCfg_->isValueExists(kDefaultSessionTimezone)) {
if (veloxCfg_->valueExists(kDefaultSessionTimezone)) {
configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get<std::string>(kDefaultSessionTimezone, "");
}
if (veloxCfg_->isValueExists(kSessionTimezone)) {
if (veloxCfg_->valueExists(kSessionTimezone)) {
configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get<std::string>(kSessionTimezone, "");
}
// Adjust timestamp according to the above configured session timezone.
Expand Down Expand Up @@ -519,7 +520,7 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
return configs;
}

std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig() {
std::shared_ptr<velox::config::ConfigBase> WholeStageResultIterator::createConnectorConfig() {
// The configs below are used at session level.
std::unordered_map<std::string, std::string> configs = {};
// The semantics of reading as lower case is opposite with case-sensitive.
Expand All @@ -532,7 +533,7 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
return std::make_shared<velox::core::MemConfig>(configs);
return std::make_shared<velox::config::ConfigBase>(std::move(configs));
}

} // namespace gluten
6 changes: 3 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
#include "utils/metrics.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/core/Config.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"

Expand Down Expand Up @@ -80,7 +80,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
std::vector<facebook::velox::core::PlanNodeId>& nodeIds);

/// Create connector config.
std::shared_ptr<facebook::velox::Config> createConnectorConfig();
std::shared_ptr<facebook::velox::config::ConfigBase> createConnectorConfig();

/// Construct partition columns.
void constructPartitionColumns(
Expand All @@ -103,7 +103,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
VeloxMemoryManager* memoryManager_;

/// Config, task and plan.
std::shared_ptr<Config> veloxCfg_;
std::shared_ptr<config::ConfigBase> veloxCfg_;
const SparkTaskInfo taskInfo_;
std::shared_ptr<facebook::velox::exec::Task> task_;
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
Expand Down
10 changes: 6 additions & 4 deletions cpp/velox/jni/JniFileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ class FileSystemWrapper : public facebook::velox::filesystems::FileSystem {

class JniFileSystem : public facebook::velox::filesystems::FileSystem {
public:
explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::Config> config) : FileSystem(config) {
explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::config::ConfigBase> config)
: FileSystem(config) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
obj_ = env->NewGlobalRef(obj);
Expand Down Expand Up @@ -374,9 +375,10 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; };
}

static std::function<std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::Config>, std::string_view)>
static std::function<
std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::config::ConfigBase>, std::string_view)>
fileSystemGenerator() {
return [](std::shared_ptr<const facebook::velox::Config> properties, std::string_view filePath) {
return [](std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem);
Expand Down Expand Up @@ -455,7 +457,7 @@ void gluten::registerJolFileSystem(uint64_t maxFileSize) {

auto fileSystemGenerator =
[maxFileSize](
std::shared_ptr<const facebook::velox::Config> properties,
std::shared_ptr<const facebook::velox::config::ConfigBase> properties,
std::string_view filePath) -> std::shared_ptr<facebook::velox::filesystems::FileSystem> {
// select JNI file if there is enough space
if (JniFileSystem::isCapableForNewFile(maxFileSize)) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
auto hiveConf = getHiveConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_)));
auto fileSystem = filesystems::getFileSystem(filePath_, hiveConf);
auto* abfsFileSystem = dynamic_cast<filesystems::abfs::AbfsFileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
auto hiveConf = getHiveConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_)));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()});
}
};
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
auto hiveConf = getHiveConfig(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_)));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()});
}
};
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
std::vector<TypePtr> veloxTypeList;
std::vector<ColumnType> columnTypes;
// Convert field names into lower case when not case-sensitive.
std::unique_ptr<facebook::velox::Config> veloxCfg = std::make_unique<facebook::velox::core::MemConfig>(confMap_);
std::unique_ptr<facebook::velox::config::ConfigBase> veloxCfg =
std::make_unique<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);
if (readRel.has_base_schema()) {
const auto& baseSchema = readRel.base_schema();
Expand Down
17 changes: 15 additions & 2 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ std::string getConfigValue(
return got->second;
}

std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<facebook::velox::Config> conf) {
std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf) {
std::unordered_map<std::string, std::string> hiveConfMap;

#ifdef ENABLE_S3
Expand Down Expand Up @@ -177,10 +178,22 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
}
#endif

#ifdef ENABLE_ABFS
const auto& confValue = conf->rawConfigsCopy();
for (auto& [k, v] : confValue) {
if (k.find("fs.azure.account.key") == 0) {
connectorConfMap[k] = v;
} else if (k.find("spark.hadoop.fs.azure.account.key") == 0) {
constexpr int32_t accountKeyPrefixLength = 13;
connectorConfMap[k.substr(accountKeyPrefixLength)] = std::string(v);
}
}
#endif

hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";

return std::make_shared<facebook::velox::core::MemConfig>(std::move(hiveConfMap));
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(hiveConfMap));
}

} // namespace gluten
5 changes: 3 additions & 2 deletions cpp/velox/utils/ConfigExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <unordered_map>

#include "config/GlutenConfig.h"
#include "velox/core/Config.h"
#include "velox/common/config/Config.h"

namespace gluten {

Expand All @@ -33,6 +33,7 @@ std::string getConfigValue(
const std::string& key,
const std::optional<std::string>& fallbackValue);

std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<facebook::velox::Config> conf);
std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf);

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/velox/utils/HdfsUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct Credential {
};
} // namespace

void updateHdfsTokens(const facebook::velox::Config* veloxCfg) {
void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg) {
static std::mutex mtx;
std::lock_guard lock{mtx};

Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/utils/HdfsUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
* limitations under the License.
*/

#include <velox/core/Config.h>
#include <velox/common/config/Config.h>
#include <memory>
namespace gluten {
void updateHdfsTokens(const facebook::velox::Config* veloxCfg);
void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
}

0 comments on commit 0729874

Please sign in to comment.