Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
loneylee committed Jul 9, 2024
1 parent a361e25 commit 56bfe64
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult

import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType}
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftSemi}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.protobuf.{Any, StringValue}

case class CHBroadcastNestedLoopJoinExecTransformer(
left: SparkPlan,
right: SparkPlan,
Expand Down Expand Up @@ -56,7 +59,6 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
val context =
BroadCastHashJoinContext(Seq.empty, joinType, false, buildPlan.output, buildBroadcastTableId)
val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
// FIXME: Do we have to make build side a RDD?
streamedRDD :+ broadcastRDD
}

Expand All @@ -77,11 +79,25 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
res
}

override def genJoinParameters(): Any = {
// for ch
val joinParametersStr = new StringBuffer("JoinParameters:")
joinParametersStr
.append("buildHashTableId=")
.append(buildBroadcastTableId)
.append("\n")
val message = StringValue
.newBuilder()
.setValue(joinParametersStr.toString)
.build()
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}

override def validateJoinTypeAndBuildSide(): ValidationResult = {
joinType match {
case _: InnerLike =>
case _ =>
if (condition.isDefined) {
if (joinType == LeftSemi || condition.isDefined) {
return ValidationResult.notOk(
s"Broadcast Nested Loop join is not supported join type $joinType with conditions")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BaseJoinExec
import org.apache.spark.sql.execution.metric.SQLMetric

import com.google.protobuf.{Any, StringValue}
import com.google.protobuf.Any
import io.substrait.proto.CrossRel

abstract class BroadcastNestedLoopJoinExecTransformer(
Expand Down Expand Up @@ -110,19 +110,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
}
}

def genJoinParameters(): Any = {
// for ch
val joinParametersStr = new StringBuffer("JoinParameters:")
joinParametersStr
.append("buildHashTableId=")
.append(buildBroadcastTableId)
.append("\n")
val message = StringValue
.newBuilder()
.setValue(joinParametersStr.toString)
.build()
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}
def genJoinParameters(): Any = Any.getDefaultInstance

override protected def doTransform(context: SubstraitContext): TransformContext = {
val streamedPlanContext = streamedPlan.asInstanceOf[TransformSupport].transform(context)
Expand Down

0 comments on commit 56bfe64

Please sign in to comment.