From 087f53e18f2ba44065b85bb5f7c3542334988a3a Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Sat, 31 Aug 2024 10:59:35 +0800 Subject: [PATCH] [GLUTEN-341][CH] Support BHJ + isNullAwareAntiJoin for the CH backend for example: TPCH Q16 Close #341. --- .../gluten/vectorized/StorageJoinBuilder.java | 6 +- .../execution/CHHashJoinExecTransformer.scala | 10 +- ...kHouseColumnarMemorySortShuffleSuite.scala | 2 +- ...tenClickHouseColumnarShuffleAQESuite.scala | 2 +- ...enClickHouseDSV2ColumnarShuffleSuite.scala | 2 +- .../execution/GlutenClickHouseDSV2Suite.scala | 2 +- .../GlutenClickHouseDecimalSuite.scala | 12 +- ...ouseTPCHNullableColumnarShuffleSuite.scala | 2 +- .../GlutenClickHouseTPCHNullableSuite.scala | 2 +- .../execution/GlutenClickHouseTPCHSuite.scala | 2 +- ...seTPCHColumnarShuffleParquetAQESuite.scala | 2 +- .../GlutenClickHouseTPCHParquetAQESuite.scala | 2 +- ...enClickHouseTPCHSaltNullParquetSuite.scala | 141 +++++++++++++++++- .../Join/BroadCastJoinBuilder.cpp | 6 +- .../local-engine/Join/BroadCastJoinBuilder.h | 3 +- .../Join/StorageJoinFromReadBuffer.cpp | 38 ++++- .../Join/StorageJoinFromReadBuffer.h | 7 +- .../local-engine/Operator/EarlyStopStep.cpp | 96 ++++++++++++ cpp-ch/local-engine/Operator/EarlyStopStep.h | 56 +++++++ .../Operator/ReplicateRowsStep.cpp | 7 +- cpp-ch/local-engine/Parser/JoinRelParser.cpp | 25 ++++ cpp-ch/local-engine/local_engine_jni.cpp | 5 +- ...useRSSColumnarMemorySortShuffleSuite.scala | 2 +- ...ClickHouseRSSColumnarShuffleAQESuite.scala | 2 +- .../clickhouse/ClickHouseTestSettings.scala | 1 + .../clickhouse/ClickHouseTestSettings.scala | 1 + .../clickhouse/ClickHouseTestSettings.scala | 1 + .../clickhouse/ClickHouseTestSettings.scala | 1 + 28 files changed, 395 insertions(+), 43 deletions(-) create mode 100644 cpp-ch/local-engine/Operator/EarlyStopStep.cpp create mode 100644 cpp-ch/local-engine/Operator/EarlyStopStep.h diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java index 1c4c1302d2aef..d53ff17188828 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java @@ -47,7 +47,8 @@ private static native long nativeBuild( int joinType, boolean hasMixedFiltCondition, boolean isExistenceJoin, - byte[] namedStruct); + byte[] namedStruct, + boolean isNullAwareAntiJoin); private StorageJoinBuilder() {} @@ -94,7 +95,8 @@ public static long build( joinType, broadCastContext.hasMixedFiltCondition(), broadCastContext.isExistenceJoin(), - toNameStruct(output).toByteArray()); + toNameStruct(output).toByteArray(), + broadCastContext.isNullAwareAntiJoin()); } /** create table named struct */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala index 9b6b2958ccc71..41def4d42c4e8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala @@ -193,7 +193,8 @@ case class BroadCastHashJoinContext( hasMixedFiltCondition: Boolean, isExistenceJoin: Boolean, buildSideStructure: Seq[Attribute], - buildHashTableId: String) + buildHashTableId: String, + isNullAwareAntiJoin: Boolean = false) case class CHBroadcastHashJoinExecTransformer( leftKeys: Seq[Expression], @@ -230,9 +231,6 @@ case class CHBroadcastHashJoinExecTransformer( if (shouldFallback) { return ValidationResult.failed("ch join validate fail") } - if (isNullAwareAntiJoin) { - return ValidationResult.failed("ch does not support NAAJ") - } super.doValidateInternal() } @@ -256,7 +254,9 @@ case class CHBroadcastHashJoinExecTransformer( isMixedCondition(condition), joinType.isInstanceOf[ExistenceJoin], buildPlan.output, - buildHashTableId) + buildHashTableId, + isNullAwareAntiJoin + ) val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context) // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala index 4c49cc2d9f46c..1b79e296595ff 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala @@ -99,7 +99,7 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala index e5da78de3fd60..d70c019c0ccac 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala @@ -137,7 +137,7 @@ class GlutenClickHouseColumnarShuffleAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala index 17bd9912b032c..fef884a07b512 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala @@ -124,7 +124,7 @@ class GlutenClickHouseDSV2ColumnarShuffleSuite extends GlutenClickHouseTPCHAbstr } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala index a58b6b1c1292d..9c71d6f34f3b8 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala @@ -106,7 +106,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala index bd831e64bf38c..40f442bc2948b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala @@ -343,22 +343,18 @@ class GlutenClickHouseDecimalSuite decimalTPCHTables.foreach { dt => { - val fallBack = (sql_num == 16) val compareResult = !dt._2.contains(sql_num) - val native = if (fallBack) "fallback" else "native" val compare = if (compareResult) "compare" else "noCompare" val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss" val decimalType = dt._1 test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale}) - | Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) { + | Q$sql_num[$PrecisionLoss,native,$compare]""".stripMargin) { spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}") withSQLConf( (SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) { - runTPCHQuery( - sql_num, - tpchQueries, - compareResult = compareResult, - noFallBack = !fallBack) { _ => {} } + runTPCHQuery(sql_num, tpchQueries, compareResult = compareResult) { + _ => {} + } } spark.sql(s"use default") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala index 5f9aa0dbda60f..d6cf33a434207 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala @@ -147,7 +147,7 @@ class GlutenClickHouseTPCHNullableColumnarShuffleSuite extends GlutenClickHouseT } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala index e0e4d33804504..d5e1156ba9d20 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala @@ -150,7 +150,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index f25a1313255ff..93c13d15c6ce1 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -151,7 +151,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 39dc7baf96950..28b874e21bacd 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -251,7 +251,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala index 2aadac05d348c..5f9f01fb697a0 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala @@ -185,7 +185,7 @@ class GlutenClickHouseTPCHParquetAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index e21df203dac03..1db37e00f9465 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -311,7 +311,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr // see issue https://github.com/Kyligence/ClickHouse/issues/93 test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { @@ -2797,5 +2797,144 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr |""".stripMargin compareResultsAgainstVanillaSpark(sql, true, { _ => }) } + + test("GLUTEN-341: Support BHJ + isNullAwareAntiJoin for the CH backend") { + def checkBHJWithIsNullAwareAntiJoin(df: DataFrame): Unit = { + val bhjs = df.queryExecution.executedPlan.collect { + case bhj: CHBroadcastHashJoinExecTransformer if bhj.isNullAwareAntiJoin => true + } + assert(bhjs.size == 1) + } + + val sql = + s""" + |SELECT + | p_brand, + | p_type, + | p_size, + | count(DISTINCT ps_suppkey) AS supplier_cnt + |FROM + | partsupp, + | part + |WHERE + | p_partkey = ps_partkey + | AND p_brand <> 'Brand#45' + | AND p_type NOT LIKE 'MEDIUM POLISHED%' + | AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) + | AND ps_suppkey NOT IN ( + | SELECT + | s_suppkey + | FROM + | supplier + | WHERE + | s_comment is null) + |GROUP BY + | p_brand, + | p_type, + | p_size + |ORDER BY + | supplier_cnt DESC, + | p_brand, + | p_type, + | p_size; + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql1 = + s""" + |SELECT + | p_brand, + | p_type, + | p_size, + | count(DISTINCT ps_suppkey) AS supplier_cnt + |FROM + | partsupp, + | part + |WHERE + | p_partkey = ps_partkey + | AND p_brand <> 'Brand#45' + | AND p_type NOT LIKE 'MEDIUM POLISHED%' + | AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) + | AND ps_suppkey NOT IN ( + | SELECT + | s_suppkey + | FROM + | supplier + | WHERE + | s_comment LIKE '%Customer%Complaints11%') + |GROUP BY + | p_brand, + | p_type, + | p_size + |ORDER BY + | supplier_cnt DESC, + | p_brand, + | p_type, + | p_size; + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql1, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql2 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (null) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql2, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql3 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50) sub(suppkey) WHERE suppkey > 100) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql3, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql4 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (50), (60) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql4, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + + val sql5 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql5, + true, + df => { + checkBHJWithIsNullAwareAntiJoin(df) + }) + } } // scalastyle:on line.size.limit diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp index da301dcb89f87..0b88137b328bf 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp @@ -115,7 +115,8 @@ std::shared_ptr buildJoin( jint join_type, bool has_mixed_join_condition, bool is_existence_join, - const std::string & named_struct) + const std::string & named_struct, + bool is_null_aware_anti_join) { auto join_key_list = Poco::StringTokenizer(join_keys, ","); Names key_names; @@ -191,7 +192,8 @@ std::shared_ptr buildJoin( columns_description, ConstraintsDescription(), key, - true); + true, + is_null_aware_anti_join); } void init(JNIEnv * env) diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h index a97bd77a84d09..d089d7420a8af 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h @@ -38,7 +38,8 @@ std::shared_ptr buildJoin( jint join_type, bool has_mixed_join_condition, bool is_existence_join, - const std::string & named_struct); + const std::string & named_struct, + bool is_null_aware_anti_join); void cleanBuildHashTable(const std::string & hash_table_id, jlong instance); std::shared_ptr getJoin(const std::string & hash_table_id); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp index 1d87b5f57a094..90baa754f31b3 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -72,8 +73,9 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer( const ColumnsDescription & columns, const ConstraintsDescription & constraints, const String & comment, - const bool overwrite_) - : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_) + const bool overwrite_, + bool is_null_aware_anti_join_) + : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_) { storage_metadata.setColumns(columns); storage_metadata.setConstraints(constraints); @@ -104,8 +106,36 @@ void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header, std auto build_join = [&] { join = std::make_shared(analyzed_join, header, overwrite, row_count); - for (Block block : data) - join->addBlockToJoin(std::move(block), true); + // only when is_null_aware_anti_join is true, it needs to check whether is null key value exists + if (is_null_aware_anti_join) + { + is_empty_hash_table = data.empty(); + size_t total_size = 0; + for (Block block : data) + { + for (size_t i = 0; i < block.columns(); ++i) + { + const auto & column = block.getByPosition(i); + if (column.name == key_names.at(0)) + { + if (const auto * nullable = checkAndGetColumn(column.column.get())) + { + const auto & null_map_data = nullable->getNullMapData(); + // check whether there is null key value + has_null_key_value = !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()); + } + } + } + total_size += block.rows(); + join->addBlockToJoin(std::move(block), true); + } + is_empty_hash_table = (total_size < 1); + } + else + { + for (Block block : data) + join->addBlockToJoin(std::move(block), true); + } }; /// Record memory usage in Total Memory Tracker ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h index d9766c4392f4d..8c9416b544573 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h @@ -46,7 +46,11 @@ class StorageJoinFromReadBuffer const DB::ColumnsDescription & columns_, const DB::ConstraintsDescription & constraints_, const String & comment, - bool overwrite_); + bool overwrite_, + bool is_null_aware_anti_join_); + + bool has_null_key_value = false; + bool is_empty_hash_table = false; /// The columns' names in right_header may be different from the names in the ColumnsDescription /// in the constructor. @@ -64,6 +68,7 @@ class StorageJoinFromReadBuffer std::shared_mutex join_mutex; std::list input_blocks; std::shared_ptr join = nullptr; + bool is_null_aware_anti_join; void readAllBlocksFromInput(DB::ReadBuffer & in); void buildJoin(DB::Blocks & data, const DB::Block header, std::shared_ptr analyzed_join); diff --git a/cpp-ch/local-engine/Operator/EarlyStopStep.cpp b/cpp-ch/local-engine/Operator/EarlyStopStep.cpp new file mode 100644 index 0000000000000..ff148a203b477 --- /dev/null +++ b/cpp-ch/local-engine/Operator/EarlyStopStep.cpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + +namespace local_engine +{ +static DB::ITransformingStep::Traits getTraits() +{ + return DB::ITransformingStep::Traits + { + { + .preserves_number_of_streams = false, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +EarlyStopStep::EarlyStopStep( + const DB::DataStream & input_stream_) + : DB::ITransformingStep( + input_stream_, transformHeader(input_stream_.header), getTraits()) +{ +} + +DB::Block EarlyStopStep::transformHeader(const DB::Block& input) +{ + return input.cloneEmpty(); +} + +void EarlyStopStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) +{ + pipeline.addSimpleTransform( + [&](const DB::Block & header) + { + return std::make_shared(header); + }); +} + +void EarlyStopStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const +{ + if (!processors.empty()) + DB::IQueryPlanStep::describePipeline(processors, settings); +} + +void EarlyStopStep::updateOutputStream() +{ + output_stream = createOutputStream(input_streams.front(), transformHeader(input_streams.front().header), getDataStreamTraits()); +} + +EarlyStopTransform::EarlyStopTransform(const DB::Block &header_) + : DB::IProcessor({header_}, {EarlyStopStep::transformHeader(header_)}) +{ +} + +EarlyStopTransform::Status EarlyStopTransform::prepare() +{ + auto & output = outputs.front(); + auto & input = inputs.front(); + if (!input.isFinished()) + { + input.close(); + } + output.finish(); + return Status::Finished; +} + +void EarlyStopTransform::work() +{ +} +} diff --git a/cpp-ch/local-engine/Operator/EarlyStopStep.h b/cpp-ch/local-engine/Operator/EarlyStopStep.h new file mode 100644 index 0000000000000..becfd46b83f24 --- /dev/null +++ b/cpp-ch/local-engine/Operator/EarlyStopStep.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +namespace local_engine +{ +/// This step will return empty block. +class EarlyStopStep : public DB::ITransformingStep +{ +public: + explicit EarlyStopStep( + const DB::DataStream & input_stream_); + ~EarlyStopStep() override = default; + + String getName() const override { return "EarlyStopStep"; } + + static DB::Block transformHeader(const DB::Block& input); + + void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override; + + void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override; +private: + void updateOutputStream() override; +}; + +class EarlyStopTransform : public DB::IProcessor +{ +public: + using Status = DB::IProcessor::Status; + explicit EarlyStopTransform(const DB::Block &header_); + ~EarlyStopTransform() override = default; + + Status prepare() override; + void work() override; + String getName() const override { return "EarlyStopTransform"; } +private: + DB::Block header; +}; +} diff --git a/cpp-ch/local-engine/Operator/ReplicateRowsStep.cpp b/cpp-ch/local-engine/Operator/ReplicateRowsStep.cpp index f2d4bc8a865dc..3d8ce61c61133 100644 --- a/cpp-ch/local-engine/Operator/ReplicateRowsStep.cpp +++ b/cpp-ch/local-engine/Operator/ReplicateRowsStep.cpp @@ -51,12 +51,7 @@ ReplicateRowsStep::ReplicateRowsStep(const DB::DataStream & input_stream) DB::Block ReplicateRowsStep::transformHeader(const DB::Block& input) { - DB::Block output; - for (int i = 1; i < input.columns(); i++) - { - output.insert(input.getByPosition(i)); - } - return output; + return input.cloneEmpty(); } void ReplicateRowsStep::transformPipeline( diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index 2959e1986875c..6f8877523b139 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -268,6 +269,30 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q if (storage_join) { + if (join_opt_info.is_null_aware_anti_join && join.type() == substrait::JoinRel_JoinType_JOIN_TYPE_ANTI) + { + if (storage_join->has_null_key_value) + { + // if there is a null key value on the build side, it will return the empty result + auto empty_step = std::make_unique(left->getCurrentDataStream()); + left->addStep(std::move(empty_step)); + } + else if (!storage_join->is_empty_hash_table) + { + auto input_header = left->getCurrentDataStream().header; + DB::ActionsDAG filter_is_not_null_dag{input_header.getColumnsWithTypeAndName()}; + // when is_null_aware_anti_join is true, there is only one join key + const auto * key_field = filter_is_not_null_dag.getInputs()[join.expression().scalar_function().arguments().at(0).value().selection().direct_reference().struct_field().field()]; + + auto result_node = filter_is_not_null_dag.tryFindInOutputs(key_field->result_name); + // add a function isNotNull to filter the null key on the left side + const auto * cond_node = plan_parser->toFunctionNode(filter_is_not_null_dag, "isNotNull", {result_node}); + filter_is_not_null_dag.addOrReplaceInOutputs(*cond_node); + auto filter_step = std::make_unique(left->getCurrentDataStream(), std::move(filter_is_not_null_dag), cond_node->result_name, true); + left->addStep(std::move(filter_step)); + } + // other case: is_empty_hash_table, don't need to handle + } applyJoinFilter(*table_join, join, *left, *right, true); auto broadcast_hash_join = storage_join->getJoinLocked(table_join, context); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c80379a879f87..6ed9b4a43c44a 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1071,7 +1071,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild jint join_type_, jboolean has_mixed_join_condition, jboolean is_existence_join, - jbyteArray named_struct) + jbyteArray named_struct, + jboolean is_null_aware_anti_join) { LOCAL_ENGINE_JNI_METHOD_START const auto hash_table_id = jstring2string(env, key); @@ -1084,7 +1085,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild DB::CompressedReadBuffer input(read_buffer_from_java_array); local_engine::configureCompressedReadBuffer(input); const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin( - hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string)); + hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string, is_null_aware_anti_join)); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } diff --git a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala index ef1c4180b11b3..ee7657c505ac3 100644 --- a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala +++ b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala @@ -107,7 +107,7 @@ class GlutenClickHouseRSSColumnarMemorySortShuffleSuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala index 50220264e0637..00f3bee8eb7b3 100644 --- a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala +++ b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala @@ -142,7 +142,7 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite } test("TPCH Q16") { - runTPCHQuery(16, noFallBack = false) { df => } + runTPCHQuery(16) { df => } } test("TPCH Q17") { diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index c096603dee3be..90d1dbae89e4c 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1161,6 +1161,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index ca7f19a0c58d9..1115af3164eb4 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1182,6 +1182,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index f0c1151b9a518..9ededd6c5370c 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,6 +1027,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 9fb1f2d34f76e..fd1326cb1c152 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,6 +1027,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") + .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")