Skip to content

Commit

Permalink
support read data from abfs with account key
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangyang Gao committed Dec 8, 2023
1 parent 305924a commit abbbd24
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 5 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ option(ENABLE_GCS "Enable GCS" OFF)
option(ENABLE_S3 "Enable S3" OFF)
option(ENABLE_HDFS "Enable HDFS" OFF)
option(ENABLE_ORC "Enable ORC" OFF)
# Build with the Azure Blob File System (ABFS) storage adapter.
# The name must be ENABLE_ABFS: that is the variable tested by if(ENABLE_ABFS)
# in cpp/velox/CMakeLists.txt and passed as -DENABLE_ABFS by the build scripts.
option(ENABLE_ABFS "Enable ABFS" OFF)

set(root_directory ${PROJECT_BINARY_DIR})
get_filename_component(GLUTEN_HOME ${CMAKE_SOURCE_DIR} DIRECTORY)
Expand Down
9 changes: 8 additions & 1 deletion cpp/compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ENABLE_HBM=OFF
ENABLE_GCS=OFF
ENABLE_S3=OFF
ENABLE_HDFS=OFF
ENABLE_ABFS=OFF
VELOX_HOME=
NPROC=$(nproc --ignore=2)

Expand Down Expand Up @@ -81,6 +82,10 @@ for arg in "$@"; do
ENABLE_S3=("${arg#*=}")
shift # Remove argument name from processing
;;
--enable_abfs=*)
ENABLE_ABFS=("${arg#*=}")
shift # Remove argument name from processing
;;
--enable_hdfs=*)
ENABLE_HDFS=("${arg#*=}")
shift # Remove argument name from processing
Expand Down Expand Up @@ -116,6 +121,7 @@ echo "BUILD_PROTOBUF=${BUILD_PROTOBUF}"
echo "ENABLE_GCS=${ENABLE_GCS}"
echo "ENABLE_S3=${ENABLE_S3}"
echo "ENABLE_HDFS=${ENABLE_HDFS}"
echo "ENABLE_ABFS=${ENABLE_ABFS}"

if [ -d build ]; then
rm -r build
Expand All @@ -135,5 +141,6 @@ cmake .. \
-DENABLE_HBM=${ENABLE_HBM} \
-DENABLE_GCS=${ENABLE_GCS} \
-DENABLE_S3=${ENABLE_S3} \
-DENABLE_HDFS=${ENABLE_HDFS}
-DENABLE_HDFS=${ENABLE_HDFS} \
-DENABLE_ABFS=${ENABLE_ABFS}
make -j$NPROC
14 changes: 13 additions & 1 deletion cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ macro(ADD_VELOX_DEPENDENCIES)
if(ENABLE_S3)
add_velox_dependency(connector::hive::s3fs "${VELOX_COMPONENTS_PATH}/connectors/hive/storage_adapters/s3fs/libvelox_s3fs.a")
endif()
if(ENABLE_ABFS)
add_velox_dependency(connector::hive::abfs "${VELOX_COMPONENTS_PATH}/connectors/hive/storage_adapters/abfs/libvelox_abfs.a")
endif()
add_velox_dependency(dwio::dwrf::writer "${VELOX_COMPONENTS_PATH}/dwio/dwrf/writer/libvelox_dwio_dwrf_writer.a")
add_velox_dependency(dwio::dwrf::reader "${VELOX_COMPONENTS_PATH}/dwio/dwrf/reader/libvelox_dwio_dwrf_reader.a")
add_velox_dependency(dwio::dwrf::utils "${VELOX_COMPONENTS_PATH}/dwio/dwrf/utils/libvelox_dwio_dwrf_utils.a")
Expand Down Expand Up @@ -266,7 +269,10 @@ macro(find_gcssdk)
find_package(google_cloud_cpp_storage REQUIRED)
endmacro()


# Locate the Azure Storage Blobs C++ SDK via its CMake package configuration.
# This is a macro, not a function, so it runs in the caller's variable scope:
# the CMAKE_FIND_LIBRARY_SUFFIXES override below stays in effect for the caller
# after the macro has been invoked.
macro(find_azure)
# Restrict library lookup to static archives (".a").
set (CMAKE_FIND_LIBRARY_SUFFIXES ".a")
# CONFIG: use the package's own config file; REQUIRED: fail configuration if it is not found.
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
endmacro()

