Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Daily Update Velox Version(2024_08_15) #6851

Merged
merged 4 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,22 @@ gluten::Runtime* veloxRuntimeFactory(
} // namespace

void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
backendConf_ = std::make_shared<facebook::velox::core::MemConfig>(conf);
backendConf_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(conf));

// Register Velox runtime factory
gluten::Runtime::registerFactory(gluten::kVeloxRuntimeKind, veloxRuntimeFactory);

if (backendConf_->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->values());
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs());
}

// Init glog and log level.
if (!backendConf_->get<bool>(kDebugModeEnabled, false)) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
FLAGS_minloglevel = backendConf_->get<uint32_t>(kGlogSeverityLevel, kGlogSeverityLevelDefault);
} else {
if (backendConf_->isValueExists(kGlogVerboseLevel)) {
if (backendConf_->valueExists(kGlogVerboseLevel)) {
FLAGS_v = backendConf_->get<uint32_t>(kGlogVerboseLevel, kGlogVerboseLevelDefault);
} else {
FLAGS_v = kGlogVerboseLevelMaximum;
Expand Down Expand Up @@ -187,15 +188,15 @@ void VeloxBackend::initCache() {

void VeloxBackend::initConnector() {
// The configs below are used at process level.
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->values();
std::unordered_map<std::string, std::string> connectorConfMap = backendConf_->rawConfigs();

auto hiveConf = getHiveConfig(backendConf_);
for (auto& [k, v] : hiveConf->valuesCopy()) {
for (auto& [k, v] : hiveConf->rawConfigsCopy()) {
connectorConfMap[k] = v;
}

#ifdef ENABLE_ABFS
const auto& confValue = backendConf_->values();
const auto& confValue = backendConf_->rawConfigs();
for (auto& [k, v] : confValue) {
if (k.find("fs.azure.account.key") == 0) {
connectorConfMap[k] = v;
Expand All @@ -205,6 +206,7 @@ void VeloxBackend::initConnector() {
}
}
#endif

connectorConfMap[velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
backendConf_->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";

Expand Down Expand Up @@ -233,7 +235,7 @@ void VeloxBackend::initConnector() {
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
kHiveConnectorId,
std::make_shared<facebook::velox::core::MemConfig>(std::move(connectorConfMap)),
std::make_shared<facebook::velox::config::ConfigBase>(std::move(connectorConfMap)),
ioExecutor_.get()));
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <filesystem>

#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/config/Config.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/core/Config.h"

namespace gluten {
/// A static instance per executor, initialized at executor startup.
Expand All @@ -53,7 +53,7 @@ class VeloxBackend {

facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const;

std::shared_ptr<facebook::velox::Config> getBackendConf() const {
std::shared_ptr<facebook::velox::config::ConfigBase> getBackendConf() const {
return backendConf_;
}

Expand Down Expand Up @@ -92,7 +92,7 @@ class VeloxBackend {
std::string cachePathPrefix_;
std::string cacheFilePrefix_;

std::shared_ptr<facebook::velox::Config> backendConf_;
std::shared_ptr<facebook::velox::config::ConfigBase> backendConf_;
};

} // namespace gluten
5 changes: 3 additions & 2 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ VeloxRuntime::VeloxRuntime(
: Runtime(std::make_shared<VeloxMemoryManager>(std::move(listener)), confMap) {
// Refresh session config.
vmm_ = dynamic_cast<VeloxMemoryManager*>(memoryManager_.get());
veloxCfg_ = std::make_shared<facebook::velox::core::MemConfig>(confMap_);
veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
debugModeEnabled_ = veloxCfg_->get<bool>(kDebugModeEnabled, false);
FLAGS_minloglevel = veloxCfg_->get<uint32_t>(kGlogSeverityLevel, FLAGS_minloglevel);
FLAGS_v = veloxCfg_->get<uint32_t>(kGlogVerboseLevel, FLAGS_v);
Expand Down Expand Up @@ -270,7 +271,7 @@ std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerial
}

void VeloxRuntime::dumpConf(const std::string& path) {
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->values();
const auto& backendConfMap = VeloxBackend::get()->getBackendConf()->rawConfigs();
auto allConfMap = backendConfMap;

for (const auto& pair : confMap_) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class VeloxRuntime final : public Runtime {
private:
VeloxMemoryManager* vmm_;
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
std::shared_ptr<facebook::velox::Config> veloxCfg_;
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_;
bool debugModeEnabled_{false};

std::unordered_map<int32_t, std::shared_ptr<ColumnarBatch>> emptySchemaBatchLoopUp_;
Expand Down
13 changes: 7 additions & 6 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ WholeStageResultIterator::WholeStageResultIterator(
const std::unordered_map<std::string, std::string>& confMap,
const SparkTaskInfo& taskInfo)
: memoryManager_(memoryManager),
veloxCfg_(std::make_shared<facebook::velox::core::MemConfig>(confMap)),
veloxCfg_(
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap))),
taskInfo_(taskInfo),
veloxPlan_(planNode),
scanNodeIds_(scanNodeIds),
Expand Down Expand Up @@ -175,7 +176,7 @@ WholeStageResultIterator::WholeStageResultIterator(
}

std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQueryCtx() {
std::unordered_map<std::string, std::shared_ptr<velox::Config>> connectorConfigs;
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> connectorConfigs;
connectorConfigs[kHiveConnectorId] = createConnectorConfig();

std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
Expand Down Expand Up @@ -437,10 +438,10 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Find offheap size from Spark confs. If found, set the max memory usage of partial aggregation.
// FIXME: this uses the process-wide off-heap memory size, which is not per-task.
try {
if (veloxCfg_->isValueExists(kDefaultSessionTimezone)) {
if (veloxCfg_->valueExists(kDefaultSessionTimezone)) {
configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get<std::string>(kDefaultSessionTimezone, "");
}
if (veloxCfg_->isValueExists(kSessionTimezone)) {
if (veloxCfg_->valueExists(kSessionTimezone)) {
configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get<std::string>(kSessionTimezone, "");
}
// Adjust timestamp according to the above configured session timezone.
Expand Down Expand Up @@ -519,7 +520,7 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
return configs;
}

std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig() {
std::shared_ptr<velox::config::ConfigBase> WholeStageResultIterator::createConnectorConfig() {
// The configs below are used at session level.
std::unordered_map<std::string, std::string> configs = {};
// Reading field names as lower case is the opposite of being case-sensitive.
Expand All @@ -532,7 +533,7 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
return std::make_shared<velox::core::MemConfig>(configs);
return std::make_shared<velox::config::ConfigBase>(std::move(configs));
}

} // namespace gluten
6 changes: 3 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
#include "utils/metrics.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/core/Config.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Task.h"

Expand Down Expand Up @@ -80,7 +80,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
std::vector<facebook::velox::core::PlanNodeId>& nodeIds);

/// Create connector config.
std::shared_ptr<facebook::velox::Config> createConnectorConfig();
std::shared_ptr<facebook::velox::config::ConfigBase> createConnectorConfig();

/// Construct partition columns.
void constructPartitionColumns(
Expand All @@ -103,7 +103,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator {
VeloxMemoryManager* memoryManager_;

/// Config, task and plan.
std::shared_ptr<Config> veloxCfg_;
std::shared_ptr<config::ConfigBase> veloxCfg_;
const SparkTaskInfo taskInfo_;
std::shared_ptr<facebook::velox::exec::Task> task_;
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
Expand Down
10 changes: 6 additions & 4 deletions cpp/velox/jni/JniFileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ class FileSystemWrapper : public facebook::velox::filesystems::FileSystem {

class JniFileSystem : public facebook::velox::filesystems::FileSystem {
public:
explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::Config> config) : FileSystem(config) {
explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::config::ConfigBase> config)
: FileSystem(config) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
obj_ = env->NewGlobalRef(obj);
Expand Down Expand Up @@ -374,9 +375,10 @@ class JniFileSystem : public facebook::velox::filesystems::FileSystem {
return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; };
}

static std::function<std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::Config>, std::string_view)>
static std::function<
std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::config::ConfigBase>, std::string_view)>
fileSystemGenerator() {
return [](std::shared_ptr<const facebook::velox::Config> properties, std::string_view filePath) {
return [](std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm, &env);
jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem);
Expand Down Expand Up @@ -455,7 +457,7 @@ void gluten::registerJolFileSystem(uint64_t maxFileSize) {

auto fileSystemGenerator =
[maxFileSize](
std::shared_ptr<const facebook::velox::Config> properties,
std::shared_ptr<const facebook::velox::config::ConfigBase> properties,
std::string_view filePath) -> std::shared_ptr<facebook::velox::filesystems::FileSystem> {
// select JNI file if there is enough space
if (JniFileSystem::isCapableForNewFile(maxFileSize)) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
std::unordered_map<std::string, std::string>(sparkConfs)));
auto fileSystem = filesystems::getFileSystem(filePath_, hiveConf);
auto* abfsFileSystem = dynamic_cast<filesystems::abfs::AbfsFileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

/// Creates sink_ for filePath_. The Hive connector properties are derived from `sparkConfs`
/// through getHiveConfig() and handed to dwio::common::FileSink::create() together with sinkPool_.
void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
// NOTE(review): the next two `hiveConf` statements are the removed (core::MemConfig) and added
// (config::ConfigBase) sides of the same diff line; presumably only the ConfigBase form is in
// the merged header — confirm.
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
// The Spark conf map is copied explicitly before being passed to the ConfigBase constructor.
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
std::unordered_map<std::string, std::string>(sparkConfs)));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()});
}
};
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}

/// Creates sink_ for filePath_. The Hive connector properties are derived from `sparkConfs`
/// through getHiveConfig() and handed to dwio::common::FileSink::create() together with sinkPool_.
void initSink(const std::unordered_map<std::string, std::string>& sparkConfs) override {
// NOTE(review): the next two `hiveConf` statements are the removed (core::MemConfig) and added
// (config::ConfigBase) sides of the same diff line; presumably only the ConfigBase form is in
// the merged header — confirm.
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::core::MemConfig>(sparkConfs));
// The Spark conf map is copied explicitly before being passed to the ConfigBase constructor.
auto hiveConf = getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
std::unordered_map<std::string, std::string>(sparkConfs)));
sink_ = dwio::common::FileSink::create(filePath_, {.connectorProperties = hiveConf, .pool = sinkPool_.get()});
}
};
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
std::vector<TypePtr> veloxTypeList;
std::vector<ColumnType> columnTypes;
// Convert field names into lower case when not case-sensitive.
std::unique_ptr<facebook::velox::Config> veloxCfg = std::make_unique<facebook::velox::core::MemConfig>(confMap_);
std::unique_ptr<facebook::velox::config::ConfigBase> veloxCfg =
std::make_unique<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_));
bool asLowerCase = !veloxCfg->get<bool>(kCaseSensitive, false);
if (readRel.has_base_schema()) {
const auto& baseSchema = readRel.base_schema();
Expand Down
16 changes: 9 additions & 7 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ std::string getConfigValue(
return got->second;
}

std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<facebook::velox::Config> conf) {
std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf) {
std::unordered_map<std::string, std::string> hiveConfMap;

#ifdef ENABLE_S3
Expand Down Expand Up @@ -125,7 +126,7 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<

#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get("spark.hadoop.fs.gs.storage.root.url");
auto gsStorageRootUrl = conf->get<std::string>("spark.hadoop.fs.gs.storage.root.url");
if (gsStorageRootUrl.hasValue()) {
std::string url = gsStorageRootUrl.value();
std::string gcsScheme;
Expand All @@ -146,23 +147,24 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#http-transport-configuration
// https://cloud.google.com/cpp/docs/reference/storage/latest/classgoogle_1_1cloud_1_1storage_1_1LimitedErrorCountRetryPolicy
auto gsMaxRetryCount = conf->get("spark.hadoop.fs.gs.http.max.retry");
auto gsMaxRetryCount = conf->get<std::string>("spark.hadoop.fs.gs.http.max.retry");
if (gsMaxRetryCount.hasValue()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGCSMaxRetryCount] = gsMaxRetryCount.value();
}

// https://cloud.google.com/cpp/docs/reference/storage/latest/classgoogle_1_1cloud_1_1storage_1_1LimitedTimeRetryPolicy
auto gsMaxRetryTime = conf->get("spark.hadoop.fs.gs.http.max.retry-time");
auto gsMaxRetryTime = conf->get<std::string>("spark.hadoop.fs.gs.http.max.retry-time");
if (gsMaxRetryTime.hasValue()) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kGCSMaxRetryTime] = gsMaxRetryTime.value();
}

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
auto gsAuthType = conf->get("spark.hadoop.fs.gs.auth.type");
auto gsAuthType = conf->get<std::string>("spark.hadoop.fs.gs.auth.type");
if (gsAuthType.hasValue()) {
std::string type = gsAuthType.value();
if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
auto gsAuthServiceAccountJsonKeyfile = conf->get("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
auto gsAuthServiceAccountJsonKeyfile =
conf->get<std::string>("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
stream.exceptions(std::ios::badbit);
Expand All @@ -180,7 +182,7 @@ std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache] =
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false";

return std::make_shared<facebook::velox::core::MemConfig>(std::move(hiveConfMap));
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(hiveConfMap));
}

} // namespace gluten
5 changes: 3 additions & 2 deletions cpp/velox/utils/ConfigExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <unordered_map>

