diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index 6213650d6c69a..f953841bd062d 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -41,10 +41,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import java.lang.{Long => JLong} import java.net.URI -import java.util.{ArrayList => JArrayList} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ -import scala.collection.mutable class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { @@ -54,56 +53,41 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { * @return */ override def genFilePartition( - index: Int, - partitions: Seq[InputPartition], - partitionSchemas: Seq[StructType], - fileFormats: Seq[ReadFileFormat], - wsCxt: WholeStageTransformContext): BaseGlutenPartition = { - val localFilesNodesWithLocations = partitions.indices.map( - i => - partitions(i) match { - case p: GlutenMergeTreePartition => - ( - ExtensionTableBuilder - .makeExtensionTable(p.minParts, p.maxParts, p.database, p.table, p.tablePath), - SoftAffinityUtil.getNativeMergeTreePartitionLocations(p)) - case f: FilePartition => - val paths = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - val partitionColumns = mutable.ArrayBuffer.empty[Map[String, String]] - f.files.foreach { - file => - paths.add(new URI(file.filePath).toASCIIString) - starts.add(JLong.valueOf(file.start)) - lengths.add(JLong.valueOf(file.length)) - // TODO: Support custom partition location - val partitionColumn = mutable.Map.empty[String, String] - partitionColumns.append(partitionColumn.toMap) - } - ( - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - partitionColumns.map(_.asJava).asJava, - fileFormats(i)), - SoftAffinityUtil.getFilePartitionLocations(f)) - case _ => - throw new UnsupportedOperationException(s"Unsupported input partition.") - }) - wsCxt.substraitContext.initLocalFilesNodesIndex(0) - wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) - val substraitPlan = wsCxt.root.toProtobuf - if (index == 0) { - logOnLevel( - GlutenConfig.getConf.substraitPlanLogLevel, - s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil - .substraitPlanToJson(substraitPlan)}" - ) + partition: InputPartition, + partitionSchema: StructType, + fileFormat: ReadFileFormat): (java.io.Serializable, Array[String]) = { + partition match { + case p: GlutenMergeTreePartition => + ( + ExtensionTableBuilder + .makeExtensionTable(p.minParts, p.maxParts, p.database, p.table, p.tablePath), + SoftAffinityUtil.getNativeMergeTreePartitionLocations(p)) + case f: FilePartition => + val paths = new JArrayList[String]() + val starts = new JArrayList[JLong]() + val lengths = new JArrayList[JLong]() + val partitionColumns = new JArrayList[JMap[String, String]] + f.files.foreach { + file => + paths.add(new URI(file.filePath).toASCIIString) + starts.add(JLong.valueOf(file.start)) + lengths.add(JLong.valueOf(file.length)) + // TODO: Support custom partition location + val partitionColumn = new JHashMap[String, String]() + partitionColumns.add(partitionColumn) + } + ( + LocalFilesBuilder.makeLocalFiles( + f.index, + paths, + starts, + lengths, + partitionColumns, + fileFormat), + SoftAffinityUtil.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())) + case _ => + throw new UnsupportedOperationException(s"Unsupported input partition.") } - GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -244,17 +228,25 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { override def genNativeFileScanRDD( sparkContext: SparkContext, wsCxt: WholeStageTransformContext, - fileFormat: ReadFileFormat, - inputPartitions: Seq[InputPartition], + localFileNodes: Seq[(java.io.Serializable, Array[String])], numOutputRows: SQLMetric, numOutputBatches: SQLMetric, scanTime: SQLMetric): RDD[ColumnarBatch] = { val substraitPlanPartition = GlutenTimeMetric.withMillisTime { - // generate each partition of all scan exec - inputPartitions.indices.map( - i => { - genFilePartition(i, Seq(inputPartitions(i)), null, Seq(fileFormat), wsCxt) - }) + localFileNodes.zipWithIndex.map { + case (localFileNode, index) => + wsCxt.substraitContext.initLocalFilesNodesIndex(0) + wsCxt.substraitContext.setLocalFilesNodes(Seq(localFileNode._1)) + val substraitPlan = wsCxt.root.toProtobuf + if (index == 0) { + logOnLevel( + GlutenConfig.getConf.substraitPlanLogLevel, + s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil + .substraitPlanToJson(substraitPlan)}" + ) + } + GlutenPartition(index, substraitPlan.toByteArray, localFileNode._2) + } }(t => logInfo(s"Generating the Substrait plan took: $t ms.")) new NativeFileScanColumnarRDD( diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index 114de2b623fdf..198dcfa35dc6c 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -46,11 +46,10 @@ import java.lang.{Long => JLong} import java.net.URLDecoder import java.nio.charset.StandardCharsets import java.time.ZoneOffset -import java.util.{ArrayList => JArrayList} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ -import scala.collection.mutable class IteratorApiImpl extends IteratorApi with Logging { @@ -60,70 +59,59 @@ class IteratorApiImpl extends IteratorApi with Logging { * @return */ override def genFilePartition( - index: Int, - partitions: Seq[InputPartition], - partitionSchemas: Seq[StructType], - fileFormats: Seq[ReadFileFormat], - wsCxt: WholeStageTransformContext): BaseGlutenPartition = { - - def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = { - val paths = mutable.ArrayBuffer.empty[String] - val starts = mutable.ArrayBuffer.empty[JLong] - val lengths = mutable.ArrayBuffer.empty[JLong] - val partitionColumns = mutable.ArrayBuffer.empty[Map[String, String]] - files.foreach { - file => - paths.append(URLDecoder.decode(file.filePath.toString, StandardCharsets.UTF_8.name())) - starts.append(JLong.valueOf(file.start)) - lengths.append(JLong.valueOf(file.length)) - - val partitionColumn = mutable.Map.empty[String, String] - for (i <- 0 until file.partitionValues.numFields) { - val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { - ExternalCatalogUtils.DEFAULT_PARTITION_NAME - } else { - val pn = file.partitionValues.get(i, schema.fields(i).dataType) - schema.fields(i).dataType match { - case _: BinaryType => - new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) - case _: DateType => - DateFormatter.apply().format(pn.asInstanceOf[Integer]) - case _: TimestampType => - TimestampFormatter - .getFractionFormatter(ZoneOffset.UTC) - .format(pn.asInstanceOf[JLong]) - case _ => pn.toString - } + partition: InputPartition, + partitionSchema: StructType, + fileFormat: ReadFileFormat): (java.io.Serializable, Array[String]) = { + partition match { + case f: FilePartition => + val (paths, starts, lengths, partitionColumns) = + constructSplitInfo(partitionSchema, f.files) + ( + LocalFilesBuilder.makeLocalFiles( + f.index, + paths, + starts, + lengths, + partitionColumns, + fileFormat), + SoftAffinityUtil.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())) + } + } + + private def constructSplitInfo(schema: StructType, files: Array[PartitionedFile]) = { + val paths = new JArrayList[String]() + val starts = new JArrayList[JLong] + val lengths = new JArrayList[JLong]() + val partitionColumns = new JArrayList[JMap[String, String]] + files.foreach { + file => + paths.add(URLDecoder.decode(file.filePath, StandardCharsets.UTF_8.name())) + starts.add(JLong.valueOf(file.start)) + lengths.add(JLong.valueOf(file.length)) + + val partitionColumn = new JHashMap[String, String]() + for (i <- 0 until file.partitionValues.numFields) { + val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME + } else { + val pn = file.partitionValues.get(i, schema.fields(i).dataType) + schema.fields(i).dataType match { + case _: BinaryType => + new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) + case _: DateType => + DateFormatter.apply().format(pn.asInstanceOf[Integer]) + case _: TimestampType => + TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(pn.asInstanceOf[java.lang.Long]) + case _ => pn.toString } - partitionColumn.put(schema.names(i), partitionColumnValue) } - partitionColumns.append(partitionColumn.toMap) - } - (paths, starts, lengths, partitionColumns) + partitionColumn.put(schema.names(i), partitionColumnValue) + } + partitionColumns.add(partitionColumn) } - - val localFilesNodesWithLocations = partitions.indices.map( - i => - partitions(i) match { - case f: FilePartition => - val fileFormat = fileFormats(i) - val partitionSchema = partitionSchemas(i) - val (paths, starts, lengths, partitionColumns) = - constructSplitInfo(partitionSchema, f.files) - ( - LocalFilesBuilder.makeLocalFiles( - f.index, - paths.asJava, - starts.asJava, - lengths.asJava, - partitionColumns.map(_.asJava).asJava, - fileFormat), - SoftAffinityUtil.getFilePartitionLocations(f)) - }) - wsCxt.substraitContext.initLocalFilesNodesIndex(0) - wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) - val substraitPlan = wsCxt.root.toProtobuf - GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) + (paths, starts, lengths, partitionColumns) } /** @@ -211,8 +199,7 @@ class IteratorApiImpl extends IteratorApi with Logging { override def genNativeFileScanRDD( sparkContext: SparkContext, wsCxt: WholeStageTransformContext, - fileFormat: ReadFileFormat, - inputPartitions: Seq[InputPartition], + localFileNodes: Seq[(java.io.Serializable, Array[String])], numOutputRows: SQLMetric, numOutputBatches: SQLMetric, scanTime: SQLMetric): RDD[ColumnarBatch] = { diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala index ab4f1927d9d81..638af300dad10 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala @@ -39,11 +39,9 @@ trait IteratorApi { * @return */ def genFilePartition( - index: Int, - partitions: Seq[InputPartition], - partitionSchema: Seq[StructType], - fileFormats: Seq[ReadFileFormat], - wsCxt: WholeStageTransformContext): BaseGlutenPartition + partition: InputPartition, + partitionSchema: StructType, + fileFormat: ReadFileFormat): (java.io.Serializable, Array[String]) /** * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other @@ -82,8 +80,7 @@ trait IteratorApi { def genNativeFileScanRDD( sparkContext: SparkContext, wsCxt: WholeStageTransformContext, - fileFormat: ReadFileFormat, - inputPartitions: Seq[InputPartition], + localFileNodes: Seq[(java.io.Serializable, Array[String])], numOutputRows: SQLMetric, numOutputBatches: SQLMetric, scanTime: SQLMetric): RDD[ColumnarBatch] diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index c39a5e4465611..63bed1df703ef 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -54,6 +54,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { // TODO: Remove this expensive call when CH support scan custom partition location. def getInputFilePaths: Seq[String] + def getLocalFilesNodes: Seq[(java.io.Serializable, Array[String])] = + getPartitions.map( + BackendsApiManager.getIteratorApiInstance + .genFilePartition(_, getPartitionSchemas, fileFormat)) + def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("outputRows") val numOutputVectors = longMetric("outputVectors") @@ -63,13 +68,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { val outNames = outputAttributes().map(ConverterUtils.genColumnNameWithExprId).asJava val planNode = PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames) - val fileFormat = ConverterUtils.getFileFormat(this) BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD( sparkContext, WholeStageTransformContext(planNode, substraitContext), - fileFormat, - getPartitions, + getLocalFilesNodes, numOutputRows, numOutputVectors, scanTime diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index 57df410defc50..02582e366421f 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -244,26 +244,23 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // If these are two scan transformers, they must have same partitions, // otherwise, exchange will be inserted. - val allScanPartitions = basicScanExecTransformers.map(_.getPartitions) - val allScanPartitionSchemas = basicScanExecTransformers.map(_.getPartitionSchemas) - val partitionLength = allScanPartitions.head.size - if (allScanPartitions.exists(_.size != partitionLength)) { + val allScanLocalFilesNodes = basicScanExecTransformers.map(_.getLocalFilesNodes) + val partitionLength = allScanLocalFilesNodes.head.size + if (allScanLocalFilesNodes.exists(_.size != partitionLength)) { throw new GlutenException( "The partition length of all the scan transformer are not the same.") } val (wsCxt, substraitPlanPartitions) = GlutenTimeMetric.withMillisTime { val wsCxt = doWholeStageTransform() - // the file format for each scan exec - val fileFormats = basicScanExecTransformers.map(ConverterUtils.getFileFormat) - // generate each partition of all scan exec - val substraitPlanPartitions = (0 until partitionLength).map( - i => { - val currentPartitions = allScanPartitions.map(_(i)) - BackendsApiManager.getIteratorApiInstance - .genFilePartition(i, currentPartitions, allScanPartitionSchemas, fileFormats, wsCxt) - }) + val substraitPlanPartitions = allScanLocalFilesNodes.transpose.zipWithIndex.map { + case (localFilesNodes, index) => + wsCxt.substraitContext.initLocalFilesNodesIndex(0) + wsCxt.substraitContext.setLocalFilesNodes(localFilesNodes.map(_._1)) + val substraitPlan = wsCxt.root.toProtobuf + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodes.head._2) + } (wsCxt, substraitPlanPartitions) }( t => diff --git a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityUtil.scala b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityUtil.scala index 9f1faa3c11c08..3156991d6be90 100644 --- a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityUtil.scala @@ -23,42 +23,40 @@ import io.glutenproject.utils.LogLevelUtil import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorCacheTaskLocation -import org.apache.spark.sql.execution.datasources.FilePartition object SoftAffinityUtil extends LogLevelUtil with Logging { private lazy val softAffinityLogLevel = GlutenConfig.getConf.softAffinityLogLevel /** Get the locations by SoftAffinityManager */ - def getFilePartitionLocations(filePartition: FilePartition): Array[String] = { - // Get the original preferred locations - val expectedTargets = filePartition.preferredLocations() - + def getFilePartitionLocations( + filePaths: Array[String], + preferredLocations: Array[String]): Array[String] = { if ( - !filePartition.files.isEmpty && SoftAffinityManager.usingSoftAffinity - && !SoftAffinityManager.checkTargetHosts(expectedTargets) + !filePaths.isEmpty && SoftAffinityManager.usingSoftAffinity + && !SoftAffinityManager.checkTargetHosts(preferredLocations) ) { // if there is no host in the node list which are executors running on, // using SoftAffinityManager to generate target executors. // Only using the first file to calculate the target executors // Only get one file to calculate the target host - val file = filePartition.files.sortBy(_.filePath.toString).head - val locations = SoftAffinityManager.askExecutors(file.filePath.toString) + val filePath = filePaths.min + val locations = SoftAffinityManager.askExecutors(filePath) if (!locations.isEmpty) { logOnLevel( softAffinityLogLevel, - s"SAMetrics=File ${file.filePath} - " + + s"SAMetrics=File $filePath - " + s"the expected executors are ${locations.mkString("_")} ") locations.map { p => if (p._1.equals("")) p._2 else ExecutorCacheTaskLocation(p._2, p._1).toString - }.toArray + } } else { Array.empty[String] } } else { - expectedTargets + preferredLocations } } @@ -77,7 +75,7 @@ object SoftAffinityUtil extends LogLevelUtil with Logging { p => if (p._1.equals("")) p._2 else ExecutorCacheTaskLocation(p._2, p._1).toString - }.toArray + } } else { Array.empty[String] } diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index 34117333d124d..b1f772e5cdfff 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -60,7 +60,9 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate ).toArray ) - val locations = SoftAffinityUtil.getFilePartitionLocations(partition) + val locations = SoftAffinityUtil.getFilePartitionLocations( + partition.files.map(_.filePath.toString), + partition.preferredLocations()) val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-2", "host-3")) { @@ -89,7 +91,9 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate ).toArray ) - val locations = SoftAffinityUtil.getFilePartitionLocations(partition) + val locations = SoftAffinityUtil.getFilePartitionLocations( + partition.files.map(_.filePath.toString), + partition.preferredLocations()) val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) @@ -119,7 +123,9 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate ).toArray ) - val locations = SoftAffinityUtil.getFilePartitionLocations(partition) + val locations = SoftAffinityUtil.getFilePartitionLocations( + partition.files.map(_.filePath.toString), + partition.preferredLocations()) val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) @@ -161,7 +167,9 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate ).toArray ) - val locations = SoftAffinityUtil.getFilePartitionLocations(partition) + val locations = SoftAffinityUtil.getFilePartitionLocations( + partition.files.map(_.filePath.toString), + partition.preferredLocations()) val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations)