[CORE] Unify utility class names with suffix Util #7020

Closed · wants to merge 2 commits
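This pull request mechanically renames Gluten utility objects to a consistent Util suffix: ConverterUtils becomes ConverterUtil, and the input-metrics bridge OASPackageBridge becomes SparkInputMetricsUtil. Call sites keep the same method signatures. A minimal Scala sketch of the pattern, using a call shape taken from the diff below (the literal argument is illustrative, not from this PR):

  import org.apache.gluten.expression.ConverterUtil
  import org.apache.spark.sql.types.StringType

  // Before this PR the same call read ConverterUtils.getTypeNode(...);
  // only the object name changes, the signature stays the same.
  val typeNode = ConverterUtil.getTypeNode(StringType, true)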
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta.commands

-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.expression.ConverterUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
@@ -110,7 +110,7 @@ object OptimizeTableCommandOverwrites extends Logging {
description.primaryKeyOption,
description.partitionColumns,
description.partList,
-ConverterUtils.convertNamedStructJson(description.tableSchema),
+ConverterUtil.convertNamedStructJson(description.tableSchema),
description.clickhouseTableConfigs,
description.tableSchema.toAttributes
)
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta.commands

-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.expression.ConverterUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
@@ -110,7 +110,7 @@ object OptimizeTableCommandOverwrites extends Logging {
description.primaryKeyOption,
description.partitionColumns,
description.partList,
-ConverterUtils.convertNamedStructJson(description.tableSchema),
+ConverterUtil.convertNamedStructJson(description.tableSchema),
description.clickhouseTableConfigs,
description.tableSchema.toAttributes
)
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta.commands

-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.expression.ConverterUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.Logging
@@ -112,7 +112,7 @@ object OptimizeTableCommandOverwrites extends Logging {
description.primaryKeyOption,
description.partitionColumns,
description.partList,
-ConverterUtils.convertNamedStructJson(description.tableSchema),
+ConverterUtil.convertNamedStructJson(description.tableSchema),
description.clickhouseTableConfigs,
DataTypeUtils.toAttributes(description.tableSchema)
)
@@ -18,8 +18,8 @@

import org.apache.gluten.execution.BroadCastHashJoinContext;
import org.apache.gluten.execution.JoinTypeTransform;
-import org.apache.gluten.expression.ConverterUtils;
-import org.apache.gluten.expression.ConverterUtils$;
+import org.apache.gluten.expression.ConverterUtil;
+import org.apache.gluten.expression.ConverterUtil$;
import org.apache.gluten.substrait.type.TypeNode;
import org.apache.gluten.utils.SubstraitUtil;

@@ -58,7 +58,7 @@ public static long build(
BroadCastHashJoinContext broadCastContext,
List<Expression> newBuildKeys,
List<Attribute> newOutput) {
-ConverterUtils$ converter = ConverterUtils$.MODULE$;
+ConverterUtil$ converter = ConverterUtil$.MODULE$;
List<Expression> keys;
List<Attribute> output;
if (newBuildKeys.isEmpty()) {
@@ -99,8 +99,8 @@ public static long build(

/** create table named struct */
private static NamedStruct toNameStruct(List<Attribute> output) {
-List<TypeNode> typeList = ConverterUtils.collectAttributeTypeNodes(output);
-List<String> nameList = ConverterUtils.collectAttributeNamesWithExprId(output);
+List<TypeNode> typeList = ConverterUtil.collectAttributeTypeNodes(output);
+List<String> nameList = ConverterUtil.collectAttributeNamesWithExprId(output);
Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
for (TypeNode typeNode : typeList) {
structBuilder.addTypes(typeNode.toProtobuf());
@@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.expression.ConverterUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
+import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.lang.{Long => JLong}
@@ -116,7 +116,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
scan: BasicScanExecTransformer): Unit = {
if (scan.fileFormat == ReadFileFormat.TextReadFormat) {
val names =
-ConverterUtils.collectAttributeNamesWithoutExprId(scan.outputAttributes())
+ConverterUtil.collectAttributeNamesWithoutExprId(scan.outputAttributes())
localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala.toSeq))
}
}
@@ -692,7 +692,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
WindowFunctionsBuilder.create(args, aggWindowFunc).toInt,
new JArrayList[ExpressionNode](),
columnName,
-ConverterUtils.getTypeNode(aggWindowFunc.dataType, aggWindowFunc.nullable),
+ConverterUtil.getTypeNode(aggWindowFunc.dataType, aggWindowFunc.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
@@ -719,7 +719,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
CHExpressions.createAggregateFunction(args, aggExpression.aggregateFunction).toInt,
childrenNodeList,
columnName,
-ConverterUtils.getTypeNode(aggExpression.dataType, aggExpression.nullable),
+ConverterUtil.getTypeNode(aggExpression.dataType, aggExpression.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
@@ -769,7 +769,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
WindowFunctionsBuilder.create(args, offsetWf).toInt,
childrenNodeList,
columnName,
-ConverterUtils.getTypeNode(offsetWf.dataType, offsetWf.nullable),
+ConverterUtil.getTypeNode(offsetWf.dataType, offsetWf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
@@ -785,7 +785,7 @@
WindowFunctionsBuilder.create(args, wf).toInt,
childrenNodeList,
columnName,
-ConverterUtils.getTypeNode(wf.dataType, wf.nullable),
+ConverterUtil.getTypeNode(wf.dataType, wf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.backendsapi.TransformerApi
import org.apache.gluten.execution.CHHashAggregateExecTransformer
-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.expression.ConverterUtil
import org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}

@@ -209,17 +209,17 @@ class CHTransformerApi extends TransformerApi with Logging {
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val functionId = ExpressionBuilder.newScalarFunction(
functionMap,
-ConverterUtils.makeFuncName(
+ConverterUtil.makeFuncName(
substraitExprName,
Seq(dataType, BooleanType),
-ConverterUtils.FunctionConfig.OPT))
+ConverterUtil.FunctionConfig.OPT))

// Just make a fake toType value, because native engine cannot accept datatype itself.
val toTypeNodes =
ExpressionBuilder.makeDecimalLiteral(new Decimal().set(0, dataType.precision, dataType.scale))
val expressionNodes =
Lists.newArrayList(childNode, new BooleanLiteralNode(nullOnOverflow), toTypeNodes)
-val typeNode = ConverterUtils.getTypeNode(dataType, nullable)
+val typeNode = ConverterUtil.getTypeNode(dataType, nullable)
ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode)
}

@@ -44,7 +44,7 @@ object CHHashAggregateExecTransformer {
def getAggregateResultAttributes(
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression]): Seq[Attribute] = {
-groupingExpressions.map(ConverterUtils.getAttrFromExpr(_).toAttribute) ++ aggregateExpressions
+groupingExpressions.map(ConverterUtil.getAttrFromExpr(_).toAttribute) ++ aggregateExpressions
.map(_.resultAttribute)
}

@@ -115,9 +115,9 @@ case class CHHashAggregateExecTransformer(
// When there is no aggregate function or there is complete mode, it does not need
// to handle outputs according to the AggregateMode
for (attr <- child.output) {
-typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
-nameList.add(ConverterUtils.genColumnNameWithExprId(attr))
-nameList.addAll(ConverterUtils.collectStructFieldNames(attr.dataType))
+typeList.add(ConverterUtil.getTypeNode(attr.dataType, attr.nullable))
+nameList.add(ConverterUtil.genColumnNameWithExprId(attr))
+nameList.addAll(ConverterUtil.collectStructFieldNames(attr.dataType))
}
(child.output, output)
} else if (!modes.contains(Partial)) {
Expand All @@ -132,17 +132,17 @@ case class CHHashAggregateExecTransformer(
nameList.add(colName)
val (dataType, nullable) =
getIntermediateAggregateResultType(attr, aggregateExpressions)
-nameList.addAll(ConverterUtils.collectStructFieldNames(dataType))
-typeList.add(ConverterUtils.getTypeNode(dataType, nullable))
+nameList.addAll(ConverterUtil.collectStructFieldNames(dataType))
+typeList.add(ConverterUtil.getTypeNode(dataType, nullable))
resultAttrIndex += 1
}
(aggregateResultAttributes, output)
} else {
// partial mode
for (attr <- child.output) {
-typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
-nameList.add(ConverterUtils.genColumnNameWithExprId(attr))
-nameList.addAll(ConverterUtils.collectStructFieldNames(attr.dataType))
+typeList.add(ConverterUtil.getTypeNode(attr.dataType, attr.nullable))
+nameList.add(ConverterUtil.genColumnNameWithExprId(attr))
+nameList.addAll(ConverterUtil.collectStructFieldNames(attr.dataType))
}

(child.output, aggregateResultAttributes)
@@ -253,7 +253,7 @@ case class CHHashAggregateExecTransformer(
CHExpressions.createAggregateFunction(args, aggregateFunc),
childrenNodeList,
modeToKeyWord(aggExpr.mode),
-ConverterUtils.getTypeNode(aggregateFunc.dataType, aggregateFunc.nullable)
+ConverterUtil.getTypeNode(aggregateFunc.dataType, aggregateFunc.nullable)
)
aggregateFunctionList.add(aggFunctionNode)
})
@@ -281,7 +281,7 @@ case class CHHashAggregateExecTransformer(
aggExpressions: Seq[AggregateExpression]): String = {
val resultAttr = aggResultAttributes(columnIndex)
if (columnIndex < groupingExprs.length) {
-ConverterUtils.genColumnNameWithExprId(resultAttr)
+ConverterUtil.genColumnNameWithExprId(resultAttr)
} else {
val aggExpr = aggExpressions(columnIndex - groupingExprs.length)
val aggregateFunc = aggExpr.aggregateFunction
@@ -297,7 +297,7 @@ case class CHHashAggregateExecTransformer(
} else {
AggregateFunctionsBuilder.getSubstraitFunctionName(aggregateFunc)
}
-ConverterUtils.genColumnNameWithExprId(resultAttr) + "#Partial#" + aggFunctionName
+ConverterUtil.genColumnNameWithExprId(resultAttr) + "#Partial#" + aggFunctionName
}
}

@@ -396,7 +396,7 @@ case class CHHashAggregateExecTransformer(
val enhancement = if (validation) {
// Use a extension node to send the input types through Substrait plan for validation.
val inputTypeNodeList = originalInputAttributes
-.map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+.map(attr => ConverterUtil.getTypeNode(attr.dataType, attr.nullable))
.asJava
Any.pack(TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf)
} else {
@@ -454,7 +454,7 @@ case class CHHashAggregateExecPullOutHelper(
case Partial | PartialMerge =>
val aggBufferAttr = aggregateFunc.inputAggBufferAttributes
for (index <- aggBufferAttr.indices) {
-val attr = ConverterUtils.getAttrFromExpr(aggBufferAttr(index))
+val attr = ConverterUtil.getAttrFromExpr(aggBufferAttr(index))
aggregateAttr += attr
}
resIndex += aggBufferAttr.size
@@ -18,7 +18,7 @@ package org.apache.gluten.expression

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.ConverterUtils.FunctionConfig
+import org.apache.gluten.expression.ConverterUtil.FunctionConfig
import org.apache.gluten.substrait.expression._

import org.apache.spark.sql.catalyst.expressions._
@@ -92,7 +92,7 @@ case class CHTruncTimestampTransformer(

val functionId = ExpressionBuilder.newScalarFunction(
functionMap,
-ConverterUtils.makeFuncName(substraitExprName, dataTypes))
+ConverterUtil.makeFuncName(substraitExprName, dataTypes))

val expressionNodes = new java.util.ArrayList[ExpressionNode]()
expressionNodes.add(lowerFormatNode)
@@ -101,7 +101,7 @@ case class CHTruncTimestampTransformer(
expressionNodes.add(ExpressionBuilder.makeStringLiteral(timeZoneId.get))
}

-val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
+val typeNode = ConverterUtil.getTypeNode(original.dataType, original.nullable)
ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode)
}
}
@@ -148,7 +148,7 @@ case class CHPosExplodeTransformer(
val funcMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val funcId = ExpressionBuilder.newScalarFunction(
funcMap,
-ConverterUtils.makeFuncName(
+ConverterUtil.makeFuncName(
ExpressionNames.POSEXPLODE,
Seq(original.child.dataType),
FunctionConfig.OPT))
@@ -163,7 +163,7 @@ case class CHPosExplodeTransformer(
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
-ConverterUtils.getTypeNode(structType, false))
+ConverterUtil.getTypeNode(structType, false))
case m: MapType =>
// Output (pos, key, value) when input is map type
val structType = StructType(
@@ -174,7 +174,7 @@
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
-ConverterUtils.getTypeNode(structType, false))
+ConverterUtil.getTypeNode(structType, false))
case _ =>
throw new GlutenNotSupportException(s"posexplode($childType) not supported yet.")
}
@@ -221,7 +221,7 @@ case class GetArrayItemTransformer(
// In Spark, the index of getarrayitem starts from 0
// But in CH, the index of arrayElement starts from 1, besides index argument must
// So we need to do transform: rightNode = add(rightNode, 1)
-val addFunctionName = ConverterUtils.makeFuncName(
+val addFunctionName = ConverterUtil.makeFuncName(
ExpressionNames.ADD,
Seq(IntegerType, getArrayItem.right.dataType),
FunctionConfig.OPT)
@@ -230,16 +230,16 @@
rightNode = ExpressionBuilder.makeScalarFunction(
addFunctionId,
Lists.newArrayList(literalNode, rightNode),
-ConverterUtils.getTypeNode(getArrayItem.right.dataType, getArrayItem.right.nullable))
+ConverterUtil.getTypeNode(getArrayItem.right.dataType, getArrayItem.right.nullable))

-val functionName = ConverterUtils.makeFuncName(
+val functionName = ConverterUtil.makeFuncName(
substraitExprName,
Seq(getArrayItem.left.dataType, getArrayItem.right.dataType),
FunctionConfig.OPT)
val exprNodes = Lists.newArrayList(leftNode, rightNode)
ExpressionBuilder.makeScalarFunction(
ExpressionBuilder.newScalarFunction(functionMap, functionName),
exprNodes,
-ConverterUtils.getTypeNode(getArrayItem.dataType, getArrayItem.nullable))
+ConverterUtil.getTypeNode(getArrayItem.dataType, getArrayItem.nullable))
}
}
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.expression

-import org.apache.gluten.expression.ConverterUtils.FunctionConfig
+import org.apache.gluten.expression.ConverterUtil.FunctionConfig
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.substrait.expression.ExpressionBuilder

@@ -37,7 +37,7 @@ object CHExpressions {
assert(substraitAggFuncName.isDefined)
return ExpressionBuilder.newScalarFunction(
functionMap,
-ConverterUtils.makeFuncName(substraitAggFuncName.get, inputTypes, FunctionConfig.REQ))
+ConverterUtil.makeFuncName(substraitAggFuncName.get, inputTypes, FunctionConfig.REQ))
}

AggregateFunctionsBuilder.create(args, aggregateFunc)
@@ -17,7 +17,7 @@
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
+import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper

class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {
@@ -17,7 +17,7 @@
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
+import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper

/**
* Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g.
@@ -17,7 +17,7 @@
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
+import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper

class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {
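The OASPackageBridge to SparkInputMetricsUtil rename above touches a helper that lives in an org.apache.spark package so it can reach Spark's package-private input-metrics setters. A hypothetical sketch of that bridge shape, assuming the renamed object keeps the usual implicit-wrapper form (the member names below are assumptions, not taken from this diff):

  package org.apache.spark.sql.utils

  import org.apache.spark.executor.InputMetrics

  object SparkInputMetricsUtil {
    // Assumed shape: an implicit wrapper forwarding to incBytesRead, which is
    // private[spark] and therefore only reachable from an org.apache.spark package.
    implicit class InputMetricsWrapper(val m: InputMetrics) {
      def bridgeIncBytesRead(v: Long): Unit = m.incBytesRead(v)
    }
  }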