From 56bfe643f825c73de47a5fca9dfbd8507d80efdb Mon Sep 17 00:00:00 2001 From: loneylee Date: Tue, 9 Jul 2024 10:22:50 +0800 Subject: [PATCH] fix ci --- ...oadcastNestedLoopJoinExecTransformer.scala | 22 ++++++++++++++++--- ...oadcastNestedLoopJoinExecTransformer.scala | 16 ++------------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala index c408a223784b6..35be8ee0b13ea 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala @@ -16,17 +16,20 @@ */ package org.apache.gluten.execution +import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.extension.ValidationResult import org.apache.spark.rdd.RDD import org.apache.spark.rpc.GlutenDriverEndpoint import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.BuildSide -import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType} +import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftSemi} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.vectorized.ColumnarBatch +import com.google.protobuf.{Any, StringValue} + case class CHBroadcastNestedLoopJoinExecTransformer( left: SparkPlan, right: SparkPlan, @@ -56,7 +59,6 @@ case class CHBroadcastNestedLoopJoinExecTransformer( val context = BroadCastHashJoinContext(Seq.empty, joinType, false, buildPlan.output, buildBroadcastTableId) val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context) - // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD } @@ -77,11 +79,25 @@ case class CHBroadcastNestedLoopJoinExecTransformer( res } + override def genJoinParameters(): Any = { + // for ch + val joinParametersStr = new StringBuffer("JoinParameters:") + joinParametersStr + .append("buildHashTableId=") + .append(buildBroadcastTableId) + .append("\n") + val message = StringValue + .newBuilder() + .setValue(joinParametersStr.toString) + .build() + BackendsApiManager.getTransformerApiInstance.packPBMessage(message) + } + override def validateJoinTypeAndBuildSide(): ValidationResult = { joinType match { case _: InnerLike => case _ => - if (condition.isDefined) { + if (joinType == LeftSemi || condition.isDefined) { return ValidationResult.notOk( s"Broadcast Nested Loop join is not supported join type $joinType with conditions") } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala index ef25b39ab9bd3..0251943cef31b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.BaseJoinExec import org.apache.spark.sql.execution.metric.SQLMetric -import com.google.protobuf.{Any, StringValue} +import com.google.protobuf.Any import io.substrait.proto.CrossRel abstract class BroadcastNestedLoopJoinExecTransformer( @@ -110,19 +110,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer( } } - def genJoinParameters(): Any = { - // for ch - val joinParametersStr = new StringBuffer("JoinParameters:") - joinParametersStr - .append("buildHashTableId=") - .append(buildBroadcastTableId) - .append("\n") - val message = StringValue - .newBuilder() - .setValue(joinParametersStr.toString) - .build() - BackendsApiManager.getTransformerApiInstance.packPBMessage(message) - } + def genJoinParameters(): Any = Any.getDefaultInstance override protected def doTransform(context: SubstraitContext): TransformContext = { val streamedPlanContext = streamedPlan.asInstanceOf[TransformSupport].transform(context)