From c144443443580baf0f97dbfb721d66bc9cb21faa Mon Sep 17 00:00:00 2001 From: JiaKe Date: Mon, 18 Nov 2024 09:01:19 +0800 Subject: [PATCH] Add config to support viewfs in Gluten. (#7892) --- .../clickhouse/CHIteratorApi.scala | 4 +- ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 2 +- .../GlutenClickHouseMergeTreeWriteSuite.scala | 6 +-- .../backendsapi/velox/VeloxIteratorApi.scala | 20 +++------- .../execution/IcebergScanTransformer.scala | 5 +-- .../gluten/execution/VeloxIcebergSuite.scala | 6 +-- .../gluten/substrait/rel/LocalFilesNode.java | 5 +++ .../gluten/backendsapi/IteratorApi.scala | 4 +- .../execution/BasicScanExecTransformer.scala | 12 ++---- .../execution/WholeStageTransformer.scala | 37 +++++++++++++++---- .../org/apache/gluten/GlutenConfig.scala | 9 +++++ 11 files changed, 62 insertions(+), 48 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index dd5a736e7571..ff268b95d8de 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import java.lang.{Long => JLong} import java.net.URI @@ -133,8 +132,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String], - serializableHadoopConf: SerializableConfiguration): SplitInfo = { + properties: Map[String, String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => ExtensionTableBuilder diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 571dc4ba9258..c0f509c68cda 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos(null).size) + assertResult(1)(plans.head.getSplitInfos.size) } } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index 85c8c2d92a52..72adee309dd7 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -1807,7 +1807,7 @@ class GlutenClickHouseMergeTreeWriteSuite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(conf._2)(plans.head.getSplitInfos(null).size) + assertResult(conf._2)(plans.head.getSplitInfos.size) } } }) @@ -1831,7 +1831,7 @@ class GlutenClickHouseMergeTreeWriteSuite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos(null).size) + assertResult(1)(plans.head.getSplitInfos.size) } } } @@ -1939,7 +1939,7 @@ class GlutenClickHouseMergeTreeWriteSuite case f: BasicScanExecTransformer => f } assertResult(2)(scanExec.size) - assertResult(conf._2)(scanExec(1).getSplitInfos(null).size) + assertResult(conf._2)(scanExec(1).getSplitInfos.size) } } }) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 320d1f366c23..061daaac0fad 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil} - -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil} import java.lang.{Long => JLong} import java.nio.charset.StandardCharsets @@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String], - serializableHadoopConf: SerializableConfiguration): SplitInfo = { + properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => val ( @@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { modificationTimes, partitionColumns, metadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf) + constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) LocalFilesBuilder.makeLocalFiles( @@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { private def constructSplitInfo( schema: StructType, files: Array[PartitionedFile], - metadataColumnNames: Seq[String], - serializableHadoopConf: SerializableConfiguration) = { + metadataColumnNames: Seq[String]) = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong] val lengths = new JArrayList[JLong]() @@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging { file => // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder - var filePath = file.filePath.toString - if (filePath.startsWith("viewfs")) { - val viewPath = new Path(filePath) - val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value) - filePath = viewFileSystem.resolvePath(viewPath).toString - } paths.add( GlutenURLDecoder - .decode(filePath, StandardCharsets.UTF_8.name())) + .decode(file.filePath.toString, StandardCharsets.UTF_8.name())) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) val (fileSize, modificationTime) = diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 10d24c317cc1..1cbeb52a9213 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil @@ -59,9 +58,7 @@ case class IcebergScanTransformer( override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) - override def getSplitInfosFromPartitions( - partitions: Seq[InputPartition], - serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions( scan, keyGroupedPartitioning, diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala index 7f399ce629cf..de71d341db69 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala @@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3) case _ => // do nothing } checkLengthAndPlan(df, 7) @@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { case plan if plan.isInstanceOf[IcebergScanTransformer] => assert( plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined) - assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1) + assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1) case _ => // do nothing } checkLengthAndPlan(df, 5) diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index 04bb9d8cf400..9513f497602a 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -92,6 +92,11 @@ public List getPaths() { return paths; } + public void setPaths(List newPaths) { + paths.clear(); + paths.addAll(newPaths); + } + public void setFileSchema(StructType schema) { this.fileSchema = schema; } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 69c9d37334de..11211bd0da91 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration trait IteratorApi { @@ -38,8 +37,7 @@ trait IteratorApi { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String], - serializableHadoopConf: SerializableConfiguration): SplitInfo + properties: Map[String, String]): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index d768ac2c5936..73ed35e7190b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} -import org.apache.spark.util.SerializableConfiguration import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { - getSplitInfosFromPartitions(getPartitions, serializableHadoopConf) + def getSplitInfos(): Seq[SplitInfo] = { + getSplitInfosFromPartitions(getPartitions) } - def getSplitInfosFromPartitions( - partitions: Seq[InputPartition], - serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = { + def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance .genSplitInfo( @@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource getPartitionSchema, fileFormat, getMetadataColumns.map(_.name), - getProperties, - serializableHadoopConf)) + getProperties)) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 70839ffc2eba..beb7fe5f99d2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -25,7 +25,7 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode} import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode} -import org.apache.gluten.substrait.rel.{RelNode, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo} import org.apache.gluten.utils.SubstraitPlanPrinterUtil import org.apache.spark._ @@ -43,7 +43,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists +import org.apache.hadoop.fs.{FileSystem, Path} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -127,8 +129,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext) val sparkConf: SparkConf = sparkContext.getConf + val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( sparkContext.hadoopConfiguration) + val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo @transient @@ -289,10 +293,28 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f */ val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) val allScanSplitInfos = - getSplitInfosFromPartitions( - basicScanExecTransformers, - allScanPartitions, - serializableHadoopConf) + getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) + if (GlutenConfig.getConf.enableHdfsViewfs) { + allScanSplitInfos.foreach { + splitInfos => + splitInfos.foreach { + case splitInfo: LocalFilesNode => + val paths = splitInfo.getPaths.asScala + if (paths.nonEmpty && paths.head.startsWith("viewfs")) { + // Convert the viewfs path into hdfs + val newPaths = paths.map { + viewfsPath => + val viewPath = new Path(viewfsPath) + val viewFileSystem = + FileSystem.get(viewPath.toUri, serializableHadoopConf.value) + viewFileSystem.resolvePath(viewPath).toString + } + splitInfo.setPaths(newPaths.asJava) + } + } + } + } + val inputPartitions = BackendsApiManager.getIteratorApiInstance.genPartitions( wsCtx, @@ -384,8 +406,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f private def getSplitInfosFromPartitions( basicScanExecTransformers: Seq[BasicScanExecTransformer], - allScanPartitions: Seq[Seq[InputPartition]], - serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = { + allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = { // If these are two scan transformers, they must have same partitions, // otherwise, exchange will be inserted. We should combine the two scan // transformers' partitions with same index, and set them together in @@ -404,7 +425,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map { case (partition, transformer) => - transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf) + transformer.getSplitInfosFromPartitions(partition) } val partitionLength = allScanSplitInfos.head.size if (allScanSplitInfos.exists(_.size != partitionLength)) { diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 5b15faf6466a..107c33a241fd 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -478,6 +478,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED) def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED) + + def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED) } object GlutenConfig { @@ -2193,4 +2195,11 @@ object GlutenConfig { "Otherwise, do nothing.") .booleanConf .createWithDefault(false) + + val HDFS_VIEWFS_ENABLED = + buildStaticConf("spark.gluten.storage.hdfsViewfs.enabled") + .internal() + .doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side") + .booleanConf + .createWithDefault(false) }