diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 2dad6adf2e70..25d1990b6057 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -63,13 +63,14 @@ gluten::Runtime* veloxRuntimeFactory( } // namespace void VeloxBackend::init(const std::unordered_map& conf) { - backendConf_ = std::make_shared(conf); + backendConf_ = + std::make_shared(std::unordered_map(conf)); // Register Velox runtime factory gluten::Runtime::registerFactory(gluten::kVeloxRuntimeKind, veloxRuntimeFactory); if (backendConf_->get(kDebugModeEnabled, false)) { - LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->values()); + LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs()); } // Init glog and log level. @@ -77,7 +78,7 @@ void VeloxBackend::init(const std::unordered_map& conf FLAGS_v = backendConf_->get(kGlogVerboseLevel, kGlogVerboseLevelDefault); FLAGS_minloglevel = backendConf_->get(kGlogSeverityLevel, kGlogSeverityLevelDefault); } else { - if (backendConf_->isValueExists(kGlogVerboseLevel)) { + if (backendConf_->valueExists(kGlogVerboseLevel)) { FLAGS_v = backendConf_->get(kGlogVerboseLevel, kGlogVerboseLevelDefault); } else { FLAGS_v = kGlogVerboseLevelMaximum; @@ -187,24 +188,13 @@ void VeloxBackend::initCache() { void VeloxBackend::initConnector() { // The configs below are used at process level. 
- std::unordered_map connectorConfMap = backendConf_->values(); + std::unordered_map connectorConfMap = backendConf_->rawConfigsCopy(); auto hiveConf = getHiveConfig(backendConf_); - for (auto& [k, v] : hiveConf->valuesCopy()) { + for (auto& [k, v] : hiveConf->rawConfigsCopy()) { connectorConfMap[k] = v; } -#ifdef ENABLE_ABFS - const auto& confValue = backendConf_->values(); - for (auto& [k, v] : confValue) { - if (k.find("fs.azure.account.key") == 0) { - connectorConfMap[k] = v; - } else if (k.find("spark.hadoop.fs.azure.account.key") == 0) { - constexpr int32_t accountKeyPrefixLength = 13; - connectorConfMap[k.substr(accountKeyPrefixLength)] = v; - } - } -#endif connectorConfMap[velox::connector::hive::HiveConfig::kEnableFileHandleCache] = backendConf_->get(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false"; @@ -233,7 +223,7 @@ void VeloxBackend::initConnector() { } velox::connector::registerConnector(std::make_shared( kHiveConnectorId, - std::make_shared(std::move(connectorConfMap)), + std::make_shared(std::move(connectorConfMap)), ioExecutor_.get())); } diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index e8298eeed192..df0442855978 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -25,9 +25,9 @@ #include #include "velox/common/caching/AsyncDataCache.h" +#include "velox/common/config/Config.h" #include "velox/common/memory/MemoryPool.h" #include "velox/common/memory/MmapAllocator.h" -#include "velox/core/Config.h" namespace gluten { /// As a static instance in per executor, initialized at executor startup. 
@@ -53,7 +53,7 @@ class VeloxBackend { facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const; - std::shared_ptr getBackendConf() const { + std::shared_ptr getBackendConf() const { return backendConf_; } @@ -92,7 +92,7 @@ class VeloxBackend { std::string cachePathPrefix_; std::string cacheFilePrefix_; - std::shared_ptr backendConf_; + std::shared_ptr backendConf_; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index cdce781bd528..996fcb850727 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -62,7 +62,8 @@ VeloxRuntime::VeloxRuntime( : Runtime(std::make_shared(std::move(listener)), confMap) { // Refresh session config. vmm_ = dynamic_cast(memoryManager_.get()); - veloxCfg_ = std::make_shared(confMap_); + veloxCfg_ = + std::make_shared(std::unordered_map(confMap_)); debugModeEnabled_ = veloxCfg_->get(kDebugModeEnabled, false); FLAGS_minloglevel = veloxCfg_->get(kGlogSeverityLevel, FLAGS_minloglevel); FLAGS_v = veloxCfg_->get(kGlogVerboseLevel, FLAGS_v); @@ -270,7 +271,7 @@ std::unique_ptr VeloxRuntime::createColumnarBatchSerial } void VeloxRuntime::dumpConf(const std::string& path) { - const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->values(); + const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs(); auto allConfMap = backendConfMap; for (const auto& pair : confMap_) { diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 952a103ed8ad..3460677d9113 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -101,7 +101,7 @@ class VeloxRuntime final : public Runtime { private: VeloxMemoryManager* vmm_; std::shared_ptr veloxPlan_; - std::shared_ptr veloxCfg_; + std::shared_ptr veloxCfg_; bool debugModeEnabled_{false}; std::unordered_map> emptySchemaBatchLoopUp_; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc index eb700c6489ec..b9c7900017d9 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -62,7 +62,8 @@ WholeStageResultIterator::WholeStageResultIterator( const std::unordered_map& confMap, const SparkTaskInfo& taskInfo) : memoryManager_(memoryManager), - veloxCfg_(std::make_shared(confMap)), + veloxCfg_( + std::make_shared(std::unordered_map(confMap))), taskInfo_(taskInfo), veloxPlan_(planNode), scanNodeIds_(scanNodeIds), @@ -175,7 +176,7 @@ WholeStageResultIterator::WholeStageResultIterator( } std::shared_ptr WholeStageResultIterator::createNewVeloxQueryCtx() { - std::unordered_map> connectorConfigs; + std::unordered_map> connectorConfigs; connectorConfigs[kHiveConnectorId] = createConnectorConfig(); std::shared_ptr ctx = velox::core::QueryCtx::create( @@ -437,10 +438,10 @@ std::unordered_map WholeStageResultIterator::getQueryC // Find offheap size from Spark confs. If found, set the max memory usage of partial aggregation. // FIXME this uses process-wise off-heap memory which is not for task try { - if (veloxCfg_->isValueExists(kDefaultSessionTimezone)) { + if (veloxCfg_->valueExists(kDefaultSessionTimezone)) { configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get(kDefaultSessionTimezone, ""); } - if (veloxCfg_->isValueExists(kSessionTimezone)) { + if (veloxCfg_->valueExists(kSessionTimezone)) { configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get(kSessionTimezone, ""); } // Adjust timestamp according to the above configured session timezone. @@ -519,7 +520,7 @@ std::unordered_map WholeStageResultIterator::getQueryC return configs; } -std::shared_ptr WholeStageResultIterator::createConnectorConfig() { +std::shared_ptr WholeStageResultIterator::createConnectorConfig() { // The configs below are used at session level. std::unordered_map configs = {}; // The semantics of reading as lower case is opposite with case-sensitive. 
@@ -532,7 +533,7 @@ std::shared_ptr WholeStageResultIterator::createConnectorConfig() std::to_string(veloxCfg_->get(kMaxPartitions, 10000)); configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] = std::to_string(veloxCfg_->get(kIgnoreMissingFiles, false)); - return std::make_shared(configs); + return std::make_shared(std::move(configs)); } } // namespace gluten diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 5e661f40485a..371ec0c14c53 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -23,8 +23,8 @@ #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" #include "utils/metrics.h" +#include "velox/common/config/Config.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" -#include "velox/core/Config.h" #include "velox/core/PlanNode.h" #include "velox/exec/Task.h" @@ -80,7 +80,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator { std::vector& nodeIds); /// Create connector config. - std::shared_ptr createConnectorConfig(); + std::shared_ptr createConnectorConfig(); /// Construct partition columns. void constructPartitionColumns( @@ -103,7 +103,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator { VeloxMemoryManager* memoryManager_; /// Config, task and plan. 
- std::shared_ptr veloxCfg_; + std::shared_ptr veloxCfg_; const SparkTaskInfo taskInfo_; std::shared_ptr task_; std::shared_ptr veloxPlan_; diff --git a/cpp/velox/jni/JniFileSystem.cc b/cpp/velox/jni/JniFileSystem.cc index 8bf791c234e8..55c1dc734969 100644 --- a/cpp/velox/jni/JniFileSystem.cc +++ b/cpp/velox/jni/JniFileSystem.cc @@ -261,7 +261,8 @@ class FileSystemWrapper : public facebook::velox::filesystems::FileSystem { class JniFileSystem : public facebook::velox::filesystems::FileSystem { public: - explicit JniFileSystem(jobject obj, std::shared_ptr config) : FileSystem(config) { + explicit JniFileSystem(jobject obj, std::shared_ptr config) + : FileSystem(config) { JNIEnv* env; attachCurrentThreadAsDaemonOrThrow(vm, &env); obj_ = env->NewGlobalRef(obj); @@ -374,9 +375,10 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem { return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; }; } - static std::function(std::shared_ptr, std::string_view)> + static std::function< + std::shared_ptr(std::shared_ptr, std::string_view)> fileSystemGenerator() { - return [](std::shared_ptr properties, std::string_view filePath) { + return [](std::shared_ptr properties, std::string_view filePath) { JNIEnv* env; attachCurrentThreadAsDaemonOrThrow(vm, &env); jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem); @@ -455,7 +457,7 @@ void gluten::registerJolFileSystem(uint64_t maxFileSize) { auto fileSystemGenerator = [maxFileSize]( - std::shared_ptr properties, + std::shared_ptr properties, std::string_view filePath) -> std::shared_ptr { // select JNI file if there is enough space if (JniFileSystem::isCapableForNewFile(maxFileSize)) { diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h index 82e8f794cbce..ee54865ff66c 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h +++ 
b/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h @@ -43,7 +43,8 @@ class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource { : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {} void initSink(const std::unordered_map& sparkConfs) override { - auto hiveConf = getHiveConfig(std::make_shared(sparkConfs)); + auto hiveConf = getHiveConfig( + std::make_shared(std::unordered_map(sparkConfs))); auto fileSystem = filesystems::getFileSystem(filePath_, hiveConf); auto* abfsFileSystem = dynamic_cast(fileSystem.get()); sink_ = std::make_unique( diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h index 7722c8e51993..a91377a32ec3 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h +++ b/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h @@ -43,7 +43,8 @@ class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource { : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {} void initSink(const std::unordered_map& sparkConfs) override { - auto hiveConf = getHiveConfig(std::make_shared(sparkConfs)); + auto hiveConf = getHiveConfig( + std::make_shared(std::unordered_map(sparkConfs))); sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()}); } }; diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h index 3231a8a1ee5c..26928b297571 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h +++ b/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h @@ -43,7 +43,8 @@ class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource { : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {} void initSink(const std::unordered_map& sparkConfs) override { - auto hiveConf = getHiveConfig(std::make_shared(sparkConfs)); + auto hiveConf = getHiveConfig( + std::make_shared(std::unordered_map(sparkConfs)));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()}); } }; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index d7de841191ed..1604c15e338a 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1148,7 +1148,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::vector veloxTypeList; std::vector columnTypes; // Convert field names into lower case when not case-sensitive. - std::unique_ptr veloxCfg = std::make_unique(confMap_); + std::unique_ptr veloxCfg = + std::make_unique(std::unordered_map(confMap_)); bool asLowerCase = !veloxCfg->get(kCaseSensitive, false); if (readRel.has_base_schema()) { const auto& baseSchema = readRel.base_schema(); diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index 816166351c0e..076427cd6aaa 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -59,7 +59,8 @@ std::string getConfigValue( return got->second; } -std::shared_ptr getHiveConfig(std::shared_ptr conf) { +std::shared_ptr getHiveConfig( + std::shared_ptr conf) { std::unordered_map hiveConfMap; #ifdef ENABLE_S3 @@ -177,10 +178,22 @@ std::shared_ptr getHiveConfig(std::shared_ptr< } #endif +#ifdef ENABLE_ABFS + const auto& confValue = conf->rawConfigsCopy(); + for (auto& [k, v] : confValue) { + if (k.find("fs.azure.account.key") == 0) { + hiveConfMap[k] = v; + } else if (k.find("spark.hadoop.fs.azure.account.key") == 0) { + constexpr int32_t accountKeyPrefixLength = 13; + hiveConfMap[k.substr(accountKeyPrefixLength)] = std::string(v); + } + } +#endif + hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] = conf->get(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ?
"true" : "false"; - return std::make_shared(std::move(hiveConfMap)); + return std::make_shared(std::move(hiveConfMap)); } } // namespace gluten diff --git a/cpp/velox/utils/ConfigExtractor.h b/cpp/velox/utils/ConfigExtractor.h index c5f662c950de..4cbfdf991f42 100644 --- a/cpp/velox/utils/ConfigExtractor.h +++ b/cpp/velox/utils/ConfigExtractor.h @@ -24,7 +24,7 @@ #include #include "config/GlutenConfig.h" -#include "velox/core/Config.h" +#include "velox/common/config/Config.h" namespace gluten { @@ -33,6 +33,7 @@ std::string getConfigValue( const std::string& key, const std::optional& fallbackValue); -std::shared_ptr getHiveConfig(std::shared_ptr conf); +std::shared_ptr getHiveConfig( + std::shared_ptr conf); } // namespace gluten diff --git a/cpp/velox/utils/HdfsUtils.cc b/cpp/velox/utils/HdfsUtils.cc index a912c04eee7e..1bc326d4ca48 100644 --- a/cpp/velox/utils/HdfsUtils.cc +++ b/cpp/velox/utils/HdfsUtils.cc @@ -36,7 +36,7 @@ struct Credential { }; } // namespace -void updateHdfsTokens(const facebook::velox::Config* veloxCfg) { +void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg) { static std::mutex mtx; std::lock_guard lock{mtx}; diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h index cd017f250ad2..2e07d7ddf41b 100644 --- a/cpp/velox/utils/HdfsUtils.h +++ b/cpp/velox/utils/HdfsUtils.h @@ -15,8 +15,8 @@ * limitations under the License. */ -#include +#include #include namespace gluten { -void updateHdfsTokens(const facebook::velox::Config* veloxCfg); +void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg); }