Skip to content

Commit

Permalink
Revert "[GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3 (apache#6172)"
Browse files Browse the repository at this point in the history

This reverts commit bf3d8d7.
  • Loading branch information
JkSelf committed Nov 18, 2024
1 parent c144443 commit 01eb682
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader {
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader {
loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader {
loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader {
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
Expand Down
28 changes: 28 additions & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,28 @@ macro(add_duckdb)
endif()
endmacro()

# Locate the libhdfs3 native HDFS client library and expose it as the
# imported target HDFS::hdfs3. Tries an installed CMake package config
# first; otherwise falls back to a manual header/library search.
# Configuration aborts if the library cannot be found.
macro(find_libhdfs3)
find_package(libhdfs3 CONFIG)
if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
set(LIBHDFS3_LIBRARY HDFS::hdfs3)
else()
# No package config: search for the header and shared library directly.
# Restricting suffixes to ".so" ensures the shared object is picked up
# rather than a static archive.
find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
find_library(libhdfs3_LIBRARY NAMES hdfs3)
find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR
libhdfs3_LIBRARY)
# Wrap the manually-found artifacts in the same imported target name the
# package-config path provides, so consumers always link HDFS::hdfs3.
add_library(HDFS::hdfs3 SHARED IMPORTED)
set_target_properties(
HDFS::hdfs3
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}"
IMPORTED_LOCATION "${libhdfs3_LIBRARY}")
endif()

# HDFS support was explicitly requested (ENABLE_HDFS), so failure to find
# the library is fatal rather than a silent feature downgrade.
if(NOT libhdfs3_FOUND)
message(FATAL_ERROR "LIBHDFS3 Library Not Found")
endif()
endmacro()

macro(find_re2)
find_package(re2 CONFIG)
if(re2_FOUND AND TARGET re2::re2)
Expand Down Expand Up @@ -194,6 +216,10 @@ set(VELOX_SRCS
utils/Common.cc
utils/VeloxBatchResizer.cc)

if(ENABLE_HDFS)
list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
endif()

if(ENABLE_S3)
find_package(ZLIB)
endif()
Expand Down Expand Up @@ -330,6 +356,8 @@ endif()

if(ENABLE_HDFS)
add_definitions(-DENABLE_HDFS)
find_libhdfs3()
target_link_libraries(velox PUBLIC HDFS::hdfs3)
endif()

if(ENABLE_S3)
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"

#ifdef ENABLE_HDFS
#include "utils/HdfsUtils.h"
#endif

using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -67,6 +71,9 @@ WholeStageResultIterator::WholeStageResultIterator(
scanNodeIds_(scanNodeIds),
scanInfos_(scanInfos),
streamIds_(streamIds) {
#ifdef ENABLE_HDFS
gluten::updateHdfsTokens(veloxCfg_.get());
#endif
spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy, kSpillStrategyDefaultValue);
auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum, kSpillThreadNumDefaultValue);
if (spillThreadNum > 0) {
Expand Down
22 changes: 22 additions & 0 deletions cpp/velox/utils/HdfsUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// HDFS helper declarations used by the Velox backend when built with
// ENABLE_HDFS (see WholeStageResultIterator's constructor, which calls
// updateHdfsTokens before query execution).

#include <velox/common/config/Config.h>
#include <memory>
namespace gluten {
// Updates HDFS tokens from entries in the given Velox config.
// NOTE(review): the exact config keys read and the token mechanism are in
// HdfsUtils.cc, which is not visible from this header — confirm there.
void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
}
22 changes: 21 additions & 1 deletion ep/build-velox/src/modify_velox.patch
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,27 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7fd99b6dc..e7e03a800 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -428,7 +428,7 @@ velox_resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
- find_library(
- LIBHDFS3
- NAMES libhdfs3.so libhdfs3.dylib
- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ find_package(libhdfs3)
+ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
+ set(LIBHDFS3 HDFS::hdfs3)
+ else()
+ find_library(
+ LIBHDFS3
+ NAMES libhdfs3.so libhdfs3.dylib
+ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ endif()
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
# for reference. find_package(range-v3)

velox_set_source(gflags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty

/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos(): Seq[SplitInfo] = {
def getSplitInfos: Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -129,10 +128,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)

val sparkConf: SparkConf = sparkContext.getConf

val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
sparkContext.hadoopConfiguration)

val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo

@transient
Expand Down Expand Up @@ -294,33 +289,11 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
if (GlutenConfig.getConf.enableHdfsViewfs) {
allScanSplitInfos.foreach {
splitInfos =>
splitInfos.foreach {
case splitInfo: LocalFilesNode =>
val paths = splitInfo.getPaths.asScala
if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
// Convert the viewfs path into hdfs
val newPaths = paths.map {
viewfsPath =>
val viewPath = new Path(viewfsPath)
val viewFileSystem =
FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
viewFileSystem.resolvePath(viewPath).toString
}
splitInfo.setPaths(newPaths.asJava)
}
}
}
}

val inputPartitions =
BackendsApiManager.getIteratorApiInstance.genPartitions(
wsCtx,
allScanSplitInfos,
basicScanExecTransformers)

val rdd = new GlutenWholeStageColumnarRDD(
sparkContext,
inputPartitions,
Expand Down Expand Up @@ -424,8 +397,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
// p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
val allScanSplitInfos =
allScanPartitions.zip(basicScanExecTransformers).map {
case (partition, transformer) =>
transformer.getSplitInfosFromPartitions(partition)
case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition)
}
val partitionLength = allScanSplitInfos.head.size
if (allScanSplitInfos.exists(_.size != partitionLength)) {
Expand Down

0 comments on commit 01eb682

Please sign in to comment.