Skip to content

Commit

Permalink
Support viewfs file system in velox
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Dec 10, 2024
1 parent 1bd480e commit 18270d2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
26 changes: 16 additions & 10 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
namespace facebook::velox::filesystems {
std::string_view HdfsFileSystem::kScheme("hdfs://");

std::string_view HdfsFileSystem::kViewfsScheme("viewfs://");

class HdfsFileSystem::Impl {
public:
// Keep config here for possible use in the future.
Expand All @@ -35,8 +37,15 @@ class HdfsFileSystem::Impl {

// connect to HDFS with the builder object
hdfsBuilder* builder = driver_->NewBuilder();
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
if (endpoint.isViewfs) {
// The default NameNode configuration will be used (from the XML
// configuration files). See:
// https://github.com/facebookincubator/velox/blob/main/velox/external/hdfs/hdfs.h#L289
driver_->BuilderSetNameNode(builder, "default");
} else {
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
}
driver_->BuilderSetForceNewInstance(builder);
hdfsClient_ = driver_->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
Expand Down Expand Up @@ -82,13 +91,6 @@ std::string HdfsFileSystem::name() const {
std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
}
if (auto index = path.find('/')) {
path.remove_prefix(index);
}

return std::make_unique<HdfsReadFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
}
Expand All @@ -101,14 +103,18 @@ std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
}

bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
return filePath.find(kScheme) == 0;
return (filePath.find(kScheme) == 0) || (filePath.find(kViewfsScheme) == 0);
}

/// Gets hdfs endpoint from a given file path. If not found, fall back to get a
/// fixed one from configuration.
HdfsServiceEndpoint HdfsFileSystem::getServiceEndpoint(
const std::string_view filePath,
const config::ConfigBase* config) {
if (filePath.find(kViewfsScheme) == 0) {
return HdfsServiceEndpoint{"viewfs", "", true};
}

auto endOfIdentityInfo = filePath.find('/', kScheme.size());
std::string hdfsIdentity{
filePath.data(), kScheme.size(), endOfIdentityInfo - kScheme.size()};
Expand Down
10 changes: 8 additions & 2 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ class LibHdfsShim;
namespace facebook::velox::filesystems {

struct HdfsServiceEndpoint {
HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
: host(hdfsHost), port(hdfsPort) {}
HdfsServiceEndpoint(
const std::string& hdfsHost,
const std::string& hdfsPort,
bool isViewfs = false)
: host(hdfsHost), port(hdfsPort), isViewfs(isViewfs) {}

/// In HDFS HA mode, the identity is a nameservice ID with no port, e.g.,
/// the identity is nameservice_id for
Expand All @@ -36,6 +39,7 @@ struct HdfsServiceEndpoint {

const std::string host;
const std::string port;
bool isViewfs;
};

/**
Expand Down Expand Up @@ -98,6 +102,8 @@ class HdfsFileSystem : public FileSystem {

static std::string_view kScheme;

static std::string_view kViewfsScheme;

protected:
class Impl;
std::shared_ptr<Impl> impl_;
Expand Down

0 comments on commit 18270d2

Please sign in to comment.