fixup
zhztheplayer committed Jun 7, 2024
1 parent 06b741f commit 2cdeed7
Showing 2 changed files with 23 additions and 44 deletions.
ExpressionConverter.scala
@@ -700,20 +700,11 @@ object ExpressionConverter extends SQLConfHelper with Logging {
     val newChild = from.child match {
       case exchange: BroadcastExchangeExec =>
         toColumnarBroadcastExchange(exchange)
-      case aqe: AdaptiveSparkPlanExec if !aqe.supportsColumnar =>
-        // ColumnarSubqueryBroadcastExec strictly requires for
-        // child with columnar output. AQE with supportsColumnar=false
-        // may produce row plan that will fail the subsequent processing.
-        // Thus we replace it with supportsColumnar=true to make sure
-        // columnar output is emitted from AQE.
-        val newAqe = AdaptiveSparkPlanExec(
-          aqe.inputPlan,
-          aqe.context,
-          aqe.preprocessingRules,
-          aqe.isSubquery,
-          supportsColumnar = true)
-        newAqe.copyTagsFrom(aqe)
-        newAqe
+      case aqe: AdaptiveSparkPlanExec =>
+        // Keep the child as-is when it is AQE, even if its supportsColumnar == false.
+        // ColumnarSubqueryBroadcastExec is compatible with both row-based
+        // and columnar inputs.
+        aqe
       case other => other
     }
     val out = ColumnarSubqueryBroadcastExec(from.name, from.index, from.buildKeys, newChild)
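To make the new rule concrete, here is a minimal, self-contained sketch of the same dispatch using toy plan types (illustrative stand-ins, not Gluten's or Spark's actual classes): a plain broadcast exchange is rewritten to its columnar counterpart, while an AQE child, whether or not it reports supportsColumnar, now passes through untouched.

  // Toy stand-ins for the Spark plan nodes; names are illustrative only.
  sealed trait Plan
  final case class BroadcastExchange(child: Plan) extends Plan
  final case class ColumnarBroadcastExchange(child: Plan) extends Plan
  final case class AdaptiveSparkPlan(supportsColumnar: Boolean) extends Plan
  case object OtherPlan extends Plan

  def normalizeChild(child: Plan): Plan = child match {
    case e: BroadcastExchange =>
      // Row-based exchange: swap in the columnar variant.
      ColumnarBroadcastExchange(e.child)
    case aqe: AdaptiveSparkPlan =>
      // Kept as-is: the subquery broadcast operator accepts both row-based
      // and columnar broadcast results, so AQE no longer needs rebuilding
      // with supportsColumnar = true.
      aqe
    case other => other
  }

In this toy model, normalizeChild(AdaptiveSparkPlan(supportsColumnar = false)) returns the AQE node unchanged, where the old rule would have rebuilt it and copied its tags over.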
ColumnarSubqueryBroadcastExec.scala
@@ -24,8 +24,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelation, HashJoin, LongHashedRelation}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.ThreadUtils
@@ -75,35 +73,25 @@ case class ColumnarSubqueryBroadcastExec(
     SQLExecution.withExecutionId(session, executionId) {
       val rows = GlutenTimeMetric.millis(longMetric("collectTime")) {
         _ =>
-          val exchangeChild = child match {
-            case exec: ReusedExchangeExec =>
-              exec.child
-            case _ =>
-              child
-          }
-          if (
-            exchangeChild.isInstanceOf[ColumnarBroadcastExchangeExec] ||
-            exchangeChild.isInstanceOf[AdaptiveSparkPlanExec]
-          ) {
-            // transform broadcasted columnar value to Array[InternalRow] by key
-            exchangeChild
-              .executeBroadcast[BuildSideRelation]
-              .value
-              .transform(buildKeys(index))
-              .distinct
-          } else {
-            val broadcastRelation = exchangeChild.executeBroadcast[HashedRelation]().value
-            val (iter, expr) = if (broadcastRelation.isInstanceOf[LongHashedRelation]) {
-              (broadcastRelation.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
-            } else {
-              (
-                broadcastRelation.keys(),
-                BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
-            }
+          val relation = child.executeBroadcast[Any]().value
+          relation match {
+            case b: BuildSideRelation =>
+              b.transform(buildKeys(index)).distinct
+            case h: HashedRelation =>
+              val (iter, expr) = if (h.isInstanceOf[LongHashedRelation]) {
+                (h.keys(), HashJoin.extractKeyExprAt(buildKeys, index))
+              } else {
+                (
+                  h.keys(),
+                  BoundReference(index, buildKeys(index).dataType, buildKeys(index).nullable))
+              }
 
-            val proj = UnsafeProjection.create(expr)
-            val keyIter = iter.map(proj).map(_.copy())
-            keyIter.toArray[InternalRow].distinct
+              val proj = UnsafeProjection.create(expr)
+              val keyIter = iter.map(proj).map(_.copy())
+              keyIter.toArray[InternalRow].distinct
+            case other =>
+              throw new UnsupportedOperationException(
+                s"Unrecognizable broadcast relation: $other")
           }
       }
       val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes).sum
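The HashedRelation branch above extracts the build-side key at position index from each stored row, projects it to an UnsafeRow, and de-duplicates. Below is a minimal sketch of that projection, assuming Spark's catalyst classes are on the classpath; the single-column IntegerType rows and the index value are illustrative.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
  import org.apache.spark.sql.types.IntegerType

  // Illustrative build-side rows; column 0 is the join key.
  val rows: Iterator[InternalRow] = Iterator(InternalRow(1), InternalRow(2), InternalRow(1))

  val index = 0
  val keyExpr = BoundReference(index, IntegerType, nullable = false)
  val proj = UnsafeProjection.create(Seq(keyExpr))

  // copy() matters: UnsafeProjection reuses one output buffer across rows,
  // so without it every array element would alias the last projected row.
  val distinctKeys = rows.map(proj).map(_.copy()).toArray[InternalRow].distinct

The LongHashedRelation case differs only in how the key expression is derived: its keys are stored packed into a single long column, so the code rebuilds the expression for slot index with HashJoin.extractKeyExprAt(buildKeys, index) instead of using a plain BoundReference.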
