diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 5b09189767075..f597907d8f18b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{BinaryType, DateType, Decimal, DecimalType, StructType, TimestampType} import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.ExecutorManager +import org.apache.spark.util.{ExecutorManager, SerializableConfiguration} + +import org.apache.hadoop.fs.{FileSystem, Path} import java.lang.{Long => JLong} import java.nio.charset.StandardCharsets @@ -55,11 +57,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo = { + metadataColumnNames: Seq[String], + serializableHadoopConf: SerializableConfiguration): SplitInfo = { partition match { case f: FilePartition => val (paths, starts, lengths, partitionColumns, metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) + constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) LocalFilesBuilder.makeLocalFiles( @@ -98,7 +101,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], - metadataColumnNames: Seq[String]) = { + metadataColumnNames: Seq[String], + serializableHadoopConf: SerializableConfiguration) = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] val lengths = new JArrayList[JLong]() @@ -108,9 +112,15 @@ class VeloxIteratorApi extends IteratorApi with Logging { file => // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder + var filePath = file.filePath.toString + if (filePath.startsWith("viewfs")) { + val viewPath = new Path(filePath) + val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value) + filePath = viewFileSystem.resolvePath(viewPath).toString + } paths.add( GlutenURLDecoder - .decode(file.filePath.toString, StandardCharsets.UTF_8.name())) + .decode(filePath, StandardCharsets.UTF_8.name())) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) val metadataColumn = diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index d999948d70478..d1a683454b19e 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration trait IteratorApi { @@ -37,7 +38,8 @@ trait IteratorApi { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo + metadataColumnNames: Seq[String], + serializableHadoopConf: SerializableConfiguration): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index b0bc0ea7b27de..ae21fe302c4b8 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import com.google.protobuf.StringValue @@ -65,14 +66,21 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos: Seq[SplitInfo] = { - getSplitInfosFromPartitions(getPartitions) + def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + getSplitInfosFromPartitions(getPartitions, serializableHadoopConf) } - def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + def getSplitInfosFromPartitions( + partitions: Seq[InputPartition], + serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name))) + .genSplitInfo( + _, + getPartitionSchema, + fileFormat, + getMetadataColumns.map(_.name), + serializableHadoopConf)) } def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { @@ -89,7 +97,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD( sparkContext, WholeStageTransformContext(planNode, substraitContext), - getSplitInfos, + getSplitInfos(new SerializableConfiguration(sparkContext.hadoopConfiguration)), this, numOutputRows, numOutputVectors, diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index b809ac4bf1a4c..b5962b37f1672 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists @@ -112,6 +113,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext) val sparkConf: SparkConf = sparkContext.getConf + val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( + sparkContext.hadoopConfiguration) val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo val substraitPlanLogLevel: String = GlutenConfig.getConf.substraitPlanLogLevel @@ -270,7 +273,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f */ val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) val allScanSplitInfos = - getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) + getSplitInfosFromPartitions( + basicScanExecTransformers, + allScanPartitions, + serializableHadoopConf) val (wsCtx, inputPartitions) = GlutenTimeMetric.withMillisTime { val wsCtx = doWholeStageTransform() @@ -365,7 +371,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f private def getSplitInfosFromPartitions( basicScanExecTransformers: Seq[BasicScanExecTransformer], - allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = { + allScanPartitions: Seq[Seq[InputPartition]], + serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = { // If these are two scan transformers, they must have same partitions, // otherwise, exchange will be inserted. We should combine the two scan // transformers' partitions with same index, and set them together in @@ -383,7 +390,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map { - case (partition, transformer) => transformer.getSplitInfosFromPartitions(partition) + case (partition, transformer) => + transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf) } val partitionLength = allScanSplitInfos.head.size if (allScanSplitInfos.exists(_.size != partitionLength)) {