Skip to content

Commit

Permalink
Merge branch 'main' into wip-wait-spill
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Aug 21, 2024
2 parents e43fe4b + aab550f commit b79cd95
Show file tree
Hide file tree
Showing 60 changed files with 3,574 additions and 1,011 deletions.
18 changes: 11 additions & 7 deletions .github/workflows/clickhouse_be_trigger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@ on:
jobs:
add-comment:
runs-on: ubuntu-latest
permissions: write-all
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Sleep for Dev PR workflow done
run: |
sleep 15
- name: Add comment to PR
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
COMMENT="Run Gluten Clickhouse CI"
URL=$(jq -r .pull_request.comments_url "$GITHUB_EVENT_PATH")
curl -H "Authorization: token ${GITHUB_TOKEN}" -X POST -d "{\"body\":\"$COMMENT\"}" "${URL}"
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.payload.number,
body: "Run Gluten Clickhouse CI"
});
189 changes: 80 additions & 109 deletions .github/workflows/velox_docker.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CHBackend extends Backend {
override def validatorApi(): ValidatorApi = new CHValidatorApi
override def metricsApi(): MetricsApi = new CHMetricsApi
override def listenerApi(): ListenerApi = new CHListenerApi
override def ruleApi(): RuleApi = new CHRuleApi
override def settings(): BackendSettingsApi = CHBackendSettings
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides}
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
import org.apache.spark.util.SparkPlanRules

class CHRuleApi extends RuleApi {
  import CHRuleApi.{injectLegacy, injectRas, injectSpark}

  /**
   * Wires every ClickHouse-backend rule into the three injection points exposed by
   * [[RuleInjector]]: vanilla Spark extensions, Gluten's legacy columnar pipeline,
   * and Gluten's RAS (cost-based) pipeline.
   */
  override def injectRules(injector: RuleInjector): Unit = {
    val gluten = injector.gluten
    injectSpark(injector.spark)
    injectLegacy(gluten.legacy)
    injectRas(gluten.ras)
  }
}

private object CHRuleApi {

  /**
   * Registers rules against vanilla Spark's own extension points: query-stage preparation,
   * SQL parser, analyzer resolution rules and logical-plan optimizer rules.
   */
  def injectSpark(injector: SparkInjector): Unit = {
    // Regular Spark rules.
    injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
    injector.injectParser(
      (spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
    injector.injectResolutionRule(
      spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf))
    injector.injectResolutionRule(
      spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
    injector.injectOptimizerRule(
      spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf))
    injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark))
    injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
    injector.injectOptimizerRule(_ => EqualToRewrite)
  }

  /**
   * Registers the legacy (heuristic) columnar pipeline in its four phases, in order:
   * transform rules, fallback policies, post rules and final rules. Registration order
   * within a phase matters — each rule sees the plan produced by the previous one.
   */
  def injectLegacy(injector: LegacyInjector): Unit = {
    // Gluten columnar: Transform rules.
    injector.injectTransform(_ => RemoveTransitions)
    injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
    injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
    injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
    injector.injectTransform(_ => RewriteSubqueryBroadcast())
    injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
    injector.injectTransform(_ => FallbackEmptySchemaRelation())
    injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
    injector.injectTransform(_ => RewriteSparkPlanRulesManager())
    injector.injectTransform(_ => AddFallbackTagRule())
    injector.injectTransform(_ => TransformPreOverrides())
    injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
    injector.injectTransform(c => RewriteTransformer.apply(c.session))
    injector.injectTransform(_ => EnsureLocalSortRequirements)
    injector.injectTransform(_ => EliminateLocalSort)
    injector.injectTransform(_ => CollapseProjectExecTransformer)
    injector.injectTransform(c => RewriteSortMergeJoinToHashJoinRule.apply(c.session))
    // User-configured transform-phase rules come last so they see the fully transformed plan.
    injector.injectTransform(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
    injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

    // Gluten columnar: Fallback policies.
    injector.injectFallbackPolicy(
      c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan()))

    // Gluten columnar: Post rules.
    injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
    SparkShimLoader.getSparkShims
      .getExtendedColumnarPostRules()
      .foreach(each => injector.injectPost(c => each(c.session)))
    injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
    // FIX: user-configured post rules were previously registered via injectTransform,
    // which would have run them during the transform phase (and after InsertTransitions
    // had already been appended). Register them in the post phase they are named for,
    // mirroring how extendedColumnarTransformRules is handled above.
    injector.injectPost(
      c => SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))

    // Gluten columnar: Final rules.
    injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
    injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
    injector.injectFinal(_ => RemoveFallbackTagRule())
  }

  /**
   * RAS pipeline registration. The CH backend has no RAS support yet, so the single
   * injected rule aborts execution with an actionable message instead of producing a
   * silently wrong plan.
   */
  def injectRas(injector: RasInjector): Unit = {
    // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any
    // execution calls.
    injector.inject(
      _ =>
        new SparkPlanRules.AbortRule(
          "Clickhouse backend doesn't yet have RAS support, please try disabling RAS and" +
            " rerunning the application"))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.extension.{CommonSubexpressionEliminateRule, CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteDateTimestampComparisonRule, RewriteSortMergeJoinToHashJoinRule, RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.parser.GlutenClickhouseSqlParser
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
Expand All @@ -36,18 +34,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
Expand Down Expand Up @@ -549,82 +542,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount, newBuildKeys)
}

/**
* Generate extended DataSourceV2 Strategies. Currently only for ClickHouse backend.
*
* @return
*/
override def genExtendedDataSourceV2Strategies(): List[SparkSession => Strategy] = {
List.empty
}

/**
* Generate extended query stage preparation rules.
*
* @return
*/
override def genExtendedQueryStagePrepRules(): List[SparkSession => Rule[SparkPlan]] = {
List(spark => FallbackBroadcastHashJoinPrepQueryStage(spark))
}

/**
* Generate extended Analyzers. Currently only for ClickHouse backend.
*
* @return
*/
override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = {
List(
spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf),
spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
}

/**
* Generate extended Optimizers.
*
* @return
*/
override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] = {
List(
spark => new CommonSubexpressionEliminateRule(spark, spark.sessionState.conf),
spark => CHAggregateFunctionRewriteRule(spark),
_ => CountDistinctWithoutExpand,
_ => EqualToRewrite
)
}

/**
* Generate extended columnar pre-rules, in the validation phase.
*
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => FallbackBroadcastHashJoin(spark))

/**
* Generate extended columnar pre-rules.
*
* @return
*/
override def genExtendedColumnarTransformRules(): List[SparkSession => Rule[SparkPlan]] =
List(spark => RewriteSortMergeJoinToHashJoinRule(spark))

override def genInjectPostHocResolutionRules(): List[SparkSession => Rule[LogicalPlan]] = {
List()
}

/**
* Generate extended Strategies.
*
* @return
*/
override def genExtendedStrategies(): List[SparkSession => Strategy] =
List()

override def genInjectExtendedParser()
: List[(SparkSession, ParserInterface) => ParserInterface] = {
List((spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
}

/** Define backend specfic expression mappings. */
override def extraExpressionMappings: Seq[Sig] = {
List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class VeloxBackend extends Backend {
override def validatorApi(): ValidatorApi = new VeloxValidatorApi
override def metricsApi(): MetricsApi = new VeloxMetricsApi
override def listenerApi(): ListenerApi = new VeloxListenerApi
override def ruleApi(): RuleApi = new VeloxRuleApi
override def settings(): BackendSettingsApi = VeloxBackendSettings
}

Expand Down
Loading

0 comments on commit b79cd95

Please sign in to comment.