[GLUTEN-4121][VL] Initialize s3 filesystem with hive configuration in native write (#4129)

* Initialize s3 filesystem with hive configuration in native write

* add configuration in native session conf

* use FileSink
dcoliversun authored Dec 22, 2023
1 parent 7e0779e commit 3e23ad4
Showing 5 changed files with 154 additions and 105 deletions.
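In brief, this commit moves the S3/GCS credential translation out of VeloxBackend::initConnector into a reusable getHiveConfig() helper, so the same Hive connector properties can be applied both when the connector is registered and when the native parquet writer opens an S3 sink through FileSink::create. The sketch below is a minimal, self-contained illustration of that key-mapping idea, assuming a plain std::unordered_map in place of Velox's MemConfig classes; extractS3HiveConfig and the main driver are hypothetical names used only for illustration and are not part of the Gluten or Velox API.

```cpp
#include <cstdlib>
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for getHiveConfig(): translate Spark's
// spark.hadoop.fs.s3a.* settings into Hive-connector-style keys.
// Property names mirror the diff; the helper itself is only a sketch.
std::unordered_map<std::string, std::string> extractS3HiveConfig(
    const std::unordered_map<std::string, std::string>& sparkConf) {
  auto get = [&](const std::string& key, const std::string& dflt) {
    auto it = sparkConf.find(key);
    return it != sparkConf.end() ? it->second : dflt;
  };

  std::string accessKey = get("spark.hadoop.fs.s3a.access.key", "");
  std::string secretKey = get("spark.hadoop.fs.s3a.secret.key", "");
  std::string endpoint = get("spark.hadoop.fs.s3a.endpoint", "");
  bool useInstanceCredentials =
      get("spark.hadoop.fs.s3a.use.instance.credentials", "false") == "true";

  // Environment variables override the Spark configuration, as in the diff.
  if (const char* env = std::getenv("AWS_ACCESS_KEY_ID")) accessKey = env;
  if (const char* env = std::getenv("AWS_SECRET_ACCESS_KEY")) secretKey = env;
  if (const char* env = std::getenv("AWS_ENDPOINT")) endpoint = env;

  std::unordered_map<std::string, std::string> hiveConf;
  if (useInstanceCredentials) {
    hiveConf["hive.s3.use-instance-credentials"] = "true";
  } else {
    hiveConf["hive.s3.aws-access-key"] = accessKey;
    hiveConf["hive.s3.aws-secret-key"] = secretKey;
    hiveConf["hive.s3.endpoint"] = endpoint;
  }
  hiveConf["hive.s3.ssl.enabled"] =
      get("spark.hadoop.fs.s3a.connection.ssl.enabled", "false");
  hiveConf["hive.s3.path-style-access"] =
      get("spark.hadoop.fs.s3a.path.style.access", "false");
  return hiveConf;
}

int main() {
  std::unordered_map<std::string, std::string> sparkConf = {
      {"spark.hadoop.fs.s3a.endpoint", "http://localhost:9000"},
      {"spark.hadoop.fs.s3a.path.style.access", "true"}};
  for (const auto& [k, v] : extractS3HiveConfig(sparkConf)) {
    std::cout << k << " = " << v << "\n";
  }
  return 0;
}
```

In the actual commit, the resulting property map is applied to the connector's mutable config in VeloxBackend.cc and passed as connectorProperties to dwio::common::FileSink::create in VeloxParquetDatasource.cc, as shown in the diffs below.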
104 changes: 4 additions & 100 deletions cpp/velox/compute/VeloxBackend.cc
@@ -22,6 +22,7 @@

#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
#include "utils/ConfigExtractor.h"

#include "shuffle/VeloxShuffleReader.h"

@@ -31,9 +32,6 @@
#ifdef GLUTEN_ENABLE_IAA
#include "utils/qpl/qpl_codec.h"
#endif
#ifdef ENABLE_GCS
#include <fstream>
#endif
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "jni/JniFileSystem.h"
@@ -43,7 +41,6 @@
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/serializers/PrestoSerializer.h"
@@ -114,13 +111,6 @@ const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";
// VeloxShuffleReader print flag.
const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";

const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";

} // namespace

namespace gluten {
@@ -265,51 +255,10 @@ void VeloxBackend::initConnector(const std::shared_ptr<const facebook::velox::Co

auto mutableConf = std::make_shared<facebook::velox::core::MemConfigMutable>(conf->valuesCopy());

#ifdef ENABLE_S3
std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
auto hiveConf = getHiveConfig(conf);
for (auto& [k, v] : hiveConf->valuesCopy()) {
mutableConf->setValue(k, v);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials) {
mutableConf->setValue("hive.s3.use-instance-credentials", "true");
} else if (!iamRole.empty()) {
mutableConf->setValue("hive.s3.iam-role", iamRole);
if (!iamRoleSessionName.empty()) {
mutableConf->setValue("hive.s3.iam-role-session-name", iamRoleSessionName);
}
} else {
mutableConf->setValue("hive.s3.aws-access-key", awsAccessKey);
mutableConf->setValue("hive.s3.aws-secret-key", awsSecretKey);
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
mutableConf->setValue("hive.s3.endpoint", awsEndpoint);
}
mutableConf->setValue("hive.s3.ssl.enabled", sslEnabled ? "true" : "false");
mutableConf->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
mutableConf->setValue("hive.s3.log-level", awsSdkLogLevel);
#endif

#ifdef ENABLE_ABFS
const auto& confValue = conf->valuesCopy();
@@ -323,51 +272,6 @@ void VeloxBackend::initConnector(const std::shared_ptr<const facebook::velox::Co
}
#endif

#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get("spark.hadoop.fs.gs.storage.root.url");
if (gsStorageRootUrl.hasValue()) {
std::string url = gsStorageRootUrl.value();
std::string gcsScheme;
std::string gcsEndpoint;

const auto sep = std::string("://");
const auto pos = url.find_first_of(sep);
if (pos != std::string::npos) {
gcsScheme = url.substr(0, pos);
gcsEndpoint = url.substr(pos + sep.length());
}

if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
mutableConf->setValue("hive.gcs.scheme", gcsScheme);
mutableConf->setValue("hive.gcs.endpoint", gcsEndpoint);
}
}

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
auto gsAuthType = conf->get("spark.hadoop.fs.gs.auth.type");
if (gsAuthType.hasValue()) {
std::string type = gsAuthType.value();
if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
auto gsAuthServiceAccountJsonKeyfile = conf->get("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
stream.exceptions(std::ios::badbit);
std::string gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
mutableConf->setValue("hive.gcs.credentials", gsAuthServiceAccountJson);
} else {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
"however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
}
}
}
#endif

mutableConf->setValue(
velox::connector::hive::HiveConfig::kEnableFileHandleCache,
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");

if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
11 changes: 7 additions & 4 deletions cpp/velox/operators/writer/VeloxParquetDatasource.cc
@@ -25,6 +25,7 @@
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"

#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/common/compression/Compression.h"
#include "velox/core/QueryConfig.h"
@@ -49,10 +50,12 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::str
sink_ = std::make_unique<WriteFileSink>(std::move(localWriteFile), path);
} else if (isSupportedS3SdkPath(filePath_)) {
#ifdef ENABLE_S3
auto fileSystem = getFileSystem(filePath_, nullptr);
auto* s3FileSystem = dynamic_cast<filesystems::S3FileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
s3FileSystem->openFileForWrite(filePath_, {{}, s3SinkPool_.get()}), filePath_);
auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
auto hiveConfs = getHiveConfig(confs);
sink_ = dwio::common::FileSink::create(
filePath_,
{.connectorProperties = std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()),
.pool = s3SinkPool_.get()});
#else
throw std::runtime_error(
"The write path is S3 path but the S3 haven't been enabled when writing parquet data in velox runtime!");
115 changes: 115 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
@@ -19,6 +19,22 @@

#include "ConfigExtractor.h"
#include <stdexcept>
#ifdef ENABLE_GCS
#include <fstream>
#endif

#include "utils/exception.h"
#include "velox/connectors/hive/HiveConfig.h"

namespace {

const std::string kVeloxFileHandleCacheEnabled = "spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;

// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
} // namespace

namespace gluten {

@@ -41,4 +57,103 @@ bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMa
std::make_shared<const facebook::velox::core::MemConfigMutable>(confMap);
return veloxCfg->get<bool>(kDebugModeEnabled, false);
}

std::shared_ptr<facebook::velox::core::MemConfigMutable> getHiveConfig(
const std::shared_ptr<const facebook::velox::Config>& conf) {
auto hiveConf = std::make_shared<facebook::velox::core::MemConfigMutable>();

#ifdef ENABLE_S3
std::string awsAccessKey = conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
std::string awsSecretKey = conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
std::string awsEndpoint = conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
bool sslEnabled = conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
bool pathStyleAccess = conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
bool useInstanceCredentials = conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", "");
std::string iamRoleSessionName = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");

std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, kVeloxAwsSdkLogLevelDefault);

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

if (useInstanceCredentials) {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials, "true");
} else if (!iamRole.empty()) {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3IamRole, iamRole);
if (!iamRoleSessionName.empty()) {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3IamRoleSessionName, iamRoleSessionName);
}
} else {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3AwsAccessKey, awsAccessKey);
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3AwsSecretKey, awsSecretKey);
}
// Only need to set s3 endpoint when not use instance credentials.
if (!useInstanceCredentials) {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3Endpoint, awsEndpoint);
}
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled, sslEnabled ? "true" : "false");
hiveConf->setValue(
facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess, pathStyleAccess ? "true" : "false");
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kS3LogLevel, awsSdkLogLevel);
#endif

#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get("spark.hadoop.fs.gs.storage.root.url");
if (gsStorageRootUrl.hasValue()) {
std::string url = gsStorageRootUrl.value();
std::string gcsScheme;
std::string gcsEndpoint;

const auto sep = std::string("://");
const auto pos = url.find_first_of(sep);
if (pos != std::string::npos) {
gcsScheme = url.substr(0, pos);
gcsEndpoint = url.substr(pos + sep.length());
}

if (!gcsEndpoint.empty() && !gcsScheme.empty()) {
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSScheme, gcsScheme);
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSEndpoint, gcsEndpoint);
}
}

// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
auto gsAuthType = conf->get("spark.hadoop.fs.gs.auth.type");
if (gsAuthType.hasValue()) {
std::string type = gsAuthType.value();
if (type == "SERVICE_ACCOUNT_JSON_KEYFILE") {
auto gsAuthServiceAccountJsonKeyfile = conf->get("spark.hadoop.fs.gs.auth.service.account.json.keyfile");
if (gsAuthServiceAccountJsonKeyfile.hasValue()) {
auto stream = std::ifstream(gsAuthServiceAccountJsonKeyfile.value());
stream.exceptions(std::ios::badbit);
std::string gsAuthServiceAccountJson = std::string(std::istreambuf_iterator<char>(stream.rdbuf()), {});
hiveConf->setValue(facebook::velox::connector::hive::HiveConfig::kGCSCredentials, gsAuthServiceAccountJson);
} else {
LOG(WARNING) << "STARTUP: conf spark.hadoop.fs.gs.auth.type is set to SERVICE_ACCOUNT_JSON_KEYFILE, "
"however conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set";
throw GlutenException("Conf spark.hadoop.fs.gs.auth.service.account.json.keyfile is not set");
}
}
}
#endif

hiveConf->setValue(
facebook::velox::connector::hive::HiveConfig::kEnableFileHandleCache,
conf->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");

return hiveConf;
}

} // namespace gluten
3 changes: 3 additions & 0 deletions cpp/velox/utils/ConfigExtractor.h
@@ -35,4 +35,7 @@ std::string getConfigValue(

bool debugModeEnabled(const std::unordered_map<std::string, std::string>& confMap);

std::shared_ptr<facebook::velox::core::MemConfigMutable> getHiveConfig(
const std::shared_ptr<const facebook::velox::Config>& conf);

} // namespace gluten
26 changes: 25 additions & 1 deletion shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -356,6 +356,16 @@ object GlutenConfig {
val ABFS_ACCOUNT_KEY = "hadoop.fs.azure.account.key"
val SPARK_ABFS_ACCOUNT_KEY: String = "spark." + ABFS_ACCOUNT_KEY

// GCS config
val GCS_PREFIX = "fs.gs."
val GCS_STORAGE_ROOT_URL = "fs.gs.storage.root.url"
val SPARK_GCS_STORAGE_ROOT_URL: String = HADOOP_PREFIX + GCS_STORAGE_ROOT_URL
val GCS_AUTH_TYPE = "fs.gs.auth.type"
val SPARK_GCS_AUTH_TYPE: String = HADOOP_PREFIX + GCS_AUTH_TYPE
val GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE = "fs.gs.auth.service.account.json.keyfile"
val SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE: String =
HADOOP_PREFIX + GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE

// QAT config
val GLUTEN_QAT_BACKEND_NAME = "qat"
val GLUTEN_QAT_SUPPORTED_CODEC: Set[String] = Set("gzip", "zstd")
@@ -461,7 +471,21 @@
"spark.io.compression.codec",
COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS.key,
COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key
COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key,
// s3 config
SPARK_S3_ACCESS_KEY,
SPARK_S3_SECRET_KEY,
SPARK_S3_ENDPOINT,
SPARK_S3_CONNECTION_SSL_ENABLED,
SPARK_S3_PATH_STYLE_ACCESS,
SPARK_S3_USE_INSTANCE_CREDENTIALS,
SPARK_S3_IAM,
SPARK_S3_IAM_SESSION_NAME,
AWS_SDK_LOG_LEVEL.key,
// gcs config
SPARK_GCS_STORAGE_ROOT_URL,
SPARK_GCS_AUTH_TYPE,
SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE
)
keys.forEach(
k => {
