Skip to content

Commit

Permalink
[GLUTEN-7541][VL] Improve HLLRewriteRule for Velox (apache#7543)
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer authored Oct 18, 2024
1 parent 6ec9037 commit 2721484
Showing 1 changed file with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,34 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, HyperLogLogPlusPlus}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXPRESSION}
import org.apache.spark.sql.types._

case class HLLRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = LogicalPlanSelector.maybe(spark, plan) {
plan.resolveOperatorsUp {
plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) {
case a: Aggregate =>
a.transformExpressions {
case hllExpr @ AggregateExpression(hll: HyperLogLogPlusPlus, _, _, _, _)
a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
case aggExpr @ AggregateExpression(hll: HyperLogLogPlusPlus, _, _, _, _)
if GlutenConfig.getConf.enableNativeHyperLogLogAggregateFunction &&
GlutenConfig.getConf.enableColumnarHashAgg &&
isDataTypeSupported(hll.child.dataType) =>
AggregateExpression(
HLLAdapter(
hll.child,
Literal(hll.relativeSD),
hll.mutableAggBufferOffset,
hll.inputAggBufferOffset),
hllExpr.mode,
hllExpr.isDistinct,
hllExpr.filter,
hllExpr.resultId
)
isSupportedDataType(hll.child.dataType) =>
val hllAdapter = HLLAdapter(
hll.child,
Literal(hll.relativeSD),
hll.mutableAggBufferOffset,
hll.inputAggBufferOffset)

aggExpr.copy(aggregateFunction = hllAdapter)
}
}
}

private def isDataTypeSupported(dataType: DataType): Boolean = {
// HLL in velox only supports below data types. we should not offload HLL to velox, if
private def isSupportedDataType(dataType: DataType): Boolean = {
// HLL in Velox only supports below data types. We should not offload HLL to velox, if
// child's data type is not supported. This prevents the case only partail agg is fallbacked.
// As spark and velox have different HLL binary formats, HLL binary generated by spark can't
// be parsed by velox, it would cause the error: 'Unexpected type of HLL'.
// As Spark and Velox have different HLL binary formats, HLL binary generated by Spark can't
// be parsed by Velox, it would cause the error: 'Unexpected type of HLL'.
dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
StringType | _: CharType | _: DecimalType | DateType | TimestampType | BinaryType =>
Expand Down

0 comments on commit 2721484

Please sign in to comment.