diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala index b9d90d589c02..a5e638d9be28 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala @@ -37,6 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader { loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false) loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.10", "libre2.so", false) loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false) loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala index dbb7d59f889d..5d8c18b8bbf1 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala @@ -42,6 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader { loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false) loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.0", "libre2.so", false) loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala index f633a79f8fcd..694cf4c622f5 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala @@ -42,6 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader { loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false) loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.9", "libre2.so", false) loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala index 4f9b6fdd7250..6927f2539ef0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala @@ -44,5 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader { loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala index e3967eea3267..ce01f4399f4f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala @@ -50,5 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader { loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false) loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala index 6d9271e9e3d9..79c0518ea3b9 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala @@ -57,6 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader { loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false) loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false) loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.5", "libre2.so", false) loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false) diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala index 95f7db5655b2..a5d99ede42b9 100755 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala @@ -42,6 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader { loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false) loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false) loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false) + loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false) loader.loadAndCreateLink("libre2.so.9", "libre2.so", false) loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false) loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 3ed7574d1e4c..f30bb828a484 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -109,6 +109,28 @@ macro(add_duckdb) endif() endmacro() +macro(find_libhdfs3) + find_package(libhdfs3 CONFIG) + if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) + set(LIBHDFS3_LIBRARY HDFS::hdfs3) + else() + find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h) + set(CMAKE_FIND_LIBRARY_SUFFIXES ".so") + find_library(libhdfs3_LIBRARY NAMES hdfs3) + find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR + libhdfs3_LIBRARY) + add_library(HDFS::hdfs3 SHARED IMPORTED) + set_target_properties( + HDFS::hdfs3 + PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}" + IMPORTED_LOCATION "${libhdfs3_LIBRARY}") + endif() + + if(NOT libhdfs3_FOUND) + message(FATAL_ERROR "LIBHDFS3 Library Not Found") + endif() +endmacro() + macro(find_re2) find_package(re2 CONFIG) if(re2_FOUND AND TARGET re2::re2) @@ -194,6 +216,10 @@ set(VELOX_SRCS utils/Common.cc utils/VeloxBatchResizer.cc) +if(ENABLE_HDFS) + list(APPEND VELOX_SRCS utils/HdfsUtils.cc) +endif() + if(ENABLE_S3) find_package(ZLIB) endif() @@ -330,6 +356,8 @@ endif() if(ENABLE_HDFS) add_definitions(-DENABLE_HDFS) + find_libhdfs3() + target_link_libraries(velox PUBLIC HDFS::hdfs3) endif() if(ENABLE_S3) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b6ecbd959f09..0d9ae99fa1d5 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -22,6 +22,10 @@ #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/exec/PlanNodeStats.h" +#ifdef ENABLE_HDFS +#include "utils/HdfsUtils.h" +#endif + using namespace facebook; namespace gluten { @@ -67,6 +71,9 @@ WholeStageResultIterator::WholeStageResultIterator( scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { +#ifdef ENABLE_HDFS + gluten::updateHdfsTokens(veloxCfg_.get()); +#endif spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); if (spillThreadNum > 0) { diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h new file mode 100644 index 000000000000..2e07d7ddf41b --- /dev/null +++ b/cpp/velox/utils/HdfsUtils.h @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +namespace gluten { +void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg); +} diff --git a/ep/build-velox/src/modify_velox.patch b/ep/build-velox/src/modify_velox.patch index bf801ead83ef..33e698857fa9 100644 --- a/ep/build-velox/src/modify_velox.patch +++ b/ep/build-velox/src/modify_velox.patch @@ -13,7 +13,27 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt index 7fd99b6dc..e7e03a800 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt -@@ -428,7 +428,7 @@ velox_resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) +@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS) + endif() + + if(VELOX_ENABLE_HDFS) +- find_library( +- LIBHDFS3 +- NAMES libhdfs3.so libhdfs3.dylib +- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) ++ find_package(libhdfs3) ++ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3) ++ set(LIBHDFS3 HDFS::hdfs3) ++ else() ++ find_library( ++ LIBHDFS3 ++ NAMES libhdfs3.so libhdfs3.dylib ++ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED) ++ endif() + add_definitions(-DVELOX_ENABLE_HDFS3) + endif() + +@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES}) # for reference. find_package(range-v3) velox_set_source(gflags) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 73ed35e7190b..ac87681ab8b9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -62,7 +62,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos(): Seq[SplitInfo] = { + def getSplitInfos: Seq[SplitInfo] = { getSplitInfosFromPartitions(getPartitions) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index beb7fe5f99d2..01b9ff9319b3 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import org.apache.hadoop.fs.{FileSystem, Path} @@ -129,10 +128,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext) val sparkConf: SparkConf = sparkContext.getConf - - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) - val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo @transient @@ -294,33 +289,11 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) val allScanSplitInfos = getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) - if (GlutenConfig.getConf.enableHdfsViewfs) { - allScanSplitInfos.foreach { - splitInfos => - splitInfos.foreach { - case splitInfo: LocalFilesNode => - val paths = splitInfo.getPaths.asScala - if (paths.nonEmpty && paths.head.startsWith("viewfs")) { - // Convert the viewfs path into hdfs - val newPaths = paths.map { - viewfsPath => - val viewPath = new Path(viewfsPath) - val viewFileSystem = - FileSystem.get(viewPath.toUri, serializableHadoopConf.value) - viewFileSystem.resolvePath(viewPath).toString - } - splitInfo.setPaths(newPaths.asJava) - } - } - } - } - val inputPartitions = BackendsApiManager.getIteratorApiInstance.genPartitions( wsCtx, allScanSplitInfos, basicScanExecTransformers) - val rdd = new GlutenWholeStageColumnarRDD( sparkContext, inputPartitions, @@ -424,8 +397,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map { - case (partition, transformer) => - transformer.getSplitInfosFromPartitions(partition) + case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition) } val partitionLength = allScanSplitInfos.head.size if (allScanSplitInfos.exists(_.size != partitionLength)) {