Skip to content

Commit

Permalink
[GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3
Browse files Browse the repository at this point in the history
This patch is merged together with the Velox daily rebase update (2024_10_25)

New commits from Velox daily rebase:
3fa7acf3c by Xiaoxuan Meng, Avoid local shuffle and global shuffle use the same hash value to avoid data skew (11338)
19ea693cc by lingbin, Use 'if constexpr' for Buffer::is_pod_like_v<T> (11341)
c5dc89d5a by Jenson, Fix nullable bug of Arrow MapVector in Bridge.cpp (11214)
c14040f2c by Ke, Report inputSizeInBytes in HiveDataSink (11339)
10cdf6fd8 by Jia Ke, Support jvm version libhdfs in velox (9835)
8d77bebbe by Ke, Add single parameter (array<array<T>>) support for array_intersect (11305)
3922aec50 by Xiaoxuan Meng, Fix local file sink to use velox fs as well as fail on exist file (11322)
97aa713cc by Jimmy Lu, Make RowType::nameOf throw Velox exception (11336)
14a74ebde by Duc Nguyen, Move ApproxPercentile intermediate type index to header file (11224)
  • Loading branch information
JkSelf authored Oct 25, 2024
1 parent 3add1fb commit c1ab7b3
Show file tree
Hide file tree
Showing 21 changed files with 60 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import java.lang.{Long => JLong}
import java.net.URI
Expand Down Expand Up @@ -133,7 +134,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos.size)
assertResult(1)(plans.head.getSplitInfos(null).size)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(conf._2)(plans.head.getSplitInfos.size)
assertResult(conf._2)(plans.head.getSplitInfos(null).size)
}
}
})
Expand Down Expand Up @@ -1910,7 +1910,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
assertResult(conf._2)(scanExec(1).getSplitInfos.size)
assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}

import org.apache.hadoop.fs.{FileSystem, Path}

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
Expand All @@ -55,7 +57,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo = {
partition match {
case f: FilePartition =>
val (
Expand All @@ -66,7 +69,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
modificationTimes,
partitionColumns,
metadataColumns) =
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
Expand Down Expand Up @@ -109,7 +112,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
metadataColumnNames: Seq[String]) = {
metadataColumnNames: Seq[String],
serializableHadoopConf: SerializableConfiguration) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
Expand All @@ -121,9 +125,15 @@ class VeloxIteratorApi extends IteratorApi with Logging {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
var filePath = file.filePath.toString
if (filePath.startsWith("viewfs")) {
val viewPath = new Path(filePath)
val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
filePath = viewFileSystem.resolvePath(viewPath).toString
}
paths.add(
GlutenURLDecoder
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
.decode(filePath, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader {
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader {
loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends SharedLibraryLoader {
loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends SharedLibraryLoader {
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
Expand Down
28 changes: 0 additions & 28 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,28 +109,6 @@ macro(add_duckdb)
endif()
endmacro()

macro(find_libhdfs3)
find_package(libhdfs3 CONFIG)
if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
set(LIBHDFS3_LIBRARY HDFS::hdfs3)
else()
find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
find_library(libhdfs3_LIBRARY NAMES hdfs3)
find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR
libhdfs3_LIBRARY)
add_library(HDFS::hdfs3 SHARED IMPORTED)
set_target_properties(
HDFS::hdfs3
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}"
IMPORTED_LOCATION "${libhdfs3_LIBRARY}")
endif()

if(NOT libhdfs3_FOUND)
message(FATAL_ERROR "LIBHDFS3 Library Not Found")
endif()
endmacro()

macro(find_re2)
find_package(re2 CONFIG)
if(re2_FOUND AND TARGET re2::re2)
Expand Down Expand Up @@ -209,10 +187,6 @@ set(VELOX_SRCS
utils/Common.cc
utils/VeloxBatchResizer.cc)

if(ENABLE_HDFS)
list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
endif()

if(ENABLE_S3)
find_package(ZLIB)
endif()
Expand Down Expand Up @@ -336,8 +310,6 @@ endif()

if(ENABLE_HDFS)
add_definitions(-DENABLE_HDFS)
find_libhdfs3()
target_link_libraries(velox PUBLIC HDFS::hdfs3)
endif()

if(ENABLE_S3)
Expand Down
7 changes: 0 additions & 7 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"

#ifdef ENABLE_HDFS
#include "utils/HdfsUtils.h"
#endif

using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -70,9 +66,6 @@ WholeStageResultIterator::WholeStageResultIterator(
scanNodeIds_(scanNodeIds),
scanInfos_(scanInfos),
streamIds_(streamIds) {
#ifdef ENABLE_HDFS
gluten::updateHdfsTokens(veloxCfg_.get());
#endif
spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy, kSpillStrategyDefaultValue);
auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum, kSpillThreadNumDefaultValue);
if (spillThreadNum > 0) {
Expand Down
22 changes: 0 additions & 22 deletions cpp/velox/utils/HdfsUtils.h

This file was deleted.

2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_10_24
VELOX_BRANCH=2024_10_25
VELOX_HOME=""

OS=`uname -s`
Expand Down
22 changes: 1 addition & 21 deletions ep/build-velox/src/modify_velox.patch
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,7 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7f7cbc92f..52adb1250 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
- find_library(
- LIBHDFS3
- NAMES libhdfs3.so libhdfs3.dylib
- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ find_package(libhdfs3)
+ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
+ set(LIBHDFS3 HDFS::hdfs3)
+ else()
+ find_library(
+ LIBHDFS3
+ NAMES libhdfs3.so libhdfs3.dylib
+ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
+ endif()
add_definitions(-DVELOX_ENABLE_HDFS3)
endif()

@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS ${BOOST_INCLUDE_LIBRARIES})
# for reference. find_package(range-v3)

set_source(gflags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil

Expand Down Expand Up @@ -58,7 +59,9 @@ case class IcebergScanTransformer(

override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
override def getSplitInfosFromPartitions(
partitions: Seq[InputPartition],
serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
Expand Down Expand Up @@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
Expand Down Expand Up @@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1)
case _ => // do nothing
}
checkLengthAndPlan(df, 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

trait IteratorApi {

Expand All @@ -37,7 +38,8 @@ trait IteratorApi {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo

/** Generate native row partition. */
def genPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
Expand Down Expand Up @@ -62,19 +63,22 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty

/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos: Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions)
def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
}

def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
def getSplitInfosFromPartitions(
partitions: Seq[InputPartition],
serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
getProperties))
getProperties,
serializableHadoopConf))
}

override protected def doValidateInternal(): ValidationResult = {
Expand Down
Loading

0 comments on commit c1ab7b3

Please sign in to comment.