Skip to content

Commit

Permalink
[GLUTEN-4811][VL] Abfs FileSink Onboard (#5527)
Browse files Browse the repository at this point in the history
Support ABFS write
  • Loading branch information
gaoyangxiaozhu authored May 7, 2024
1 parent eaa2761 commit fe04e77
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
#include "operators/writer/VeloxParquetDatasourceGCS.h"
#endif

#ifdef ENABLE_ABFS
#include "operators/writer/VeloxParquetDatasourceABFS.h"
#endif

using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -218,6 +222,13 @@ std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
#else
throw std::runtime_error(
"The write path is GCS path but the GCS haven't been enabled when writing parquet data in velox runtime!");
#endif
} else if (isSupportedABFSPath(filePath)) {
#ifdef ENABLE_ABFS
return std::make_shared<VeloxParquetDatasourceABFS>(filePath, veloxPool, sinkPool, schema);
#else
throw std::runtime_error(
"The write path is ABFS path but the ABFS haven't been enabled when writing parquet data in velox runtime!");
#endif
}
return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, sinkPool, schema);
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/operators/writer/VeloxParquetDatasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"
#endif
#ifdef ENABLE_ABFS
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#endif
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
Expand Down Expand Up @@ -72,6 +75,10 @@ inline bool isSupportedHDFSPath(const std::string& filePath) {
return strncmp(filePath.c_str(), "hdfs:", 5) == 0;
}

inline bool isSupportedABFSPath(const std::string& filePath) {
return strncmp(filePath.c_str(), "abfs:", 5) == 0 || strncmp(filePath.c_str(), "abfss:", 6) == 0;
}

class VeloxParquetDatasource : public Datasource {
public:
VeloxParquetDatasource(
Expand Down
55 changes: 55 additions & 0 deletions cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "operators/writer/VeloxParquetDatasource.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"

#include <string>

#include "arrow/c/bridge.h"
#include "compute/VeloxRuntime.h"

#include "velox/common/compression/Compression.h"
#include "velox/core/QueryConfig.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/common/Options.h"

namespace gluten {

class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource {
public:
VeloxParquetDatasourceABFS(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
: VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
void init(const std::unordered_map<std::string, std::string>& sparkConfs) override {
auto confs = std::make_shared<facebook::velox::core::MemConfigMutable>(sparkConfs);
auto hiveConfs = getHiveConfig(confs);
auto fileSystem = filesystems::getFileSystem(
filePath_, std::make_shared<facebook::velox::core::MemConfig>(hiveConfs->valuesCopy()));
auto* abfsFileSystem = dynamic_cast<filesystems::abfs::AbfsFileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
abfsFileSystem->openFileForWrite(filePath_, {{}, sinkPool_.get()}), filePath_);
VeloxParquetDatasource::init(sparkConfs);
}
};
} // namespace gluten

0 comments on commit fe04e77

Please sign in to comment.