# Build Velox backend.
set(VELOX_SRCS
Expand Down Expand Up @@ -393,3 +399,9 @@ endif()
if(BUILD_EXAMPLES)
add_subdirectory(udf/examples)
endif()

# ABFS support; everything below applies only when ENABLE_ABFS is set.
if(ENABLE_ABFS)
# Define the ENABLE_ABFS preprocessor symbol (directory-scoped) so that sources
# guarded by "#ifdef ENABLE_ABFS" are compiled in.
add_definitions(-DENABLE_ABFS)
# Run find_package for azure-storage-blobs-cpp (static libraries only).
find_azure()
# Link the Azure blob storage client into the velox target without exposing it to consumers.
target_link_libraries(velox PRIVATE Azure::azure-storage-blobs)
endif()
16 changes: 16 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
#ifdef ENABLE_GCS
#include <fstream>
#endif
#ifdef ENABLE_ABFS
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#endif
#include "config/GlutenConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkTokenizer.h"
Expand Down Expand Up @@ -284,6 +287,19 @@ void VeloxBackend::initConnector(const facebook::velox::Config* conf) {
mutableConf->setValue("hive.s3.path-style-access", pathStyleAccess ? "true" : "false");
#endif

#ifdef ENABLE_ABFS
  // Make the ABFS file system implementation available to Velox.
  velox::filesystems::abfs::registerAbfsFileSystem();

  // Forward Azure storage account keys into the connector configuration:
  //  - entries named "fs.azure.account.key*" are copied unchanged;
  //  - entries named "spark.hadoop.fs.azure.account.key*" are copied with the
  //    "spark.hadoop." prefix removed.
  // The amount to strip is derived from the prefix literal rather than hard-coded,
  // so the two cannot drift apart.
  const std::string abfsAccountKeyPrefix = "fs.azure.account.key";
  const std::string sparkHadoopPrefix = "spark.hadoop.";
  const std::string sparkAbfsAccountKeyPrefix = sparkHadoopPrefix + abfsAccountKeyPrefix;

  // Iterate over the result of valuesCopy() (presumably a snapshot of the config --
  // confirm against velox::Config) while writing into mutableConf.
  const auto& confValue = conf->valuesCopy();
  for (const auto& [key, value] : confValue) {
    // compare(0, n, prefix) == 0 is a starts-with test that looks only at the first
    // n characters; find(prefix) == 0 would keep scanning the whole key on a mismatch.
    if (key.compare(0, abfsAccountKeyPrefix.size(), abfsAccountKeyPrefix) == 0) {
      mutableConf->setValue(key, value);
    } else if (key.compare(0, sparkAbfsAccountKeyPrefix.size(), sparkAbfsAccountKeyPrefix) == 0) {
      mutableConf->setValue(key.substr(sparkHadoopPrefix.size()), value);
    }
  }
#endif

#ifdef ENABLE_GCS
// https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#api-client-configuration
auto gsStorageRootUrl = conf->get("spark.hadoop.fs.gs.storage.root.url");
Expand Down
11 changes: 8 additions & 3 deletions dev/builddeps-veloxbe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ENABLE_HBM=OFF
ENABLE_GCS=OFF
ENABLE_S3=OFF
ENABLE_HDFS=OFF
ENABLE_ABFS=OFF
ENABLE_EP_CACHE=OFF
ARROW_ENABLE_CUSTOM_CODEC=OFF
ENABLE_VCPKG=OFF
Expand Down Expand Up @@ -77,6 +78,10 @@ do
ENABLE_HDFS=("${arg#*=}")
shift # Remove argument name from processing
;;
--enable_abfs=*)
ENABLE_ABFS=("${arg#*=}")
shift # Remove argument name from processing
;;
--enable_ep_cache=*)
ENABLE_EP_CACHE=("${arg#*=}")
shift # Remove argument name from processing
Expand All @@ -100,8 +105,8 @@ fi

##install velox
cd $GLUTEN_DIR/ep/build-velox/src
./get_velox.sh --enable_hdfs=$ENABLE_HDFS --build_protobuf=$BUILD_PROTOBUF --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS
./build_velox.sh --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS --build_type=$BUILD_TYPE --enable_hdfs=$ENABLE_HDFS \
./get_velox.sh --enable_hdfs=$ENABLE_HDFS --build_protobuf=$BUILD_PROTOBUF --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS --enable_abfs=$ENABLE_ABFS
./build_velox.sh --enable_s3=$ENABLE_S3 --enable_gcs=$ENABLE_GCS --build_type=$BUILD_TYPE --enable_hdfs=$ENABLE_HDFS --enable_abfs=$ENABLE_ABFS \
--enable_ep_cache=$ENABLE_EP_CACHE --build_tests=$BUILD_TESTS --build_benchmarks=$BUILD_BENCHMARKS

