
Support viewfs connections in Gluten
JkSelf committed Apr 25, 2024
1 parent c3c25ec commit 37d8c8e
Showing 4 changed files with 42 additions and 14 deletions.
@@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{BinaryType, DateType, Decimal, DecimalType, StructType, TimestampType}
 import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ExecutorManager
+import org.apache.spark.util.{ExecutorManager, SerializableConfiguration}
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -55,11 +57,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partition: InputPartition,
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String]): SplitInfo = {
+      metadataColumnNames: Seq[String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (paths, starts, lengths, partitionColumns, metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -98,7 +101,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String]) = {
+      metadataColumnNames: Seq[String],
+      serializableHadoopConf: SerializableConfiguration) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -108,9 +112,15 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
         // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
+        var filePath = file.filePath.toString
+        if (filePath.startsWith("viewfs")) {
+          val viewPath = new Path(filePath)
+          val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
+          filePath = viewFileSystem.resolvePath(viewPath).toString
+        }
         paths.add(
           GlutenURLDecoder
-            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
+            .decode(filePath, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val metadataColumn =
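
Note on the change above: a viewfs:// URI is a client-side mount-table view that the native Velox reader cannot open directly, so constructSplitInfo now resolves it to the backing file system (for example hdfs://) with Hadoop's FileSystem.resolvePath before the path reaches the native plan; non-viewfs paths take the old code path unchanged. The standalone sketch below (not part of this commit) exercises the same resolvePath call against a made-up mount table backed by the local file system; the mount-table key, the cluster name demo, and the /tmp paths are illustrative only.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ViewFsResolveSketch {
  def main(args: Array[String]): Unit = {
    // Create a local directory and file to act as the mount target.
    Files.createDirectories(Paths.get("/tmp/viewfs-demo"))
    Files.write(Paths.get("/tmp/viewfs-demo/part-00000"), "x".getBytes(StandardCharsets.UTF_8))

    // Hypothetical mount table: /data under the "demo" view points at the local directory.
    // Real deployments define these fs.viewfs.mounttable.* entries in core-site.xml.
    val conf = new Configuration()
    conf.set("fs.viewfs.mounttable.demo.link./data", "file:///tmp/viewfs-demo")

    val viewPath = new Path("viewfs://demo/data/part-00000")
    // FileSystem.get picks ViewFileSystem for the viewfs scheme; resolvePath maps the
    // mounted path to a fully qualified path on the backing file system, the same call
    // the patch uses in constructSplitInfo.
    val fs = FileSystem.get(viewPath.toUri, conf)
    println(fs.resolvePath(viewPath)) // something like file:/tmp/viewfs-demo/part-00000
  }
}
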
@@ -30,14 +30,16 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 trait IteratorApi {
 
   def genSplitInfo(
       partition: InputPartition,
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String]): SplitInfo
+      metadataColumnNames: Seq[String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(

@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 import com.google.protobuf.StringValue
@@ -65,14 +66,21 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   /** Returns the split infos that will be processed by the underlying native engine. */
-  def getSplitInfos: Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions)
+  def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+    getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
   }
 
-  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(
+      partitions: Seq[InputPartition],
+      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
-        .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
+        .genSplitInfo(
+          _,
+          getPartitionSchema,
+          fileFormat,
+          getMetadataColumns.map(_.name),
+          serializableHadoopConf))
   }
 
   def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
@@ -89,7 +97,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
     BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD(
       sparkContext,
       WholeStageTransformContext(planNode, substraitContext),
-      getSplitInfos,
+      getSplitInfos(new SerializableConfiguration(sparkContext.hadoopConfiguration)),
       this,
       numOutputRows,
       numOutputVectors,

@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 
@@ -112,6 +113,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
 
   val sparkConf: SparkConf = sparkContext.getConf
+  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
+    sparkContext.hadoopConfiguration)
   val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo
   val substraitPlanLogLevel: String = GlutenConfig.getConf.substraitPlanLogLevel
 
@@ -270,7 +273,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
      */
     val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
     val allScanSplitInfos =
-      getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
+      getSplitInfosFromPartitions(
+        basicScanExecTransformers,
+        allScanPartitions,
+        serializableHadoopConf)
 
     val (wsCtx, inputPartitions) = GlutenTimeMetric.withMillisTime {
       val wsCtx = doWholeStageTransform()
@@ -365,7 +371,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
 
   private def getSplitInfosFromPartitions(
       basicScanExecTransformers: Seq[BasicScanExecTransformer],
-      allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
+      allScanPartitions: Seq[Seq[InputPartition]],
+      serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = {
     // If these are two scan transformers, they must have same partitions,
     // otherwise, exchange will be inserted. We should combine the two scan
     // transformers' partitions with same index, and set them together in
@@ -383,7 +390,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
     val allScanSplitInfos =
       allScanPartitions.zip(basicScanExecTransformers).map {
-        case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition)
+        case (partition, transformer) =>
+          transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf)
       }
     val partitionLength = allScanSplitInfos.head.size
     if (allScanSplitInfos.exists(_.size != partitionLength)) {
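
Note on the SerializableConfiguration plumbing: Hadoop's Configuration is not java.io.Serializable, so it cannot be captured directly where splits are constructed for tasks; wrapping it in Spark's SerializableConfiguration on the driver (as WholeStageTransformer does above) is the usual way to make it shippable. The sketch below (not Gluten code) shows that general pattern; the object name, local master, and printed config key are illustrative only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

object HadoopConfShippingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("conf-shipping-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Wrap the driver-side Hadoop configuration once; the wrapper handles Java serialization.
    val serializableHadoopConf = new SerializableConfiguration(sc.hadoopConfiguration)

    // Tasks read the configuration through .value, e.g. to build a FileSystem for a split path.
    val result = sc
      .parallelize(Seq("viewfs://demo/data/a", "hdfs://nn1:8020/data/b"), 2)
      .map { uri =>
        val conf = serializableHadoopConf.value
        // Illustration only: echo a config key instead of contacting a real cluster.
        s"$uri -> fs.defaultFS=${conf.get("fs.defaultFS", "file:///")}"
      }
      .collect()

    result.foreach(println)
    spark.stop()
  }
}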
