[GLUTEN-5103][VL] Support JVM libhdfs in velox #5384

Closed · wants to merge 4 commits
10 changes: 10 additions & 0 deletions .github/workflows/velox_docker.yml
@@ -68,6 +68,16 @@ jobs:
- name: Build Gluten Velox third party
if: ${{ steps.cache.outputs.cache-hit != 'true' }}
run: |
yum update -y && yum install -y java-1.8.0-openjdk-devel wget
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk

echo "JAVA_HOME: $JAVA_HOME"

wget https://archive.apache.org/dist/hadoop/core/hadoop-2.10.1/hadoop-2.10.1.tar.gz
tar xf hadoop-2.10.1.tar.gz -C /usr/local/
export HADOOP_HOME='/usr/local/hadoop-2.10.1'
echo "HADOOP_HOME: $HADOOP_HOME"

source dev/ci-velox-buildstatic.sh
- name: Upload Artifact Native
uses: actions/upload-artifact@v2
@@ -38,7 +38,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{BinaryType, DateType, Decimal, DecimalType, StructType, TimestampType}
import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ExecutorManager
import org.apache.spark.util.{ExecutorManager, SerializableConfiguration}

import org.apache.hadoop.fs.{FileSystem, Path}

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -53,7 +55,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo = {
metadataColumnNames: Seq[String],
serializableHadoopConf: SerializableConfiguration): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -64,7 +67,7 @@
modificationTimes,
partitionColumns,
metadataColumns) =
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -105,7 +108,8 @@
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
metadataColumnNames: Seq[String]) = {
metadataColumnNames: Seq[String],
serializableHadoopConf: SerializableConfiguration) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
@@ -117,9 +121,15 @@
file =>
// The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
var filePath = file.filePath.toString
if (filePath.startsWith("viewfs")) {
val viewPath = new Path(filePath)
val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
filePath = viewFileSystem.resolvePath(viewPath).toString
}
paths.add(
GlutenURLDecoder
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
.decode(filePath, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
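The new viewfs branch above resolves a federated ViewFS path to its concrete target filesystem on the driver side before the split is handed to the native reader, so the libhdfs-based scan only ever sees a physical scheme such as hdfs://. Below is a minimal sketch of that resolution step in isolation; the mount-table entry, cluster name, and file path are invented for illustration and are not taken from this PR.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ViewFsResolutionSketch {
  /** Mirrors the PR's logic: only viewfs:// paths are rewritten, everything else passes through. */
  def resolveIfViewFs(rawPath: String, conf: Configuration): String = {
    if (rawPath.startsWith("viewfs")) {
      val viewPath = new Path(rawPath)
      // resolvePath consults the mount table and the target filesystem,
      // so this call needs the target NameNode to be reachable.
      val viewFs = FileSystem.get(viewPath.toUri, conf)
      viewFs.resolvePath(viewPath).toString
    } else {
      rawPath
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical mount: viewfs://cluster/data -> hdfs://nn1:8020/warehouse/data
    conf.set("fs.viewfs.mounttable.cluster.link./data", "hdfs://nn1:8020/warehouse/data")
    println(resolveIfViewFs("viewfs://cluster/data/part-00000.parquet", conf))
  }
}
```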
@@ -36,7 +36,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.loadAndCreateLink("libre2.so.10", "libre2.so", false)
.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
@@ -41,7 +41,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.loadAndCreateLink("libre2.so.0", "libre2.so", false)
.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
.commit()
@@ -46,7 +46,7 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader {
.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.commit()
}
}
@@ -52,7 +52,7 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader {
.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.commit()
}
}
@@ -59,7 +59,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader {
.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.loadAndCreateLink("libre2.so.5", "libre2.so", false)
.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
@@ -44,7 +44,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader {
.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
.loadAndCreateLink("libre2.so.9", "libre2.so", false)
.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
15 changes: 10 additions & 5 deletions cpp/velox/CMakeLists.txt
@@ -531,9 +531,9 @@ set(VELOX_SRCS
utils/Common.cc
utils/VeloxBatchAppender.cc)

if(ENABLE_HDFS)
list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
endif()
# if (ENABLE_HDFS)
# list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
# endif ()

if(ENABLE_S3)
find_package(ZLIB)
@@ -629,8 +629,13 @@ endif()

if(ENABLE_HDFS)
add_definitions(-DENABLE_HDFS)
find_libhdfs3()
target_link_libraries(velox PUBLIC HDFS::hdfs3)
set(HADOOP_HOME $ENV{HADOOP_HOME})
find_library(
LIBHDFS
NAMES libhdfs.so
HINTS "${HADOOP_HOME}/lib/native" REQUIRED)
# find_libhdfs3()
target_link_libraries(velox PUBLIC ${LIBHDFS})
endif()

if(ENABLE_S3)
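With this change the build links against the JNI-based libhdfs shipped with Hadoop, located under ${HADOOP_HOME}/lib/native, instead of libhdfs3. As a hedged aside, the JNI client also needs a JVM and (commonly) the Hadoop jars on CLASSPATH at run time; the small preflight check below only illustrates the same HADOOP_HOME lookup the CMake snippet performs, and the helper name is invented for this sketch.

```scala
import java.nio.file.{Files, Paths}

object LibhdfsPreflightSketch {
  /** Mirrors the CMake hint above: look for libhdfs.so under $HADOOP_HOME/lib/native. */
  def locateLibhdfs(): Either[String, String] =
    sys.env.get("HADOOP_HOME") match {
      case None =>
        Left("HADOOP_HOME is not set; both the build and the JVM libhdfs expect it")
      case Some(home) =>
        val lib = Paths.get(home, "lib", "native", "libhdfs.so")
        if (Files.exists(lib)) Right(lib.toString)
        else Left(s"libhdfs.so not found at $lib")
    }

  def main(args: Array[String]): Unit =
    locateLibhdfs() match {
      case Right(path)  => println(s"Found JNI libhdfs at $path")
      case Left(reason) => println(s"Preflight failed: $reason")
    }
}
```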
12 changes: 6 additions & 6 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,9 +22,9 @@
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"

#ifdef ENABLE_HDFS
#include "utils/HdfsUtils.h"
#endif
// #ifdef ENABLE_HDFS
// #include "utils/HdfsUtils.h"
// #endif

using namespace facebook;

@@ -68,9 +68,9 @@ WholeStageResultIterator::WholeStageResultIterator(
scanNodeIds_(scanNodeIds),
scanInfos_(scanInfos),
streamIds_(streamIds) {
#ifdef ENABLE_HDFS
gluten::updateHdfsTokens(veloxCfg_.get());
#endif
// #ifdef ENABLE_HDFS
// gluten::updateHdfsTokens(veloxCfg_.get());
// #endif
spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy, kSpillStrategyDefaultValue);
auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum, kSpillThreadNumDefaultValue);
if (spillThreadNum > 0) {
66 changes: 0 additions & 66 deletions cpp/velox/utils/HdfsUtils.cc

This file was deleted.

22 changes: 0 additions & 22 deletions cpp/velox/utils/HdfsUtils.h

This file was deleted.

2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_06_14
VELOX_BRANCH=libhdfs
VELOX_HOME=""

#Set on run gluten on HDFS
22 changes: 1 addition & 21 deletions ep/build-velox/src/modify_velox.patch
@@ -39,27 +39,7 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5c7bf770a..9f897f577 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -234,10 +234,15 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
- find_library(
- LIBHDFS3
- NAMES libhdfs3.so libhdfs3.dylib
- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ find_package(libhdfs3)
+ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
+ set(LIBHDFS3 HDFS::hdfs3)
+ else()
+ find_library(
+ LIBHDFS3
+ NAMES libhdfs3.so libhdfs3.dylib
+ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ endif()
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

@@ -377,7 +382,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
# for reference. find_package(range-v3)

set_source(gflags)
@@ -29,14 +29,16 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

trait IteratorApi {

def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo
metadataColumnNames: Seq[String],
serializableHadoopConf: SerializableConfiguration): SplitInfo

/** Generate native row partition. */
def genPartitions(
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import com.google.protobuf.StringValue

@@ -62,14 +63,21 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty

/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos: Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions)
def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
}

def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
def getSplitInfosFromPartitions(
partitions: Seq[InputPartition],
serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
serializableHadoopConf))
}

override protected def doValidateInternal(): ValidationResult = {
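The extra serializableHadoopConf parameter threads the driver's Hadoop configuration (viewfs mount tables, HDFS client settings, and so on) down to where split infos are built, so the FileSystem.get call used for viewfs resolution sees the same configuration as the driver. Below is a minimal sketch of how such a wrapper is typically constructed and handed to the scan; the caller shown here is illustrative and not the actual call site in Gluten.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

object SplitInfoConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()

    // Wrap the driver-side Hadoop configuration so it can travel with the scan.
    val serializableHadoopConf =
      new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

    // Illustrative only: a real caller is a scan transformer holding partitions, e.g.
    //   scan.getSplitInfos(serializableHadoopConf)
    // which forwards it to genSplitInfo(..., serializableHadoopConf) as in the diff above.
    println(serializableHadoopConf.value.get("fs.defaultFS", "file:///"))

    spark.stop()
  }
}
```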