#include "config/GlutenConfig.h"
#include "velox/core/Config.h"
#include "velox/common/config/Config.h"

namespace gluten {

Expand All @@ -33,6 +33,7 @@ std::string getConfigValue(
const std::string& key,
const std::optional<std::string>& fallbackValue);

std::shared_ptr<facebook::velox::core::MemConfig> getHiveConfig(std::shared_ptr<facebook::velox::Config> conf);
std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
std::shared_ptr<facebook::velox::config::ConfigBase> conf);

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/velox/utils/HdfsUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct Credential {
};
} // namespace

void updateHdfsTokens(const facebook::velox::Config* veloxCfg) {
void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg) {
static std::mutex mtx;
std::lock_guard lock{mtx};

Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/utils/HdfsUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
* limitations under the License.
*/

#include <velox/core/Config.h>
#include <velox/common/config/Config.h>
#include <memory>
namespace gluten {
// NOTE(review): the two declarations below are the removed (velox::Config) and added
// (velox::config::ConfigBase) sides of the same diff line; presumably only the ConfigBase
// declaration is in the merged header — confirm.
// Declares updateHdfsTokens, which takes a pointer to a const Velox config.
void updateHdfsTokens(const facebook::velox::Config* veloxCfg);
void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
}
2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_08_14
VELOX_BRANCH=2024_08_15
VELOX_HOME=""

OS=`uname -s`
Expand Down
Loading