From 18270d201bc30a5f609534b8fbe359653798d938 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Tue, 10 Dec 2024 21:30:21 +0800 Subject: [PATCH] Support viewfs file system in velox --- .../storage_adapters/hdfs/HdfsFileSystem.cpp | 26 ++++++++++++------- .../storage_adapters/hdfs/HdfsFileSystem.h | 10 +++++-- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index d1bbcb483e01..9479279ee54a 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -22,6 +22,8 @@ namespace facebook::velox::filesystems { std::string_view HdfsFileSystem::kScheme("hdfs://"); +std::string_view HdfsFileSystem::kViewfsScheme("viewfs://"); + class HdfsFileSystem::Impl { public: // Keep config here for possible use in the future. @@ -35,8 +37,15 @@ class HdfsFileSystem::Impl { // connect to HDFS with the builder object hdfsBuilder* builder = driver_->NewBuilder(); - driver_->BuilderSetNameNode(builder, endpoint.host.c_str()); - driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data())); + if (endpoint.isViewfs) { + // The default NameNode configuration will be used (from the XML + // configuration files). See: + // https://github.com/facebookincubator/velox/blob/main/velox/external/hdfs/hdfs.h#L289 + driver_->BuilderSetNameNode(builder, "default"); + } else { + driver_->BuilderSetNameNode(builder, endpoint.host.c_str()); + driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data())); + } driver_->BuilderSetForceNewInstance(builder); hdfsClient_ = driver_->BuilderConnect(builder); VELOX_CHECK_NOT_NULL( @@ -82,13 +91,6 @@ std::string HdfsFileSystem::name() const { std::unique_ptr HdfsFileSystem::openFileForRead( std::string_view path, const FileOptions& /*unused*/) { - if (path.find(kScheme) == 0) { - path.remove_prefix(kScheme.length()); - } - if (auto index = path.find('/')) { - path.remove_prefix(index); - } - return std::make_unique( impl_->hdfsShim(), impl_->hdfsClient(), path); } @@ -101,7 +103,7 @@ std::unique_ptr HdfsFileSystem::openFileForWrite( } bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { - return filePath.find(kScheme) == 0; + return (filePath.find(kScheme) == 0) || (filePath.find(kViewfsScheme) == 0); } /// Gets hdfs endpoint from a given file path. If not found, fall back to get a @@ -109,6 +111,10 @@ bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) { HdfsServiceEndpoint HdfsFileSystem::getServiceEndpoint( const std::string_view filePath, const config::ConfigBase* config) { + if (filePath.find(kViewfsScheme) == 0) { + return HdfsServiceEndpoint{"viewfs", "", true}; + } + auto endOfIdentityInfo = filePath.find('/', kScheme.size()); std::string hdfsIdentity{ filePath.data(), kScheme.size(), endOfIdentityInfo - kScheme.size()}; diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index a3e9a8efe5d1..77565a532a4f 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -22,8 +22,11 @@ class LibHdfsShim; namespace facebook::velox::filesystems { struct HdfsServiceEndpoint { - HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort) - : host(hdfsHost), port(hdfsPort) {} + HdfsServiceEndpoint( + const std::string& hdfsHost, + const std::string& hdfsPort, + bool isViewfs = false) + : host(hdfsHost), port(hdfsPort), isViewfs(isViewfs) {} /// In HDFS HA mode, the identity is a nameservice ID with no port, e.g., /// the identity is nameservice_id for @@ -36,6 +39,7 @@ struct HdfsServiceEndpoint { const std::string host; const std::string port; + bool isViewfs; }; /** @@ -98,6 +102,8 @@ class HdfsFileSystem : public FileSystem { static std::string_view kScheme; + static std::string_view kViewfsScheme; + protected: class Impl; std::shared_ptr impl_;