Add config to support viewfs in Gluten. (apache#7892)
JkSelf authored Nov 18, 2024
1 parent 3941a2d commit c144443
Showing 11 changed files with 62 additions and 48 deletions.
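A note on what changed: the viewfs:// special-casing that previously lived inside VeloxIteratorApi.constructSplitInfo, along with the SerializableConfiguration parameter threaded through genSplitInfo to support it, is removed. Path rewriting now happens once on the driver in WholeStageTransformer, behind the new spark.gluten.storage.hdfsViewfs.enabled flag. Below is a minimal sketch of enabling the flag at session start-up; the flag name comes from this diff, while the plugin class and the other builder settings are illustrative assumptions.

    import org.apache.spark.sql.SparkSession

    // The flag is a static conf, so it must be set before the session is created.
    val spark = SparkSession
      .builder()
      .appName("gluten-viewfs-demo")                              // assumed app name
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")  // assumed plugin class
      .config("spark.gluten.storage.hdfsViewfs.enabled", "true")
      .getOrCreate()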
@@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import java.lang.{Long => JLong}
import java.net.URI
@@ -133,8 +132,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo = {
properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos(null).size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1807,7 +1807,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(conf._2)(plans.head.getSplitInfos(null).size)
assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1831,7 +1831,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos(null).size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1939,7 +1939,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo = {
properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
modificationTimes,
partitionColumns,
metadataColumns) =
constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
metadataColumnNames: Seq[String],
serializableHadoopConf: SerializableConfiguration) = {
metadataColumnNames: Seq[String]) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
var filePath = file.filePath.toString
if (filePath.startsWith("viewfs")) {
val viewPath = new Path(filePath)
val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
filePath = viewFileSystem.resolvePath(viewPath).toString
}
paths.add(
GlutenURLDecoder
.decode(filePath, StandardCharsets.UTF_8.name()))
.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
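After this hunk, constructSplitInfo no longer touches any FileSystem; it only URL-decodes file.filePath before adding it to the split's path list. A standalone sketch of that decoding step, with java.net.URLDecoder standing in for Gluten's GlutenURLDecoder (an assumption; the two may differ, for example in how '+' is handled):

    import java.net.URLDecoder
    import java.nio.charset.StandardCharsets

    // Decode percent-escapes such as "%20" that PartitionedFile.filePath may
    // still carry, without resolving the path against any file system.
    def decodePath(filePath: String): String =
      URLDecoder.decode(filePath, StandardCharsets.UTF_8.name())

    // decodePath("hdfs://nn/warehouse/t/part%20one/f.parquet")
    //   == "hdfs://nn/warehouse/t/part one/f.parquet"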
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil

@@ -59,9 +58,7 @@ case class IcebergScanTransformer(

override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

override def getSplitInfosFromPartitions(
partitions: Seq[InputPartition],
serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1)
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
case _ => // do nothing
}
checkLengthAndPlan(df, 5)
@@ -92,6 +92,11 @@ public List<String> getPaths() {
return paths;
}

public void setPaths(List<String> newPaths) {
paths.clear();
paths.addAll(newPaths);
}

public void setFileSchema(StructType schema) {
this.fileSchema = schema;
}
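The new setter mutates the backing list in place (clear plus addAll) rather than re-assigning the field, which presumably keeps any list reference already handed out by getPaths in sync. A small Scala sketch of the same semantics, using a hypothetical stand-in class rather than the real LocalFilesNode:

    import java.util.{ArrayList => JArrayList, List => JList}
    import scala.collection.JavaConverters._

    // Hypothetical stand-in mirroring the setter added to LocalFilesNode.
    final class PathsHolder {
      private val paths: JList[String] = new JArrayList[String]()
      def getPaths: JList[String] = paths
      def setPaths(newPaths: JList[String]): Unit = {
        paths.clear()
        paths.addAll(newPaths)
      }
    }

    val holder = new PathsHolder
    holder.setPaths(Seq("hdfs://nameservice1/warehouse/t/f.parquet").asJava)
    // holder.getPaths now contains the rewritten path.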
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

trait IteratorApi {

@@ -38,8 +37,7 @@ trait IteratorApi {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String],
serializableHadoopConf: SerializableConfiguration): SplitInfo
properties: Map[String, String]): SplitInfo

/** Generate native row partition. */
def genPartitions(
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
@@ -63,22 +62,19 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty

/** Returns the split infos that will be processed by the underlying native engine. */
def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
def getSplitInfos(): Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions)
}

def getSplitInfosFromPartitions(
partitions: Seq[InputPartition],
serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
getProperties,
serializableHadoopConf))
getProperties))
}

override protected def doValidateInternal(): ValidationResult = {
@@ -25,7 +25,7 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
import org.apache.gluten.substrait.rel.{RelNode, SplitInfo}
import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
import org.apache.gluten.utils.SubstraitPlanPrinterUtil

import org.apache.spark._
@@ -43,7 +43,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -127,8 +129,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)

val sparkConf: SparkConf = sparkContext.getConf

val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
sparkContext.hadoopConfiguration)

val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo

@transient
@@ -289,10 +293,28 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
*/
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
getSplitInfosFromPartitions(
basicScanExecTransformers,
allScanPartitions,
serializableHadoopConf)
getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
if (GlutenConfig.getConf.enableHdfsViewfs) {
allScanSplitInfos.foreach {
splitInfos =>
splitInfos.foreach {
case splitInfo: LocalFilesNode =>
val paths = splitInfo.getPaths.asScala
if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
// Convert the viewfs path into hdfs
val newPaths = paths.map {
viewfsPath =>
val viewPath = new Path(viewfsPath)
val viewFileSystem =
FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
viewFileSystem.resolvePath(viewPath).toString
}
splitInfo.setPaths(newPaths.asJava)
}
}
}
}

val inputPartitions =
BackendsApiManager.getIteratorApiInstance.genPartitions(
wsCtx,
@@ -384,8 +406,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

private def getSplitInfosFromPartitions(
basicScanExecTransformers: Seq[BasicScanExecTransformer],
allScanPartitions: Seq[Seq[InputPartition]],
serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = {
allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted. We should combine the two scan
// transformers' partitions with same index, and set them together in
@@ -404,7 +425,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanSplitInfos =
allScanPartitions.zip(basicScanExecTransformers).map {
case (partition, transformer) =>
transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf)
transformer.getSplitInfosFromPartitions(partition)
}
val partitionLength = allScanSplitInfos.head.size
if (allScanSplitInfos.exists(_.size != partitionLength)) {
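For reference, the per-path resolution the new block performs is the standard Hadoop client-side mount-table lookup. A minimal standalone sketch, assuming a Configuration that carries the usual fs.viewfs.mounttable.* entries and omitting error handling:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Resolve a viewfs:// path to the concrete hdfs:// path behind the mount
    // table; non-viewfs paths pass through unchanged.
    def resolveViewfs(path: String, hadoopConf: Configuration): String =
      if (path.startsWith("viewfs")) {
        val viewPath = new Path(path)
        val viewFs = FileSystem.get(viewPath.toUri, hadoopConf)
        viewFs.resolvePath(viewPath).toString
      } else {
        path
      }

Because the rewrite in the diff is keyed on LocalFilesNode, other split types are left untouched, and the SerializableConfiguration no longer needs to be threaded through every genSplitInfo implementation.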
@@ -478,6 +478,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)

def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)

def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
}

object GlutenConfig {
@@ -2193,4 +2195,11 @@ object GlutenConfig {
"Otherwise, do nothing.")
.booleanConf
.createWithDefault(false)

val HDFS_VIEWFS_ENABLED =
buildStaticConf("spark.gluten.storage.hdfsViewfs.enabled")
.internal()
.doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side")
.booleanConf
.createWithDefault(false)
}
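Since HDFS_VIEWFS_ENABLED is declared with buildStaticConf and marked internal, it behaves like Spark's other static SQL confs and is not meant to be changed at runtime with SET; it has to be in place before the SparkContext starts. A sketch of setting it up front (the flag name comes from this diff; the rest is illustrative):

    import org.apache.spark.SparkConf

    // Programmatically, before the session/context is created:
    val conf = new SparkConf()
      .set("spark.gluten.storage.hdfsViewfs.enabled", "true")

    // Or equivalently on the command line:
    //   spark-submit --conf spark.gluten.storage.hdfsViewfs.enabled=true ...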
