From 8518680d3daf8c46659f9f0c30f6550d50034d73 Mon Sep 17 00:00:00 2001 From: exmy Date: Tue, 28 Nov 2023 16:28:56 +0800 Subject: [PATCH] [CORE] Fix exception of pb MessageToJsonString (#3823) --- .../clickhouse/CHTransformerApi.scala | 3 +++ .../backendsapi/velox/TransformerApiImpl.scala | 4 ++++ .../backendsapi/TransformerApi.scala | 4 ++++ .../execution/HashJoinExecTransformer.scala | 16 +++++++--------- .../io/glutenproject/execution/JoinUtils.scala | 12 ++++++------ .../execution/SortMergeJoinExecTransformer.scala | 14 ++++++-------- .../execution/WindowExecTransformer.scala | 8 +++----- 7 files changed, 33 insertions(+), 28 deletions(-) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala index 5a6e6647fff7..839b1be92041 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.collection.BitSet import com.google.common.collect.Lists +import com.google.protobuf.{Any, Message} import java.util @@ -267,4 +268,6 @@ class CHTransformerApi extends TransformerApi with Logging { override def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String = { throw new UnsupportedOperationException("CH backend does not support this method") } + + override def getPackMessage(message: Message): Any = Any.pack(message) } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala index 61ad92b6b931..05f6dbc940c6 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi import org.apache.spark.sql.types._ import org.apache.spark.util.collection.BitSet +import com.google.protobuf.{Any, Message} + import java.util.{Map => JMap} class TransformerApiImpl extends TransformerApi with Logging { @@ -135,4 +137,6 @@ class TransformerApiImpl extends TransformerApi with Logging { tmpRuntime.release() } } + + override def getPackMessage(message: Message): Any = Any.pack(message, "") } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala index 30fd9bfde2a1..164524addfb5 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi import org.apache.spark.sql.types.DecimalType import org.apache.spark.util.collection.BitSet +import com.google.protobuf.{Any, Message} + import java.util trait TransformerApi { @@ -85,4 +87,6 @@ trait TransformerApi { nullOnOverflow: Boolean): ExpressionNode def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String + + def getPackMessage(message: Message): Any } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala index 67bff42ac07d..eec5726b304b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala @@ -204,7 +204,7 @@ trait HashJoinLikeExecTransformer substraitJoinType, needSwitchChildren, joinType, - genJoinParametersBuilder(), + genJoinParameters(), null, null, streamedPlan.output, @@ -278,7 +278,7 @@ trait HashJoinLikeExecTransformer substraitJoinType, needSwitchChildren, joinType, - genJoinParametersBuilder(), + genJoinParameters(), inputStreamedRelNode, inputBuildRelNode, inputStreamedOutput, @@ -297,8 +297,8 @@ trait HashJoinLikeExecTransformer inputBuildOutput) } - def genJoinParametersBuilder(): Any.Builder = { - val (isBHJ, isNullAwareAntiJoin, buildHashTableId) = genJoinParameters() + def genJoinParameters(): Any = { + val (isBHJ, isNullAwareAntiJoin, buildHashTableId) = genJoinParametersInternal() // Start with "JoinParameters:" val joinParametersStr = new StringBuffer("JoinParameters:") // isBHJ: 0 for SHJ, 1 for BHJ @@ -321,12 +321,10 @@ trait HashJoinLikeExecTransformer .newBuilder() .setValue(joinParametersStr.toString) .build() - Any.newBuilder - .setValue(message.toByteString) - .setTypeUrl("/google.protobuf.StringValue") + BackendsApiManager.getTransformerApiInstance.getPackMessage(message) } - def genJoinParameters(): (Int, Int, String) = { + def genJoinParametersInternal(): (Int, Int, String) = { (0, 0, "") } } @@ -406,7 +404,7 @@ abstract class BroadcastHashJoinExecTransformer( // Unique ID for builded hash table lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id - override def genJoinParameters(): (Int, Int, String) = { + override def genJoinParametersInternal(): (Int, Int, String) = { (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala b/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala index 4f256debb058..0412b94e8f2b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala @@ -16,6 +16,7 @@ */ package io.glutenproject.execution +import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.{AttributeReferenceTransformer, ConverterUtils, ExpressionConverter} import io.glutenproject.substrait.`type`.TypeBuilder import io.glutenproject.substrait.SubstraitContext @@ -40,9 +41,8 @@ object JoinUtils { // Normally the enhancement node is only used for plan validation. But here the enhancement // is also used in execution phase. In this case an empty typeUrlPrefix need to be passed, // so that it can be correctly parsed into json string on the cpp side. - Any.pack( - TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf, - /* typeUrlPrefix */ "") + BackendsApiManager.getTransformerApiInstance.getPackMessage( + TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) } def createExtensionNode(output: Seq[Attribute], validation: Boolean): AdvancedExtensionNode = { @@ -132,12 +132,12 @@ object JoinUtils { } def createJoinExtensionNode( - joinParameters: Any.Builder, + joinParameters: Any, output: Seq[Attribute]): AdvancedExtensionNode = { // Use field [optimization] in a extension node // to send some join parameters through Substrait plan. val enhancement = createEnhancement(output) - ExtensionBuilder.makeAdvancedExtension(joinParameters.build(), enhancement) + ExtensionBuilder.makeAdvancedExtension(joinParameters, enhancement) } // Return the direct join output. @@ -180,7 +180,7 @@ object JoinUtils { substraitJoinType: JoinRel.JoinType, exchangeTable: Boolean, joinType: JoinType, - joinParameters: Any.Builder, + joinParameters: Any, inputStreamedRelNode: RelNode, inputBuildRelNode: RelNode, inputStreamedOutput: Seq[Attribute], diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala index 3c2214356d7e..ff95e5ea8c22 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.vectorized.ColumnarBatch -import com.google.protobuf.StringValue +import com.google.protobuf.{Any, StringValue} import io.substrait.proto.JoinRel import scala.collection.JavaConverters._ @@ -55,7 +55,7 @@ case class SortMergeJoinExecTransformer( val (bufferedKeys, streamedKeys, bufferedPlan, streamedPlan) = (rightKeys, leftKeys, right, left) - override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator + override def stringArgs: Iterator[scala.Any] = super.stringArgs.toSeq.dropRight(1).iterator override def simpleStringWithNodeId(): String = { val opId = ExplainUtils.getOpId(this) @@ -176,7 +176,7 @@ case class SortMergeJoinExecTransformer( override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetricsUpdater(metrics) - def genJoinParametersBuilder(): com.google.protobuf.Any.Builder = { + def genJoinParameters(): Any = { val (isSMJ, isNullAwareAntiJoin) = (1, 0) // Start with "JoinParameters:" val joinParametersStr = new StringBuffer("JoinParameters:") @@ -196,9 +196,7 @@ case class SortMergeJoinExecTransformer( .newBuilder() .setValue(joinParametersStr.toString) .build() - com.google.protobuf.Any.newBuilder - .setValue(message.toByteString) - .setTypeUrl("/google.protobuf.StringValue") + BackendsApiManager.getTransformerApiInstance.getPackMessage(message) } // Direct output order of substrait join operation @@ -235,7 +233,7 @@ case class SortMergeJoinExecTransformer( substraitJoinType, false, joinType, - genJoinParametersBuilder(), + genJoinParameters(), null, null, streamedPlan.output, @@ -298,7 +296,7 @@ case class SortMergeJoinExecTransformer( substraitJoinType, false, joinType, - genJoinParametersBuilder(), + genJoinParameters(), inputStreamedRelNode, inputBuildRelNode, inputStreamedOutput, diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala index a2e4160bfaf3..f92cd77e3137 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala @@ -84,7 +84,7 @@ case class WindowExecTransformer( override def outputPartitioning: Partitioning = child.outputPartitioning - def genWindowParametersBuilder(): com.google.protobuf.Any.Builder = { + def genWindowParameters(): Any = { // Start with "WindowParameters:" val windowParametersStr = new StringBuffer("WindowParameters:") // isStreaming: 1 for streaming, 0 for sort @@ -99,9 +99,7 @@ case class WindowExecTransformer( .newBuilder() .setValue(windowParametersStr.toString) .build() - com.google.protobuf.Any.newBuilder - .setValue(message.toByteString) - .setTypeUrl("/google.protobuf.StringValue") + BackendsApiManager.getTransformerApiInstance.getPackMessage(message) } def getRelNode( @@ -157,7 +155,7 @@ case class WindowExecTransformer( }.asJava if (!validation) { val extensionNode = - ExtensionBuilder.makeAdvancedExtension(genWindowParametersBuilder.build(), null) + ExtensionBuilder.makeAdvancedExtension(genWindowParameters(), null) RelBuilder.makeWindowRel( input, windowExpressions,