## compile gluten cpp
Expand All @@ -111,5 +116,5 @@ mkdir build
cd build
cmake -DBUILD_VELOX_BACKEND=ON -DCMAKE_BUILD_TYPE=$BUILD_TYPE \
-DBUILD_TESTS=$BUILD_TESTS -DBUILD_EXAMPLES=$BUILD_EXAMPLES -DBUILD_BENCHMARKS=$BUILD_BENCHMARKS -DBUILD_JEMALLOC=$BUILD_JEMALLOC \
-DENABLE_HBM=$ENABLE_HBM -DENABLE_QAT=$ENABLE_QAT -DENABLE_IAA=$ENABLE_IAA -DENABLE_GCS=$ENABLE_GCS -DENABLE_S3=$ENABLE_S3 -DENABLE_HDFS=$ENABLE_HDFS ..
-DENABLE_HBM=$ENABLE_HBM -DENABLE_QAT=$ENABLE_QAT -DENABLE_IAA=$ENABLE_IAA -DENABLE_GCS=$ENABLE_GCS -DENABLE_S3=$ENABLE_S3 -DENABLE_HDFS=$ENABLE_HDFS -DENABLE_ABFS=$ENABLE_ABFS ..
make -j
10 changes: 10 additions & 0 deletions ep/build-velox/src/build_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ ENABLE_S3=OFF
ENABLE_GCS=OFF
# Set to ON to run Gluten on HDFS
ENABLE_HDFS=OFF
# Set to ON to run Gluten on ABFS
ENABLE_ABFS=OFF
BUILD_TYPE=release
VELOX_HOME=""
ENABLE_EP_CACHE=OFF
Expand Down Expand Up @@ -50,6 +52,10 @@ for arg in "$@"; do
ENABLE_HDFS=("${arg#*=}")
shift # Remove argument name from processing
;;
--enable_abfs=*)
ENABLE_ABFS=("${arg#*=}")
shift # Remove argument name from processing
;;
--build_type=*)
BUILD_TYPE=("${arg#*=}")
shift # Remove argument name from processing
Expand Down Expand Up @@ -99,6 +105,9 @@ function compile {
if [ $ENABLE_HDFS == "ON" ]; then
COMPILE_OPTION="$COMPILE_OPTION -DVELOX_ENABLE_HDFS=ON"
fi
if [ $ENABLE_ABFS == "ON" ]; then
COMPILE_OPTION="$COMPILE_OPTION -DVELOX_ENABLE_ABFS=ON"
fi
if [ $ENABLE_S3 == "ON" ]; then
COMPILE_OPTION="$COMPILE_OPTION -DVELOX_ENABLE_S3=ON"
fi
Expand Down Expand Up @@ -247,6 +256,7 @@ echo "VELOX_HOME=${VELOX_HOME}"
echo "ENABLE_S3=${ENABLE_S3}"
echo "ENABLE_GCS=${ENABLE_GCS}"
echo "ENABLE_HDFS=${ENABLE_HDFS}"
echo "ENABLE_ABFS=${ENABLE_ABFS}"
echo "BUILD_TYPE=${BUILD_TYPE}"

cd ${VELOX_HOME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ object GlutenConfig {

// Hardware accelerators backend
val GLUTEN_SHUFFLE_CODEC_BACKEND = "spark.gluten.sql.columnar.shuffle.codecBackend"
// ABFS config
val ABFS_ACCOUNT_KEY = "hadoop.fs.azure.account.key"
val SPARK_ABFS_ACCOUNT_KEY: String = "spark." + ABFS_ACCOUNT_KEY

// QAT config
val GLUTEN_QAT_BACKEND_NAME = "qat"
val GLUTEN_QAT_SUPPORTED_CODEC: Set[String] = Set("gzip", "zstd")
Expand Down Expand Up @@ -539,6 +543,10 @@ object GlutenConfig {
.filter(_._1.startsWith(HADOOP_PREFIX + S3A_PREFIX))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))

conf
.filter(_._1.startsWith(SPARK_ABFS_ACCOUNT_KEY))
.foreach(entry => nativeConfMap.put(entry._1, entry._2))

// return
nativeConfMap
}
Expand Down

0 comments on commit abbbd24

Please sign in to comment.