Commit 11e817a

Merge branch 'main' of https://github.com/acvictor/gluten into acvictor/shuffle

acvictor committed Sep 3, 2024
2 parents 0e8c59f + bb69486, commit 11e817a
Showing 455 changed files with 3,110 additions and 1,568 deletions.
1 change: 1 addition & 0 deletions .github/labeler.yml
@@ -48,6 +48,7 @@ CORE:
- changed-files:
- any-glob-to-any-file: [
'gluten-core/**/*',
+ 'gluten-substrait/**/*',
'shims/**/*',
'gluten-ras/**/*',
'gluten-ui/**/*',
1 change: 1 addition & 0 deletions .github/workflows/clickhouse_be_trigger.yml
@@ -26,6 +26,7 @@ on:
- 'gluten-celeborn/package/**'
- 'gluten-celeborn/clickhouse/**'
- 'gluten-core/**'
+ - 'gluten-substrait/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
38 changes: 20 additions & 18 deletions .github/workflows/velox_backend.yml
@@ -27,9 +27,11 @@ on:
- 'gluten-celeborn/velox/**'
- 'gluten-ras/**'
- 'gluten-core/**'
+ - 'gluten-substrait/**'
- 'gluten-data/**'
- 'gluten-delta/**'
- 'gluten-iceberg/**'
+ - 'gluten-hudi/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
@@ -344,16 +346,16 @@ jobs:
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
-   run: |
-     cd tools/gluten-it \
-     && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
-     --local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
-     --data-gen=skip -m=OffHeapExecutionMemory \
-     -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-     -d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-     -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-     -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true
+ # - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
+ #   run: |
+ #     cd tools/gluten-it \
+ #     && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
+ #     --local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
+ #     --data-gen=skip -m=OffHeapExecutionMemory \
+ #     -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
+ #     -d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
+ #     -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
+ #     -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true

run-tpc-test-ubuntu-randomkill:
needs: build-native-lib-centos-7
@@ -567,7 +569,7 @@ jobs:
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
$MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg \
- -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
+ -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
if: failure()
@@ -617,7 +619,7 @@ jobs:
- name: Build and run unit test for Spark 3.2.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta \
+ $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark33:
@@ -667,7 +669,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -719,7 +721,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -770,7 +772,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -822,7 +824,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -Phudi \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
@@ -873,7 +875,7 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
- name: Upload golden files
@@ -975,7 +977,7 @@ jobs:
- name: Build and Run unit test for Spark 3.5.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
4 changes: 2 additions & 2 deletions backends-clickhouse/pom.xml
@@ -16,7 +16,7 @@
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
+ <artifactId>gluten-substrait</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<exclusions>
@@ -33,7 +33,7 @@
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
+ <artifactId>gluten-substrait</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
CHThreadGroup.java
@@ -17,8 +17,8 @@
package org.apache.gluten.memory;

import org.apache.spark.TaskContext;
- import org.apache.spark.util.TaskResource;
- import org.apache.spark.util.TaskResources;
+ import org.apache.spark.task.TaskResource;
+ import org.apache.spark.task.TaskResources;

public class CHThreadGroup implements TaskResource {

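These imports track the relocation of TaskResource and TaskResources from org.apache.spark.util to org.apache.spark.task. For orientation, here is a minimal sketch of the per-task resource pattern CHThreadGroup participates in; the member names (release, priority, resourceName) and the addResource registration call are assumptions about Gluten's TaskResource API, not part of this diff:

  import org.apache.spark.task.{TaskResource, TaskResources}

  // Hypothetical resource owning a native handle, released at task end.
  class NativeHandleResource(handle: Long) extends TaskResource {
    override def release(): Unit = freeNative(handle) // called when the task completes
    override def priority(): Int = 0                  // release ordering among resources
    override def resourceName(): String = s"native_handle_$handle"
    private def freeNative(h: Long): Unit = ()        // stands in for a JNI call
  }

  // Registration inside a running Spark task (assumed signature):
  // TaskResources.addResource(resourceName, new NativeHandleResource(handle))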
StorageJoinBuilder.java
@@ -47,7 +47,8 @@ private static native long nativeBuild(
int joinType,
boolean hasMixedFiltCondition,
boolean isExistenceJoin,
- byte[] namedStruct);
+ byte[] namedStruct,
+ boolean isNullAwareAntiJoin);

private StorageJoinBuilder() {}

@@ -94,7 +95,8 @@ public static long build(
joinType,
broadCastContext.hasMixedFiltCondition(),
broadCastContext.isExistenceJoin(),
- toNameStruct(output).toByteArray());
+ toNameStruct(output).toByteArray(),
+ broadCastContext.isNullAwareAntiJoin());
}

/** create table named struct */
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
- import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper
+ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.lang.{Long => JLong}
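Only the enclosing helper object changes here: OASPackageBridge becomes SparkInputMetricsUtil, while the InputMetricsWrapper implicit keeps its name. A sketch of the bridge pattern involved, assuming the wrapper still exposes a bridgeIncBytesRead-style method as the old helper did:

  package org.apache.spark.sql.utils

  import org.apache.spark.executor.InputMetrics

  // Living under org.apache.spark.* lets the wrapper call metric mutators that
  // are private[spark]; Gluten scan code imports it to report bytes read.
  object SparkInputMetricsUtil {
    implicit class InputMetricsWrapper(val m: InputMetrics) extends AnyVal {
      def bridgeIncBytesRead(v: Long): Unit = m.incBytesRead(v) // assumed member
    }
  }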
CHListenerApi.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
+ import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, JniLibLoader}

import org.apache.spark.{SparkConf, SparkContext}
@@ -30,6 +31,7 @@ import org.apache.spark.listener.CHGlutenSQLAppStatusListener
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.sql.execution.datasources.v1._
+ import org.apache.spark.sql.utils.ExpressionUtil
import org.apache.spark.util.SparkDirectoryUtil

import org.apache.commons.lang3.StringUtils
@@ -42,6 +44,13 @@ class CHListenerApi extends ListenerApi with Logging {
GlutenDriverEndpoint.glutenDriverEndpointRef = (new GlutenDriverEndpoint).self
CHGlutenSQLAppStatusListener.registerListener(sc)
initialize(pc.conf, isDriver = true)

+ val expressionExtensionTransformer = ExpressionUtil.extendedExpressionTransformer(
+   pc.conf.get(GlutenConfig.GLUTEN_EXTENDED_EXPRESSION_TRAN_CONF, "")
+ )
+ if (expressionExtensionTransformer != null) {
+   ExpressionExtensionTrait.expressionExtensionTransformer = expressionExtensionTransformer
+ }
}

override def onDriverShutdown(): Unit = shutdown()
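The block added above instantiates a user-supplied transformer once per driver and installs it in the ExpressionExtensionTrait companion. A hypothetical user-side wiring; the class name is illustrative and must implement ExpressionExtensionTrait, while GlutenConfig.GLUTEN_EXTENDED_EXPRESSION_TRAN_CONF holds the actual configuration key:

  import org.apache.gluten.GlutenConfig
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .appName("gluten-expression-extension-demo")
    // com.example.MyExpressionExtension is hypothetical; ExpressionUtil loads it
    // reflectively at driver start, as shown in CHListenerApi above.
    .config(GlutenConfig.GLUTEN_EXTENDED_EXPRESSION_TRAN_CONF, "com.example.MyExpressionExtension")
    .getOrCreate()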
CHSparkPlanExecApi.scala
@@ -18,10 +18,10 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
- import org.apache.gluten.exception.GlutenException
- import org.apache.gluten.exception.GlutenNotSupportException
+ import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.gluten.execution._
import org.apache.gluten.expression._
+ import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
@@ -558,9 +558,25 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
Sig[CollectList](ExpressionNames.COLLECT_LIST),
Sig[CollectSet](ExpressionNames.COLLECT_SET)
) ++
+ ExpressionExtensionTrait.expressionExtensionTransformer.expressionSigList ++
SparkShimLoader.getSparkShims.bloomFilterExpressionMappings()
}

+ /** Define backend-specific expression converter. */
+ override def extraExpressionConverter(
+     substraitExprName: String,
+     expr: Expression,
+     attributeSeq: Seq[Attribute]): Option[ExpressionTransformer] = expr match {
+   case e
+       if ExpressionExtensionTrait.expressionExtensionTransformer.extensionExpressionsMapping
+         .contains(e.getClass) =>
+     // Use extended expression transformer to replace custom expression first
+     Some(
+       ExpressionExtensionTrait.expressionExtensionTransformer
+         .replaceWithExtensionExpressionTransformer(substraitExprName, e, attributeSeq))
+   case _ => None
+ }

override def genStringTranslateTransformer(
substraitExprName: String,
srcExpr: ExpressionTransformer,
@@ -700,7 +716,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
.doTransform(args)))

val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
- AggregateFunctionsBuilder.create(args, aggExpression.aggregateFunction).toInt,
+ CHExpressions.createAggregateFunction(args, aggExpression.aggregateFunction).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(aggExpression.dataType, aggExpression.nullable),
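For context, a skeleton of what such an extension looks like from the implementer's side. Member names mirror the calls made by the new extraExpressionConverter (expressionSigList, extensionExpressionsMapping, replaceWithExtensionExpressionTransformer); the exact signatures, the Sig helper, and MyCustomExpr are assumptions to verify against the trait definition:

  // Sketch only: registers one custom expression and delegates its translation.
  class MyExpressionExtension extends ExpressionExtensionTrait {
    // Pair a custom Catalyst expression class with a Substrait function name;
    // extensionExpressionsMapping is derived from this list.
    override def expressionSigList: Seq[Sig] = Seq(Sig[MyCustomExpr]("my_custom_expr"))

    // Build the transformer used whenever a registered expression is encountered.
    override def replaceWithExtensionExpressionTransformer(
        substraitExprName: String,
        expr: Expression,
        attributeSeq: Seq[Attribute]): ExpressionTransformer =
      GenericExpressionTransformer(substraitExprName, Seq.empty, expr)

    // Aggregate-related members (buildCustomAggregateFunction,
    // getAttrsIndexForExtensionAggregateExpr) are elided from this sketch.
  }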
CHHashAggregateExecTransformer.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.CHHashAggregateExecTransformer.getAggregateResultAttributes
import org.apache.gluten.expression._
+ import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.{AggregationParams, SubstraitContext}
import org.apache.gluten.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode}
@@ -249,7 +250,7 @@ case class CHHashAggregateExecTransformer(
childrenNodeList.add(node)
}
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
- AggregateFunctionsBuilder.create(args, aggregateFunc),
+ CHExpressions.createAggregateFunction(args, aggregateFunc),
childrenNodeList,
modeToKeyWord(aggExpr.mode),
ConverterUtils.getTypeNode(aggregateFunc.dataType, aggregateFunc.nullable)
@@ -286,10 +287,10 @@ case class CHHashAggregateExecTransformer(
val aggregateFunc = aggExpr.aggregateFunction
var aggFunctionName =
if (
- ExpressionMappings.expressionExtensionTransformer.extensionExpressionsMapping.contains(
-   aggregateFunc.getClass)
+ ExpressionExtensionTrait.expressionExtensionTransformer.extensionExpressionsMapping
+   .contains(aggregateFunc.getClass)
) {
- ExpressionMappings.expressionExtensionTransformer
+ ExpressionExtensionTrait.expressionExtensionTransformer
.buildCustomAggregateFunction(aggregateFunc)
._1
.get
@@ -437,10 +438,10 @@ case class CHHashAggregateExecPullOutHelper(
val aggregateFunc = exp.aggregateFunction
// First handle the custom aggregate functions
if (
- ExpressionMappings.expressionExtensionTransformer.extensionExpressionsMapping.contains(
+ ExpressionExtensionTrait.expressionExtensionTransformer.extensionExpressionsMapping.contains(
aggregateFunc.getClass)
) {
- ExpressionMappings.expressionExtensionTransformer
+ ExpressionExtensionTrait.expressionExtensionTransformer
.getAttrsIndexForExtensionAggregateExpr(
aggregateFunc,
exp.mode,
CHBroadcastHashJoinExecTransformer.scala
@@ -193,7 +193,8 @@ case class BroadCastHashJoinContext(
hasMixedFiltCondition: Boolean,
isExistenceJoin: Boolean,
buildSideStructure: Seq[Attribute],
- buildHashTableId: String)
+ buildHashTableId: String,
+ isNullAwareAntiJoin: Boolean = false)

case class CHBroadcastHashJoinExecTransformer(
leftKeys: Seq[Expression],
@@ -230,9 +231,6 @@ case class CHBroadcastHashJoinExecTransformer(
if (shouldFallback) {
return ValidationResult.failed("ch join validate fail")
}
- if (isNullAwareAntiJoin) {
-   return ValidationResult.failed("ch does not support NAAJ")
- }
super.doValidateInternal()
}

@@ -256,7 +254,9 @@
isMixedCondition(condition),
joinType.isInstanceOf[ExistenceJoin],
buildPlan.output,
- buildHashTableId)
+ buildHashTableId,
+ isNullAwareAntiJoin
+ )
val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
// FIXME: Do we have to make build side a RDD?
streamedRDD :+ broadcastRDD
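With the validation guard removed and isNullAwareAntiJoin threaded into BroadCastHashJoinContext (and from there into StorageJoinBuilder.nativeBuild), the ClickHouse backend can now offload null-aware anti joins instead of falling back. Spark plans this join flavor for single-column NOT IN subqueries over nullable keys; an illustrative query, with hypothetical table names:

  // Planned as a BroadcastHashJoin with isNullAwareAntiJoin = true when
  // customer_id is nullable; previously this failed CH validation with
  // "ch does not support NAAJ" and ran on vanilla Spark.
  val openOrders = spark.sql(
    """
      |SELECT o.order_id
      |FROM orders o
      |WHERE o.customer_id NOT IN (SELECT b.customer_id FROM blocked_customers b)
    """.stripMargin)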
New file: CHExpressions.scala (45 lines added)
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.expression

import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.substrait.expression.ExpressionBuilder

import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction

// Static helper object for handling expressions that are specifically used in CH backend.
object CHExpressions {
// Since https://github.com/apache/incubator-gluten/pull/1937.
def createAggregateFunction(args: java.lang.Object, aggregateFunc: AggregateFunction): Long = {
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
if (
ExpressionExtensionTrait.expressionExtensionTransformer.extensionExpressionsMapping.contains(
aggregateFunc.getClass)
) {
val (substraitAggFuncName, inputTypes) =
ExpressionExtensionTrait.expressionExtensionTransformer.buildCustomAggregateFunction(
aggregateFunc)
assert(substraitAggFuncName.isDefined)
return ExpressionBuilder.newScalarFunction(
functionMap,
ConverterUtils.makeFuncName(substraitAggFuncName.get, inputTypes, FunctionConfig.REQ))
}

AggregateFunctionsBuilder.create(args, aggregateFunc)
}
}
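
CHExpressions.createAggregateFunction is now the single entry point the rewritten call sites go through. The call-site pattern, as it appears in the CHHashAggregateExecTransformer hunk earlier in this commit:

  // Resolve the Substrait function reference first, then build the aggregate node.
  val funcId: Long = CHExpressions.createAggregateFunction(args, aggregateFunc)
  val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
    funcId,
    childrenNodeList,
    modeToKeyWord(aggExpr.mode),
    ConverterUtils.getTypeNode(aggregateFunc.dataType, aggregateFunc.nullable))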