Skip to content

Commit

Permalink
Fix the duplicated key exception in TopN
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Nov 9, 2023
1 parent 2bb522f commit c5c3544
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ case class LimitTransformer(child: SparkPlan, offset: Long, count: Long)
override protected def doValidateInternal(): ValidationResult = {
val context = new SubstraitContext
val operatorId = context.nextOperatorId(this.nodeName)
val relNode = getRelNode(context, operatorId, offset, count, child.output, null, true)
val input = child match {
case c: TransformSupport => c.doTransform(context).root
case _ => null
}
val relNode = getRelNode(context, operatorId, offset, count, child.output, input, true)

doNativeValidation(context, relNode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.glutenproject.utils.PhysicalPlanSelector
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -677,12 +677,17 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
"columnar topK is not enabled in TakeOrderedAndProjectExec")
} else {
var tagged: ValidationResult = null
val limitPlan = LimitTransformer(plan.child, 0, plan.limit)
tagged = limitPlan.doValidate()
if (tagged.isValid) {
val orderingSatisfies =
SortOrder.orderingSatisfies(plan.child.outputOrdering, plan.sortOrder)
if (orderingSatisfies) {
val limitPlan = LimitTransformer(plan.child, 0, plan.limit)
tagged = limitPlan.doValidate()
} else {
val sortPlan = SortExecTransformer(plan.sortOrder, false, plan.child)
tagged = sortPlan.doValidate()
val limitPlan = LimitTransformer(sortPlan, 0, plan.limit)
tagged = limitPlan.doValidate()
}

if (tagged.isValid) {
val projectPlan = ProjectExecTransformer(plan.projectList, plan.child)
tagged = projectPlan.doValidate()
Expand Down

0 comments on commit c5c3544

Please sign in to comment.