Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
WangGuangxin committed Jul 2, 2024
1 parent ed28aeb commit b6597de
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,22 +377,19 @@ case class OffloadWindow() extends OffloadSingleNode with LogLevelUtil {
val transformer = replace.doReplace(window)
val newChild = transformer.children.head match {
case SortExec(_, false, child, _)
if outputOrderSatisfied(child, transformer.requiredChildOrdering) =>
if outputOrderSatisfied(child, transformer.requiredChildOrdering.head) =>
child
case p @ ProjectExec(_, SortExec(_, false, child, _))
if outputOrderSatisfied(child, transformer.requiredChildOrdering) =>
p.copy(child = child)
if outputOrderSatisfied(child, transformer.requiredChildOrdering.head) =>
p.withNewChildren(Seq(child))
case children => children
}
transformer.withNewChildren(Seq(newChild))
case other => other
}

private def outputOrderSatisfied(child: SparkPlan, required: Seq[Seq[SortOrder]]): Boolean = {
Seq(child).zip(required).forall {
case (child, requiredOrdering) =>
SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)
}
private def outputOrderSatisfied(child: SparkPlan, requiredOrdering: Seq[SortOrder]): Boolean = {
SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)
}
}

Expand Down

0 comments on commit b6597de

Please sign in to comment.