Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jun 7, 2024
1 parent 4cb0a43 commit e9b364e
Showing 1 changed file with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,23 +331,25 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}
}

val child = if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
VeloxAppendBatchesExec(shuffle.child, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
} else {
shuffle.child
def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
VeloxAppendBatchesExec(plan, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
} else {
plan
}
}

val child = shuffle.child

shuffle.outputPartitioning match {
case HashPartitioning(exprs, _) =>
val hashExpr = new Murmur3Hash(exprs)
val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
val projectTransformer = ProjectExecTransformer(projectList, child)
val validationResult = projectTransformer.doValidate()
if (validationResult.isValid) {
ColumnarShuffleExchangeExec(
shuffle,
projectTransformer,
projectTransformer.output.drop(1))
val newChild = maybeAddAppendBatchesExec(projectTransformer)
ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1))
} else {
TransformHints.tagNotTransformable(shuffle, validationResult)
shuffle.withNewChildren(child :: Nil)
Expand All @@ -363,7 +365,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
// null type since the value always be null.
val columnsForHash = child.output.filterNot(_.dataType == NullType)
if (columnsForHash.isEmpty) {
ColumnarShuffleExchangeExec(shuffle, child, child.output)
val newChild = maybeAddAppendBatchesExec(child)
ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
val hashExpr = new Murmur3Hash(columnsForHash)
val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
Expand All @@ -384,18 +387,17 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
ProjectExecTransformer(projectList.drop(1), sortByHashCode)
val validationResult = dropSortColumnTransformer.doValidate()
if (validationResult.isValid) {
ColumnarShuffleExchangeExec(
shuffle,
dropSortColumnTransformer,
dropSortColumnTransformer.output)
val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer)
ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
TransformHints.tagNotTransformable(shuffle, validationResult)
shuffle.withNewChildren(child :: Nil)
}
}
}
case _ =>
ColumnarShuffleExchangeExec(shuffle, child, null)
val newChild = maybeAddAppendBatchesExec(child)
ColumnarShuffleExchangeExec(shuffle, newChild, null)
}
}

Expand Down

0 comments on commit e9b364e

Please sign in to comment.