Skip to content

Commit

Permalink
[CORE][VL][CH] Make Iceberg code implement component API (#8192)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Dec 10, 2024
1 parent a24183f commit f37f459
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 66 deletions.
10 changes: 10 additions & 0 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main-iceberg/resource</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -318,6 +323,11 @@
</dependencies>

<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.component.CHIcebergComponent
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.execution.OffloadIcebergScan
import org.apache.gluten.extension.injector.Injector

/**
 * Gluten component that adds Iceberg scan offloading on top of the ClickHouse backend.
 *
 * It declares [[CHBackend]] as its only dependency and contributes its rules by delegating to
 * `OffloadIcebergScan.inject`.
 */
class CHIcebergComponent extends Component {

  override def name(): String = "clickhouse-iceberg"

  // The three trailing BuildInfo arguments are all the literal "N/A".
  override def buildInfo(): Component.BuildInfo = {
    val notAvailable = "N/A"
    Component.BuildInfo("ClickHouseIceberg", notAvailable, notAvailable, notAvailable)
  }

  override def dependencies(): Seq[Class[_ <: Component]] = List(classOf[CHBackend])

  override def injectRules(injector: Injector): Unit = OffloadIcebergScan.inject(injector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu
import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers}
import org.apache.gluten.extension.columnar.rewrite._
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
Expand Down Expand Up @@ -84,20 +83,13 @@ object CHRuleApi {
// Legacy: The legacy transform rule.
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.fallbackByNativeValidation(offloads)
.build()
Validators.newValidator(conf, offloads)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectTransform(
c => intercept(HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads)))
c =>
intercept(
HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads)))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
Expand Down
8 changes: 6 additions & 2 deletions backends-velox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
<name>Gluten Backends Velox</name>

<properties>
<resource.dir>${project.basedir}/src/main/resources</resource.dir>
<cpp.build.dir>../cpp/build/</cpp.build.dir>
<cpp.releases.dir>${cpp.build.dir}/releases/</cpp.releases.dir>
</properties>
Expand Down Expand Up @@ -61,6 +60,11 @@
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main-iceberg/resource</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -258,7 +262,7 @@
<targetPath>${platform}/${arch}</targetPath>
</resource>
<resource>
<directory>${resource.dir}</directory>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.component.VeloxIcebergComponent
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
package org.apache.gluten.component
import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadIcebergScan
import org.apache.gluten.extension.injector.Injector

