Merge branch 'main' into fix-input-metrics
ivoson authored Sep 3, 2024
2 parents 2413c1a + 7ca509f commit 747a103
Showing 93 changed files with 490 additions and 106 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/velox_backend.yml
@@ -346,16 +346,16 @@ jobs:
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
- name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
run: |
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--data-gen=skip -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true
# - name: TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory
# run: |
# cd tools/gluten-it \
# && GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
# --local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
# --data-gen=skip -m=OffHeapExecutionMemory \
# -d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
# -d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
# -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
# -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true

run-tpc-test-ubuntu-randomkill:
needs: build-native-lib-centos-7
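
Note on the workflow steps above: they use gluten-it's parameterized mode. Assuming each `-d=NAME:LABEL,conf=value,...` flag defines one labelled value of a test dimension and the harness runs the cross product of all dimensions (which the FLUSH_MODE, ISOLATION and OFFHEAP_SIZE lines suggest), the matrix expands roughly as in the hypothetical Scala sketch below; the DimValue type and the concrete values are illustrative only, not gluten-it API.

// Hypothetical sketch of how -d dimension flags expand into benchmark runs.
case class DimValue(dimension: String, label: String, confs: Map[String, String])

val isolation = Seq(
  DimValue("ISOLATION", "OFF", Map("spark.gluten.memory.isolation" -> "false")),
  DimValue("ISOLATION", "ON",
    Map("spark.gluten.memory.isolation" -> "true", "spark.memory.storageFraction" -> "0.1")))

val offHeap = Seq(
  DimValue("OFFHEAP_SIZE", "2g", Map("spark.memory.offHeap.size" -> "2g")),
  DimValue("OFFHEAP_SIZE", "1g", Map("spark.memory.offHeap.size" -> "1g")))

// Cross product: 2 x 2 = 4 runs, each tagged with its dimension labels and
// carrying the union of the selected Spark configurations.
val runs = for (i <- isolation; o <- offHeap)
  yield (Seq(i.label, o.label), i.confs ++ o.confs)
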
@@ -17,8 +17,8 @@
package org.apache.gluten.memory;

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskResource;
import org.apache.spark.util.TaskResources;
import org.apache.spark.task.TaskResource;
import org.apache.spark.task.TaskResources;

public class CHThreadGroup implements TaskResource {

@@ -47,7 +47,8 @@ private static native long nativeBuild(
int joinType,
boolean hasMixedFiltCondition,
boolean isExistenceJoin,
byte[] namedStruct);
byte[] namedStruct,
boolean isNullAwareAntiJoin);

private StorageJoinBuilder() {}

@@ -94,7 +95,8 @@ public static long build(
joinType,
broadCastContext.hasMixedFiltCondition(),
broadCastContext.isExistenceJoin(),
toNameStruct(output).toByteArray());
toNameStruct(output).toByteArray(),
broadCastContext.isNullAwareAntiJoin());
}

/** create table named struct */
@@ -193,7 +193,8 @@ case class BroadCastHashJoinContext(
hasMixedFiltCondition: Boolean,
isExistenceJoin: Boolean,
buildSideStructure: Seq[Attribute],
buildHashTableId: String)
buildHashTableId: String,
isNullAwareAntiJoin: Boolean = false)

case class CHBroadcastHashJoinExecTransformer(
leftKeys: Seq[Expression],
@@ -230,9 +231,6 @@ case class CHBroadcastHashJoinExecTransformer(
if (shouldFallback) {
return ValidationResult.failed("ch join validate fail")
}
if (isNullAwareAntiJoin) {
return ValidationResult.failed("ch does not support NAAJ")
}
super.doValidateInternal()
}

@@ -256,7 +254,9 @@
isMixedCondition(condition),
joinType.isInstanceOf[ExistenceJoin],
buildPlan.output,
buildHashTableId)
buildHashTableId,
isNullAwareAntiJoin
)
val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
// FIXME: Do we have to make build side a RDD?
streamedRDD :+ broadcastRDD
@@ -99,7 +99,7 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -137,7 +137,7 @@ class GlutenClickHouseColumnarShuffleAQESuite
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -124,7 +124,7 @@ class GlutenClickHouseDSV2ColumnarShuffleSuite extends GlutenClickHouseTPCHAbstr
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -106,7 +106,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite {
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -343,22 +343,18 @@ class GlutenClickHouseDecimalSuite
decimalTPCHTables.foreach {
dt =>
{
val fallBack = (sql_num == 16)
val compareResult = !dt._2.contains(sql_num)
val native = if (fallBack) "fallback" else "native"
val compare = if (compareResult) "compare" else "noCompare"
val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss"
val decimalType = dt._1
test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale})
| Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) {
| Q$sql_num[$PrecisionLoss,native,$compare]""".stripMargin) {
spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}")
withSQLConf(
(SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) {
runTPCHQuery(
sql_num,
tpchQueries,
compareResult = compareResult,
noFallBack = !fallBack) { _ => {} }
runTPCHQuery(sql_num, tpchQueries, compareResult = compareResult) {
_ => {}
}
}
spark.sql(s"use default")
}
@@ -147,7 +147,7 @@ class GlutenClickHouseTPCHNullableColumnarShuffleSuite extends GlutenClickHouseT
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -150,7 +150,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -151,7 +151,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.InputIteratorTransformer
import org.apache.spark.util.TaskResources
import org.apache.spark.task.TaskResources

import scala.collection.JavaConverters._

@@ -23,7 +23,7 @@ import org.apache.gluten.vectorized.GeneralInIterator
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.InputIteratorTransformer
import org.apache.spark.util.TaskResources
import org.apache.spark.task.TaskResources

import scala.collection.JavaConverters._

@@ -251,7 +251,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -185,7 +185,7 @@ class GlutenClickHouseTPCHParquetAQESuite
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -311,7 +311,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr

// see issue https://github.com/Kyligence/ClickHouse/issues/93
test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
runTPCHQuery(16) { df => }
}

test("TPCH Q17") {
@@ -2797,5 +2797,144 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

test("GLUTEN-341: Support BHJ + isNullAwareAntiJoin for the CH backend") {
def checkBHJWithIsNullAwareAntiJoin(df: DataFrame): Unit = {
val bhjs = df.queryExecution.executedPlan.collect {
case bhj: CHBroadcastHashJoinExecTransformer if bhj.isNullAwareAntiJoin => true
}
assert(bhjs.size == 1)
}

val sql =
s"""
|SELECT
| p_brand,
| p_type,
| p_size,
| count(DISTINCT ps_suppkey) AS supplier_cnt
|FROM
| partsupp,
| part
|WHERE
| p_partkey = ps_partkey
| AND p_brand <> 'Brand#45'
| AND p_type NOT LIKE 'MEDIUM POLISHED%'
| AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9)
| AND ps_suppkey NOT IN (
| SELECT
| s_suppkey
| FROM
| supplier
| WHERE
| s_comment is null)
|GROUP BY
| p_brand,
| p_type,
| p_size
|ORDER BY
| supplier_cnt DESC,
| p_brand,
| p_type,
| p_size;
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})

val sql1 =
s"""
|SELECT
| p_brand,
| p_type,
| p_size,
| count(DISTINCT ps_suppkey) AS supplier_cnt
|FROM
| partsupp,
| part
|WHERE
| p_partkey = ps_partkey
| AND p_brand <> 'Brand#45'
| AND p_type NOT LIKE 'MEDIUM POLISHED%'
| AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9)
| AND ps_suppkey NOT IN (
| SELECT
| s_suppkey
| FROM
| supplier
| WHERE
| s_comment LIKE '%Customer%Complaints11%')
|GROUP BY
| p_brand,
| p_type,
| p_size
|ORDER BY
| supplier_cnt DESC,
| p_brand,
| p_type,
| p_size;
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql1,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})

val sql2 =
s"""
|select * from partsupp
|where
|ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (null) sub(suppkey))
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql2,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})

val sql3 =
s"""
|select * from partsupp
|where
|ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50) sub(suppkey) WHERE suppkey > 100)
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql3,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})

val sql4 =
s"""
|select * from partsupp
|where
|ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (60) sub(suppkey))
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql4,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})

val sql5 =
s"""
|select * from partsupp
|where
|ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null) sub(suppkey))
|""".stripMargin
compareResultsAgainstVanillaSpark(
sql5,
true,
df => {
checkBHJWithIsNullAwareAntiJoin(df)
})
}
}
// scalastyle:on line.size.limit
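
For reference, the VALUES-based cases above (sql2 through sql5) hinge on standard SQL NOT IN null semantics, which is exactly what a null-aware anti join implements: a NULL on the build side makes the predicate unknown for every probe row. A minimal standalone Scala sketch of that behaviour follows, assuming a SparkSession `spark` with the TPC-H partsupp table already registered; the variable names are illustrative only.

// NULL on the build side: `ps_suppkey NOT IN (50, NULL)` is never true,
// so the null-aware anti join produces an empty result.
val withNull = spark.sql(
  "select * from partsupp " +
    "where ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (null) sub(suppkey))")
assert(withNull.count() == 0)

// No NULLs on the build side: behaves like an ordinary anti join, i.e. only
// rows whose ps_suppkey is 50 or 60 are filtered out.
val withoutNull = spark.sql(
  "select * from partsupp " +
    "where ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (60) sub(suppkey))")
assert(withoutNull.filter("ps_suppkey IN (50, 60)").count() == 0)
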
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.types._
import org.apache.spark.util.TaskResources
import org.apache.spark.task.TaskResources
import org.apache.spark.util.collection.BitSet

import com.google.protobuf.{Any, Message}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types._
import org.apache.spark.util.TaskResources
import org.apache.spark.task.TaskResources

class VeloxValidatorApi extends ValidatorApi {

@@ -33,7 +33,8 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, TaskResources}
import org.apache.spark.task.TaskResources
import org.apache.spark.util.SerializableConfiguration

import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.vector.types.pojo.Schema
