Skip to content

Commit

Permalink
add ProjectTransformerRegister
Browse files Browse the repository at this point in the history
  • Loading branch information
dcoliversun committed Sep 10, 2024
1 parent eb67ee7 commit 1379afd
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
extends ProjectExecTransformerBase(projectList, child) {

override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
extends ProjectExecTransformerBase(projectList, child) {

override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
extends ProjectExecTransformerBase(projectList, child) {

override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,79 +18,35 @@ package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.`type`.TypeBuilder
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, Expression, NamedExpression, NullIntolerant, PredicateHelper, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, NamedExpression}
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.{ExplainUtils, OrderPreservingNodeShim, PartitioningPreservingNodeShim, SparkPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric

import scala.collection.JavaConverters._
import scala.collection.mutable

case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryTransformSupport
with OrderPreservingNodeShim
with PartitioningPreservingNodeShim
with PredicateHelper
with Logging {
case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
extends ProjectExecTransformerBase(projectList, child) {

private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext)

override protected def doValidateInternal(): ValidationResult = {
val substraitContext = new SubstraitContext
// Firstly, need to check if the Substrait plan for this operator can be successfully generated.
val operatorId = substraitContext.nextOperatorId(this.nodeName)
val relNode =
getRelNode(substraitContext, projectList, child.output, operatorId, null, validation = true)
// Then, validate the generated plan in native engine.
doNativeValidation(substraitContext, relNode)
}

override def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
case _ => false
}

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
metrics,
extraMetrics.toSeq)

override def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if ((projectList == null || projectList.isEmpty) && childCtx != null) {
// The computing for this project is not needed.
// the child may be an input adapter and childCtx is null. In this case we want to
// make a read node with non-empty base_schema.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}

val currRel =
getRelNode(context, projectList, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Project Rel should be valid")
TransformContext(childCtx.outputAttributes, output, currRel)
}

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering

override protected def outputExpressions: Seq[NamedExpression] = projectList

def getRelNode(
override def getRelNode(
context: SubstraitContext,
projectList: Seq[NamedExpression],
originalInputAttributes: Seq[Attribute],
Expand Down Expand Up @@ -123,14 +79,6 @@ case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression
}
}

override def verboseStringWithOperatorId(): String = {
s"""
|$formattedNodeName
|${ExplainUtils.generateFieldString("Output", projectList)}
|${ExplainUtils.generateFieldString("Input", child.output)}
|""".stripMargin
}

override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.execution.DeltaFilterTransformerProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.execution.DeltaProjectTransformerProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.execution.ProjectExec

class DeltaProjectTransformerProvider extends ProjectTransformerRegister {

override val dataLakeClass: String = "delta"

override def createProjectTransformer(projectExec: ProjectExec): ProjectExecTransformerBase = {
DeltaProjectExecTransformer(projectExec.projectList, projectExec.child)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ object FilterExecTransformerBase {
}
}

case class ProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val input: SparkPlan)
extends UnaryTransformSupport
with OrderPreservingNodeShim
with PartitioningPreservingNodeShim
Expand All @@ -176,7 +176,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
// Firstly, need to check if the Substrait plan for this operator can be successfully generated.
val operatorId = substraitContext.nextOperatorId(this.nodeName)
val relNode =
getRelNode(substraitContext, projectList, child.output, operatorId, null, validation = true)
getRelNode(substraitContext, list, child.output, operatorId, null, validation = true)
// Then, validate the generated plan in native engine.
doNativeValidation(substraitContext, relNode)
}
Expand All @@ -192,7 +192,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
override def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if ((projectList == null || projectList.isEmpty) && childCtx != null) {
if ((list == null || list.isEmpty) && childCtx != null) {
// The computing for this project is not needed.
// the child may be an input adapter and childCtx is null. In this case we want to
// make a read node with non-empty base_schema.
Expand All @@ -201,16 +201,16 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
}

val currRel =
getRelNode(context, projectList, child.output, operatorId, childCtx.root, validation = false)
getRelNode(context, list, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Project Rel should be valid")
TransformContext(childCtx.outputAttributes, output, currRel)
}

override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def output: Seq[Attribute] = list.map(_.toAttribute)

override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering

override protected def outputExpressions: Seq[NamedExpression] = projectList
override protected def outputExpressions: Seq[NamedExpression] = list

def getRelNode(
context: SubstraitContext,
Expand Down Expand Up @@ -247,23 +247,10 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
override def verboseStringWithOperatorId(): String = {
s"""
|$formattedNodeName
|${ExplainUtils.generateFieldString("Output", projectList)}
|${ExplainUtils.generateFieldString("Output", list)}
|${ExplainUtils.generateFieldString("Input", child.output)}
|""".stripMargin
}

override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
copy(child = newChild)
}
object ProjectExecTransformer {
def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = {
BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList, child)
}

// Directly creating a project transformer may not be considered safe since some backends, E.g.,
// Clickhouse may require to intercept the instantiation procedure.
def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
new ProjectExecTransformer(projectList, child)
}

// An alternatives for UnionExec.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

case class ProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
extends ProjectExecTransformerBase(projectList, child) {

override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
copy(child = newChild)
}

object ProjectExecTransformer {

// Directly creating a project transformer may not be considered safe since some backends, E.g.,
// Clickhouse may require to intercept the instantiation procedure.
def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
new ProjectExecTransformer(projectList, child)
}
Loading

0 comments on commit 1379afd

Please sign in to comment.