class IcebergTransformerProvider extends DataSourceScanTransformerRegister {

override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"

override def createDataSourceV2Transformer(
batchScan: BatchScanExec): BatchScanExecTransformerBase = {
IcebergScanTransformer(batchScan)
/**
 * Gluten component that adds Iceberg scan offloading on top of the Velox backend.
 *
 * It declares [[VeloxBackend]] as its only dependency and contributes its rules by delegating to
 * `OffloadIcebergScan.inject`.
 */
class VeloxIcebergComponent extends Component {

  override def name(): String = "velox-iceberg"

  // The three trailing BuildInfo arguments are all the literal "N/A".
  override def buildInfo(): Component.BuildInfo = {
    val notAvailable = "N/A"
    Component.BuildInfo("VeloxIceberg", notAvailable, notAvailable, notAvailable)
  }

  override def dependencies(): Seq[Class[_ <: Component]] = List(classOf[VeloxBackend])

  override def injectRules(injector: Injector): Unit = OffloadIcebergScan.inject(injector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu
import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers}
import org.apache.gluten.extension.columnar.rewrite._
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader
Expand Down Expand Up @@ -76,20 +75,11 @@ object VeloxRuleApi {
// Legacy: The legacy transform rule.
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.fallbackByNativeValidation(offloads)
.build()
Validators.newValidator(conf, offloads)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectTransform(
c => HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads))
c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => UnionTransformerRule())
Expand Down Expand Up @@ -132,16 +122,7 @@ object VeloxRuleApi {
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))

// Gluten RAS: The RAS rule.
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.build()
val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectCoster(_ => LegacyCoster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import scala.collection.mutable
* should be placed to Gluten's classpath with a Java service file. Gluten will discover all the
* component implementations then register them at the booting time.
*
* Experimental: This is not expected to be used in production yet. Use [[Backend]] instead.
* Experimental: This is not expected to be used in production yet. Use
* [[org.apache.gluten.backend.Backend]] instead.
*/
@Experimental
trait Component {
Expand Down Expand Up @@ -180,6 +181,9 @@ object Component {

dependencies.foreach {
case (uid, dependencyCompClass) =>
require(
registry.isClassRegistered(dependencyCompClass),
s"Dependency class not registered yet: ${dependencyCompClass.getName}")
val dependencyUid = registry.findByClass(dependencyCompClass).uid
require(uid != dependencyUid)
require(lookup.contains(uid))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.Injector
import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.gluten.logging.LogLevelUtil

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
Expand Down Expand Up @@ -64,7 +65,39 @@ object HeuristicTransform {
new HeuristicTransform(all)
}

case class Single(
/**
 * A minimal heuristic transform: a validator paired with a list of offload rules.
 *
 * The offload rules are applied one after another. For each rule the plan is traversed with
 * `transformUp`; every visited node is validated first and is handed to the rule only when the
 * validation passes. A node that fails validation is kept as is, and the failure reason is
 * logged at debug level.
 *
 * @param validator
 *   the validator consulted for every visited node before offloading
 * @param offloadRules
 *   the offload rules, applied in the given order
 */
case class Simple(validator: Validator, offloadRules: Seq[OffloadSingleNode])
  extends Rule[SparkPlan]
  with Logging {

  override def apply(plan: SparkPlan): SparkPlan =
    offloadRules.foldLeft(plan) {
      (currentPlan, offloadRule) => currentPlan.transformUp(validateThenOffload(offloadRule))
    }

  /** Builds the per-node step for one rule: offload on a passed validation, else keep the node. */
  private def validateThenOffload(
      offloadRule: OffloadSingleNode): PartialFunction[SparkPlan, SparkPlan] = {
    case node =>
      validator.validate(node) match {
        case Validator.Failed(reason) =>
          logDebug(s"Validation failed by reason: $reason on query plan: ${node.nodeName}")
          node
        case Validator.Passed =>
          offloadRule.offload(node)
      }
  }
}

/**
* A heuristic transform rule with given rewrite rules. Fallback tags will be used in the
* procedure to determine which part of the plan is or is not eligible to be offloaded. The tags
* should also be correctly handled in the offload rules.
*
* TODO: Handle tags internally. Remove tag handling code in user offload rules.
*/
case class WithRewrites(
validator: Validator,
rewriteRules: Seq[RewriteSingleNode],
offloadRules: Seq[OffloadSingleNode])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object Validator {
def builder(): Builder = Builder()

class Builder private {
import Builder._
private val buffer: ListBuffer[Validator] = mutable.ListBuffer()

/** Add a custom validator to pipeline. */
Expand All @@ -69,6 +70,14 @@ object Validator {
p.validators.flatMap(flatten)
case other => Seq(other)
}
}

private object Builder {
def apply(): Builder = new Builder()

private object NoopValidator extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = pass()
}

private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator {
assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))
Expand All @@ -85,14 +94,6 @@ object Validator {
finalOut
}
}

private object NoopValidator extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = pass()
}
}

private object Builder {
def apply(): Builder = new Builder()
}

implicit class ValidatorImplicits(v: Validator) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ case class IcebergScanTransformer(
commonPartitionValues = commonPartitionValues
) {

// Instance-level check that simply forwards to the companion object's supportsBatchScan.
protected[this] def supportsBatchScan(scan: Scan): Boolean =
  IcebergScanTransformer.supportsBatchScan(scan)

override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty)

override lazy val getPartitionSchema: StructType =
Expand Down Expand Up @@ -94,4 +98,8 @@ object IcebergScanTransformer {
commonPartitionValues = SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan)
)
}

/**
 * Tells whether the given scan is Iceberg's `SparkBatchQueryScan`.
 *
 * The decision is made by comparing the scan's runtime class name with the fully qualified
 * Iceberg class name, so only an exact class-name match returns true.
 */
def supportsBatchScan(scan: Scan): Boolean = {
  val scanClassName = scan.getClass.getName
  scanClassName == "org.apache.iceberg.spark.source.SparkBatchQueryScan"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

/**
 * Offload rule that replaces a `BatchScanExec` with an `IcebergScanTransformer` when
 * `IcebergScanTransformer.supportsBatchScan` accepts the underlying scan. Any other plan node is
 * returned unchanged.
 */
case class OffloadIcebergScan() extends OffloadSingleNode {
  override def offload(plan: SparkPlan): SparkPlan = plan match {
    case batchScan: BatchScanExec =>
      if (IcebergScanTransformer.supportsBatchScan(batchScan.scan)) {
        IcebergScanTransformer(batchScan)
      } else {
        batchScan
      }
    case _ =>
      plan
  }
}

/** Registration entry point for the Iceberg scan offload rule. */
object OffloadIcebergScan {

  /**
   * Registers Iceberg scan offloading through both the legacy and the RAS injectors.
   *
   * @param injector
   *   the injector whose `gluten.legacy` and `gluten.ras` members receive the rules
   */
  def inject(injector: Injector): Unit = {
    // Legacy path: a simple heuristic transform whose validator is built from the same
    // offload list that it guards.
    injector.gluten.legacy.injectTransform {
      call =>
        val icebergOffloads = Seq(OffloadIcebergScan())
        val validator = Validators.newValidator(call.glutenConf, icebergOffloads)
        HeuristicTransform.Simple(validator, icebergOffloads)
    }

    // RAS path: the offload is wrapped as a RAS rule typed on BatchScanExec.
    injector.gluten.ras.injectRasRule {
      call =>
        val rasOffload = RasOffload.from[BatchScanExec](OffloadIcebergScan())
        val validator = Validators.newValidator(call.glutenConf)
        RasOffload.Rule(rasOffload, validator, Nil)
    }
  }
}
Loading

0 comments on commit f37f459

Please sign in to comment.