Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE] Allow offloading BroadcastNestedLoopJoin for left join with build left & right join with build right #7149

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_LEFT:
joinType = core::JoinType::kLeft;
break;
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_RIGHT:
joinType = core::JoinType::kRight;
break;
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR
switch (crossRel.type()) {
case ::substrait::CrossRel_JoinType_JOIN_TYPE_INNER:
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
case ::substrait::CrossRel_JoinType_JOIN_TYPE_RIGHT:
break;
default:
LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
override def rightKeys: Seq[Expression] = Nil

private lazy val substraitJoinType: CrossRel.JoinType =
SubstraitUtil.toCrossRelSubstrait(joinType)
SubstraitUtil.toCrossRelSubstrait(joinType, needSwitchChildren)

// Unique ID for builded table
// Unique ID for built table.
lazy val buildBroadcastTableId: String = "BuiltBNLJBroadcastTable-" + buildPlan.id

// Hint substrait to switch the left and right,
Expand Down Expand Up @@ -96,6 +96,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
joinType match {
case _: InnerLike => right.outputPartitioning
case RightOuter => right.outputPartitioning
case LeftOuter => left.outputPartitioning
case x =>
throw new IllegalArgumentException(
s"BroadcastNestedLoopJoin should not take $x as the JoinType with building left side")
Expand All @@ -104,6 +105,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
joinType match {
case _: InnerLike => left.outputPartitioning
case LeftOuter => left.outputPartitioning
case RightOuter => right.outputPartitioning
case x =>
throw new IllegalArgumentException(
s"BroadcastNestedLoopJoin should not take $x as the JoinType with building right side")
Expand All @@ -123,9 +125,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(

val operatorId = context.nextOperatorId(this.nodeName)
val joinParams = new JoinParams
if (condition.isDefined) {
joinParams.isWithCondition = true
}
joinParams.isWithCondition = condition.isDefined

val crossRel = JoinUtils.createCrossRel(
substraitJoinType,
Expand Down Expand Up @@ -162,21 +162,11 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
}

def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
joinType match {
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
case _ =>
ValidationResult.failed(s"$joinType join is not supported with BroadcastNestedLoopJoin")
}

if (!result.ok()) {
return result
}

(joinType, buildSide) match {
case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
ValidationResult.failed(s"$joinType join is not supported with $buildSide")
case _ => ValidationResult.succeeded // continue
}
}

override protected def doValidateInternal(): ValidationResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,33 @@ object SubstraitUtil {
JoinRel.JoinType.UNRECOGNIZED
}

def toCrossRelSubstrait(sparkJoin: JoinType, needSwitchChildren: Boolean): CrossRel.JoinType =
sparkJoin match {
case _: InnerLike =>
CrossRel.JoinType.JOIN_TYPE_INNER
case LeftOuter =>
// since we always assume build right side in substrait,
// the left and right relations are exchanged and the
// join type is reverted.
if (needSwitchChildren) {
CrossRel.JoinType.JOIN_TYPE_RIGHT
} else {
CrossRel.JoinType.JOIN_TYPE_LEFT
}
case RightOuter =>
if (needSwitchChildren) {
CrossRel.JoinType.JOIN_TYPE_LEFT
} else {
CrossRel.JoinType.JOIN_TYPE_RIGHT
}
case LeftSemi =>
CrossRel.JoinType.JOIN_TYPE_LEFT_SEMI
case FullOuter =>
CrossRel.JoinType.JOIN_TYPE_OUTER
case _ =>
CrossRel.JoinType.UNRECOGNIZED
}

def toCrossRelSubstrait(sparkJoin: JoinType): CrossRel.JoinType = sparkJoin match {
case _: InnerLike =>
CrossRel.JoinType.JOIN_TYPE_INNER
Expand Down
Loading