Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Nov 10, 2023
1 parent 39d1a26 commit fd2159f
Showing 1 changed file with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.glutenproject.metrics.{GlutenTimeMetric, MetricsUpdater, NoopMetricsUp
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.plan.{PlanBuilder, PlanNode}
import io.glutenproject.substrait.rel.RelNode
import io.glutenproject.substrait.rel.{ReadSplit, RelNode}
import io.glutenproject.utils.SubstraitPlanPrinterUtil

import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkConf, TaskContext}
Expand Down Expand Up @@ -243,19 +243,12 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
* rather than genFinalStageIterator will be invoked
*/

// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted.
val allScanReadSplits = basicScanExecTransformers.map(_.getReadSplits)
val partitionLength = allScanReadSplits.head.size
if (allScanReadSplits.exists(_.size != partitionLength)) {
throw new GlutenException(
"The partition length of all the scan transformer are not the same.")
}
val allScanReadSplits = getReadSplitFromScanTransformer(basicScanExecTransformers)
val (wsCxt, substraitPlanPartitions) = GlutenTimeMetric.withMillisTime {
val wsCxt = doWholeStageTransform()

// generate each partition of all scan exec
val substraitPlanPartitions = allScanReadSplits.transpose.zipWithIndex.map {
val substraitPlanPartitions = allScanReadSplits.zipWithIndex.map {
case (readSplits, index) =>
wsCxt.substraitContext.initReadSplitsIndex(0)
wsCxt.substraitContext.setReadSplits(readSplits)
Expand Down Expand Up @@ -337,6 +330,32 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer =
copy(child = newChild, materializeInput = materializeInput)(transformStageId)

private def getReadSplitFromScanTransformer(
basicScanExecTransformers: Seq[BasicScanExecTransformer]): Seq[Seq[ReadSplit]] = {
// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted. We should combine the two scan
// transformers' partitions with same index, and set them together in
// the substraitContext. We use transpose to do that, You can refer to
// the diagram below.
// scan1 p11 p12 p13 p14 ... p1n
// scan2 p21 p22 p23 p24 ... p2n
// transpose =>
// scan1 | scan2
// p11 | p21 => substraitContext.setReadSplits([p11, p21])
// p12 | p22 => substraitContext.setReadSplits([p11, p22])
// p13 | p23 ...
// p14 | p24
// ...
// p1n | p2n => substraitContext.setReadSplits([p1n, p2n])
val allScanReadSplits = basicScanExecTransformers.map(_.getReadSplits)
val partitionLength = allScanReadSplits.head.size
if (allScanReadSplits.exists(_.size != partitionLength)) {
throw new GlutenException(
"The partition length of all the scan transformer are not the same.")
}
allScanReadSplits.transpose
}
}

/**
Expand Down

0 comments on commit fd2159f

Please sign in to comment.