[GLUTEN-8050][VL] Add viewfs support in scan validation (#8049)
JkSelf authored Dec 24, 2024
1 parent 1d12c4a commit 0570203
Showing 5 changed files with 55 additions and 15 deletions.
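
Context for the change: Velox validates a scan by checking that the scheme of every root path is handled by one of its registered native file systems, but viewfs:// is a client-side Hadoop mount table that the native layer cannot recognize. This commit threads a SerializableConfiguration into scan validation so that, when viewfs support is enabled, viewfs root paths are first resolved to the scheme of the underlying file system (typically hdfs://) before the native check runs. A minimal sketch of that resolution step using Hadoop's public FileSystem/Path API; the path and mount layout below are hypothetical, not taken from the patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ViewfsResolutionSketch {
  // Resolve a viewfs path to the path on the underlying file system;
  // non-viewfs paths are returned unchanged.
  def resolve(rawPath: String, hadoopConf: Configuration): String = {
    if (!rawPath.startsWith("viewfs")) {
      return rawPath
    }
    val path = new Path(rawPath)
    // FileSystem.get picks the viewfs implementation for the viewfs scheme;
    // resolvePath maps the mount point onto its target file system.
    val fs = FileSystem.get(path.toUri, hadoopConf)
    fs.resolvePath(path).toString
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Assumes a configured mount table, e.g. viewfs://cluster/data -> hdfs://nn1:8020/data
    println(resolve("viewfs://cluster/data/table1", conf))
  }
}

The same resolvePath call appears twice in the patch: once in the Velox validation path and once in the split-info rewrite at the end.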
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
 import java.util.Locale
 
@@ -166,7 +167,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = {
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
 
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
@@ -47,8 +47,9 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import scala.util.control.Breaks.breakable
 
@@ -99,15 +100,36 @@ object VeloxBackendSettings extends BackendSettingsApi {
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = {
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
 
     def validateScheme(): Option[String] = {
       val filteredRootPaths = distinctRootPaths(rootPaths)
-      if (
-        filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
-          .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
-      ) {
-        Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
+      if (filteredRootPaths.nonEmpty) {
+        val resolvedPaths =
+          if (
+            GlutenConfig.getConf.enableHdfsViewfs && filteredRootPaths.head.startsWith("viewfs")
+          ) {
+            // Convert the viewfs path to hdfs path.
+            filteredRootPaths.map {
+              viewfsPath =>
+                val viewPath = new Path(viewfsPath)
+                val viewFileSystem =
+                  FileSystem.get(viewPath.toUri, serializableHadoopConf.get.value)
+                viewFileSystem.resolvePath(viewPath).toString
+            }
+          } else {
+            filteredRootPaths
+          }
+
+        if (
+          !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+            resolvedPaths.toArray)
+        ) {
+          Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
+        } else {
+          None
+        }
       } else {
         None
       }
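
Note on the validateScheme change above: resolution only happens when GlutenConfig.getConf.enableHdfsViewfs is set and the first filtered root path starts with "viewfs"; the resolved paths, not the raw viewfs ones, are handed to VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems, while the failure message still reports the original filteredRootPaths. The sketch below mirrors that resolve-then-check flow with the native JNI check swapped for an injectable function so it runs without the Velox backend; the checker and parameter names here are placeholders, not the real API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SchemeValidationSketch {
  // Resolve viewfs roots first (when enabled), then ask the injected checker
  // whether every resolved root path is supported.
  def validateScheme(
      rootPaths: Seq[String],
      viewfsEnabled: Boolean,
      hadoopConf: Configuration,
      allSupported: Seq[String] => Boolean): Option[String] = {
    if (rootPaths.isEmpty) {
      return None
    }
    val resolvedPaths =
      if (viewfsEnabled && rootPaths.head.startsWith("viewfs")) {
        rootPaths.map { raw =>
          val p = new Path(raw)
          FileSystem.get(p.toUri, hadoopConf).resolvePath(p).toString
        }
      } else {
        rootPaths
      }
    if (!allSupported(resolvedPaths)) {
      Some(s"Scheme of [$rootPaths] is not supported by registered file systems.")
    } else {
      None
    }
  }
}

In the real method the Hadoop configuration comes from serializableHadoopConf.get.value, which assumes the caller supplied one; BasicScanExecTransformer (below) now always passes Some(serializableHadoopConf).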
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField
+import org.apache.spark.util.SerializableConfiguration
 
 trait BackendSettingsApi {
 
@@ -39,7 +40,9 @@
       format: ReadFileFormat,
       fields: Array[StructField],
       rootPaths: Seq[String],
-      properties: Map[String, String]): ValidationResult = ValidationResult.succeeded
+      properties: Map[String, String],
+      serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult =
+    ValidationResult.succeeded
 
   def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
 
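
Design note on the API change: the new parameter is added to the BackendSettingsApi trait with a default of None and the trait keeps its succeeded default body, so existing backend implementations and call sites compile unchanged, and only backends that care (Velox here) read the configuration. A stripped-down sketch of that pattern; the type and object names are simplified stand-ins, not the real Gluten classes:

import org.apache.hadoop.conf.Configuration

// Hypothetical, simplified stand-ins for the real Gluten types.
case class ValidationResult(ok: Boolean, reason: Option[String] = None)

trait SettingsApi {
  // The new optional argument defaults to None, so implementations that
  // predate it keep compiling and callers may omit it.
  def validateScan(
      rootPaths: Seq[String],
      hadoopConf: Option[Configuration] = None): ValidationResult =
    ValidationResult(ok = true)
}

object DefaultSettings extends SettingsApi // uses the default body unchanged

object ViewfsAwareSettings extends SettingsApi {
  override def validateScan(
      rootPaths: Seq[String],
      hadoopConf: Option[Configuration] = None): ValidationResult =
    if (rootPaths.exists(_.startsWith("viewfs")) && hadoopConf.isEmpty)
      ValidationResult(ok = false, Some("viewfs paths need a Hadoop configuration"))
    else ValidationResult(ok = true)
}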
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -77,6 +78,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
         getProperties))
   }
 
+  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
+    sparkContext.hadoopConfiguration)
+
   override protected def doValidateInternal(): ValidationResult = {
     var fields = schema.fields
 
@@ -91,7 +95,12 @@
     }
 
     val validationResult = BackendsApiManager.getSettings
-      .validateScanExec(fileFormat, fields, getRootFilePaths, getProperties)
+      .validateScanExec(
+        fileFormat,
+        fields,
+        getRootFilePaths,
+        getProperties,
+        Some(serializableHadoopConf))
     if (!validationResult.ok()) {
       return validationResult
     }
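
Why the wrapper: Hadoop's Configuration is not Java-serializable, so Spark ships org.apache.spark.util.SerializableConfiguration to carry it across serialization boundaries; BasicScanExecTransformer builds one from sparkContext.hadoopConfiguration and passes it as an Option into validateScanExec. A small standalone sketch of the wrap-and-round-trip behavior, assuming Spark and Hadoop are on the classpath; the configuration key is hypothetical:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

object SerializableConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "viewfs://cluster/") // hypothetical setting

    // Wrap the (non-serializable) Hadoop Configuration so it survives
    // Java serialization, then round-trip it and read a value back.
    val wrapped = new SerializableConfiguration(conf)
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(wrapped)
    out.close()

    val restored = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject()
      .asInstanceOf[SerializableConfiguration]
    println(restored.value.get("fs.defaultFS"))
  }
}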
@@ -382,14 +382,18 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
       splitInfos.foreach {
         case splitInfo: LocalFilesNode =>
           val paths = splitInfo.getPaths.asScala
-          if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
+          if (paths.nonEmpty && paths.exists(_.startsWith("viewfs"))) {
             // Convert the viewfs path into hdfs
             val newPaths = paths.map {
               viewfsPath =>
-                val viewPath = new Path(viewfsPath)
-                val viewFileSystem =
-                  FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-                viewFileSystem.resolvePath(viewPath).toString
+                var finalPath = viewfsPath
+                while (finalPath.startsWith("viewfs")) {
+                  val viewPath = new Path(finalPath)
+                  val viewFileSystem =
+                    FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
+                  finalPath = viewFileSystem.resolvePath(viewPath).toString
+                }
+                finalPath
             }
             splitInfo.setPaths(newPaths.asJava)
           }
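
The while loop above replaces a single resolvePath call: with chained or nested viewfs mount tables, resolving a viewfs:// path can yield another viewfs:// path, so resolution repeats until the scheme is no longer viewfs. The guard also changes from paths.head to paths.exists, so a split is rewritten even when only some of its paths use viewfs. A self-contained sketch of the loop; the mount chain in the comment is hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ChainedViewfsSketch {
  // Resolve repeatedly until the path no longer uses the viewfs scheme,
  // e.g. viewfs://outer/a -> viewfs://inner/a -> hdfs://nn1:8020/a.
  def resolveFully(rawPath: String, hadoopConf: Configuration): String = {
    var finalPath = rawPath
    while (finalPath.startsWith("viewfs")) {
      val p = new Path(finalPath)
      finalPath = FileSystem.get(p.toUri, hadoopConf).resolvePath(p).toString
    }
    finalPath
  }
}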
