diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java index 3322afe0b9d5..d0ff2ba466d9 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java @@ -26,7 +26,6 @@ import io.glutenproject.substrait.plan.PlanBuilder; import io.glutenproject.substrait.plan.PlanNode; -import com.google.protobuf.Any; import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; @@ -72,7 +71,9 @@ public boolean doValidate(byte[] subPlan) { private PlanNode buildNativeConfNode(Map confs) { StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs); AdvancedExtensionNode extensionNode = - ExtensionBuilder.makeAdvancedExtension(Any.pack(stringMapNode.toProtobuf())); + ExtensionBuilder.makeAdvancedExtension( + BackendsApiManager.getTransformerApiInstance() + .packPBMessage(stringMapNode.toProtobuf())); return PlanBuilder.makePlan(extensionNode); } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala index 4aa1875bc79b..c420f2225be2 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala @@ -234,5 +234,5 @@ class CHTransformerApi extends TransformerApi with Logging { throw new UnsupportedOperationException("CH backend does not support this method") } - override def getPackMessage(message: Message): Any = Any.pack(message) + override def packPBMessage(message: Message): Any = Any.pack(message) } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala index 64d406b8db4c..7a5c05284abc 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/TransformerApiImpl.scala @@ -96,5 +96,5 @@ class TransformerApiImpl extends TransformerApi with Logging { } } - override def getPackMessage(message: Message): Any = Any.pack(message, "") + override def packPBMessage(message: Message): Any = Any.pack(message, "") } diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala index b7b25a170ede..92606ab56398 100644 --- a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala +++ b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala @@ -16,6 +16,7 @@ */ package io.glutenproject.execution +import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression._ import io.glutenproject.expression.ConverterUtils.FunctionConfig import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode} @@ -31,8 +32,6 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import com.google.protobuf.Any - import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList} @@ -162,7 +161,8 @@ case class HashAggregateExecTransformer( groupingExpressions.size + aggregateExpressions.size) } else { val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, getPartialAggOutTypes).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, getPartialAggOutTypes).toProtobuf)) RelBuilder.makeProjectRel( aggRel, expressionNodes, @@ -435,7 +435,8 @@ case class HashAggregateExecTransformer( .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( inputRel, exprNodes, diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java index d18d0966c156..a762348a6b65 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java @@ -16,7 +16,8 @@ */ package io.glutenproject.substrait.rel; -import com.google.protobuf.Any; +import io.glutenproject.backendsapi.BackendsApiManager; + import com.google.protobuf.StringValue; import io.substrait.proto.ReadRel; @@ -69,7 +70,8 @@ public ReadRel.ExtensionTable toProtobuf() { ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder(); StringValue extensionTable = StringValue.newBuilder().setValue(extensionTableStr.toString()).build(); - extensionTableBuilder.setDetail(Any.pack(extensionTable)); + extensionTableBuilder.setDetail( + BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable)); return extensionTableBuilder.build(); } } diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala index d207de8e534f..68184bc729c6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/TransformerApi.scala @@ -74,5 +74,5 @@ trait TransformerApi { def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String - def getPackMessage(message: Message): Any + def packPBMessage(message: Message): Any } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index 5280cf5ffb6a..7d397e7dbf07 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -33,8 +33,6 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV import org.apache.spark.sql.utils.StructTypeFWD import org.apache.spark.sql.vectorized.ColumnarBatch -import com.google.protobuf.Any - import scala.collection.JavaConverters._ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkPlan) @@ -88,7 +86,8 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context, operatorId) } } @@ -221,7 +220,8 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( input, projExprNodeList, diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala index 9879a5877fb3..4d547f771b49 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala @@ -30,8 +30,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution._ -import com.google.protobuf.Any - import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ @@ -115,7 +113,8 @@ case class ExpandExecTransformer( inputTypeNodeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( input, preExprNodes, @@ -167,7 +166,8 @@ case class ExpandExecTransformer( } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeExpandRel(input, projectSetExprNodes, extensionNode, context, operatorId) } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala index da9313108bd0..381ae3dcd8fb 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala @@ -30,8 +30,6 @@ import io.glutenproject.substrait.rel.{RelBuilder, RelNode} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import com.google.protobuf.Any - import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ @@ -164,7 +162,8 @@ case class GenerateExecTransformer( val inputTypeNodeList = inputAttributes.map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)).asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeGenerateRel(input, generator, childOutput, extensionNode, context, operatorId) } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala index 2e5832d8f64c..e925a0d887a1 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/HashAggregateExecBaseTransformer.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate._ import org.apache.spark.sql.types._ -import com.google.protobuf.{Any, StringValue} +import com.google.protobuf.StringValue import java.util.{ArrayList => JArrayList, List => JList} @@ -266,7 +266,8 @@ abstract class HashAggregateExecBaseTransformer( .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( input, preExprNodes, @@ -379,7 +380,8 @@ abstract class HashAggregateExecBaseTransformer( .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( aggRel, resExprNodes, @@ -557,7 +559,8 @@ abstract class HashAggregateExecBaseTransformer( val inputTypeNodeList = originalInputAttributes .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf) } else { null } @@ -568,7 +571,7 @@ abstract class HashAggregateExecBaseTransformer( "0" } val optimization = - BackendsApiManager.getTransformerApiInstance.getPackMessage( + BackendsApiManager.getTransformerApiInstance.packPBMessage( StringValue.newBuilder.setValue(s"isStreaming=$isStreaming\n").build) ExtensionBuilder.makeAdvancedExtension(optimization, enhancement) } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala index 5c257bb1e8c2..27edbaac08eb 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala @@ -292,7 +292,7 @@ trait HashJoinLikeExecTransformer .newBuilder() .setValue(joinParametersStr.toString) .build() - BackendsApiManager.getTransformerApiInstance.getPackMessage(message) + BackendsApiManager.getTransformerApiInstance.packPBMessage(message) } def genJoinParametersInternal(): (Int, Int, String) = { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala b/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala index 0412b94e8f2b..b239d2999551 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/JoinUtils.scala @@ -41,7 +41,7 @@ object JoinUtils { // Normally the enhancement node is only used for plan validation. But here the enhancement // is also used in execution phase. In this case an empty typeUrlPrefix need to be passed, // so that it can be correctly parsed into json string on the cpp side. - BackendsApiManager.getTransformerApiInstance.getPackMessage( + BackendsApiManager.getTransformerApiInstance.packPBMessage( TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf) } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/LimitTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/LimitTransformer.scala index a678db0763b4..3d2fb440939b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/LimitTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/LimitTransformer.scala @@ -28,8 +28,6 @@ import io.glutenproject.substrait.rel.{RelBuilder, RelNode} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan -import com.google.protobuf.Any - import scala.collection.JavaConverters._ case class LimitTransformer(child: SparkPlan, offset: Long, count: Long) @@ -81,7 +79,8 @@ case class LimitTransformer(child: SparkPlan, offset: Long, count: Long) val inputTypeNodes = inputAttributes.map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)).asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodes).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodes).toProtobuf)) RelBuilder.makeFetchRel(input, offset, count, extensionNode, context, operatorId) } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/SortExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/SortExecTransformer.scala index 7d9319fa5395..7a04e51efac0 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/SortExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/SortExecTransformer.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import com.google.protobuf.Any import io.substrait.proto.SortField import java.util.{ArrayList => JArrayList} @@ -112,7 +111,8 @@ case class SortExecTransformer( } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( input, projectExpressions, @@ -136,7 +136,8 @@ case class SortExecTransformer( } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeSortRel(inputRel, sortFieldList, extensionNode, context, operatorId) } @@ -157,7 +158,8 @@ case class SortExecTransformer( } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel( sortRel, new JArrayList[ExpressionNode](selectOrigins), @@ -195,7 +197,8 @@ case class SortExecTransformer( val inputTypeNodeList = originalInputAttributes.map( attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList.asJava).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList.asJava).toProtobuf)) RelBuilder.makeSortRel(input, sortFieldList.asJava, extensionNode, context, operatorId) } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala index bb61619e277a..b9325a4ea052 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala @@ -193,7 +193,7 @@ case class SortMergeJoinExecTransformer( .newBuilder() .setValue(joinParametersStr.toString) .build() - BackendsApiManager.getTransformerApiInstance.getPackMessage(message) + BackendsApiManager.getTransformerApiInstance.packPBMessage(message) } // Direct output order of substrait join operation diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala index ec2a22d8603f..d69618b19759 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala @@ -97,7 +97,7 @@ case class WindowExecTransformer( .newBuilder() .setValue(windowParametersStr.toString) .build() - BackendsApiManager.getTransformerApiInstance.getPackMessage(message) + BackendsApiManager.getTransformerApiInstance.packPBMessage(message) } def getRelNode( @@ -168,7 +168,8 @@ case class WindowExecTransformer( .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) .asJava val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeWindowRel( input, diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala index 8eaf80e1a600..11835c095b2b 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala @@ -34,8 +34,6 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.EvalPythonExec import org.apache.spark.sql.types.StructType -import com.google.protobuf.Any - import java.util.{ArrayList => JArrayList, List => JList} case class EvalPythonExecTransformer( @@ -120,7 +118,8 @@ case class EvalPythonExecTransformer( inputTypeNodeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) } val extensionNode = ExtensionBuilder.makeAdvancedExtension( - Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) + BackendsApiManager.getTransformerApiInstance.packPBMessage( + TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)) RelBuilder.makeProjectRel(input, expressionNodes, extensionNode, context, operatorId, -1) } }