From d3cf7a512fc7c687e60d735d16513763595418b8 Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Tue, 10 Sep 2024 15:58:59 +0800 Subject: [PATCH] [GLUTEN-7180][CH] Fix ut `Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys` for the CH backend when the aqe is on Fix ut Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys for the CH backend when the aqe is on Close #7180. --- .../gluten/vectorized/CHNativeBlock.java | 6 +++ .../gluten/vectorized/StorageJoinBuilder.java | 9 ++-- .../backendsapi/clickhouse/CHRuleApi.scala | 1 + .../clickhouse/CHSparkPlanExecApi.scala | 36 +++++++++++-- .../CHAQEPropagateEmptyRelation.scala | 52 +++++++++++++++++++ .../apache/gluten/vectorized/BlockStats.java | 35 +++++++++++++ .../joins/ClickHouseBuildSideRelation.scala | 6 ++- .../sql/execution/utils/CHExecUtil.scala | 36 +++++++++---- ...enClickHouseTPCHSaltNullParquetSuite.scala | 49 +++++++++++++++++ .../benchmarks/CHHashBuildBenchmark.scala | 9 ++-- .../Join/BroadCastJoinBuilder.cpp | 6 ++- .../local-engine/Join/BroadCastJoinBuilder.h | 3 +- .../Join/StorageJoinFromReadBuffer.cpp | 38 +++----------- .../Join/StorageJoinFromReadBuffer.h | 3 +- cpp-ch/local-engine/local_engine_jni.cpp | 46 ++++++++++++++-- .../clickhouse/ClickHouseTestSettings.scala | 1 - .../clickhouse/ClickHouseTestSettings.scala | 1 - .../clickhouse/ClickHouseTestSettings.scala | 1 - .../clickhouse/ClickHouseTestSettings.scala | 1 - 19 files changed, 272 insertions(+), 67 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala create mode 100644 backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java index e3c51ae28583a..c8cce61b4f2b4 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java @@ -71,6 +71,12 @@ public long totalBytes() { public native void nativeClose(long blockAddress); + public native BlockStats nativeBlockStats(long blockAddress, int columnPosition); + + public BlockStats getBlockStats(int columnPosition) { + return nativeBlockStats(blockAddress, columnPosition); + } + public void close() { if (blockAddress != 0) { nativeClose(blockAddress); diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java index d53ff17188828..d417fa1a9d23e 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/StorageJoinBuilder.java @@ -48,7 +48,8 @@ private static native long nativeBuild( boolean hasMixedFiltCondition, boolean isExistenceJoin, byte[] namedStruct, - boolean isNullAwareAntiJoin); + boolean isNullAwareAntiJoin, + boolean hasNullKeyValues); private StorageJoinBuilder() {} @@ -58,7 +59,8 @@ public static long build( long rowCount, BroadCastHashJoinContext broadCastContext, List newBuildKeys, - List newOutput) { + List newOutput, + boolean hasNullKeyValues) { ConverterUtils$ converter = ConverterUtils$.MODULE$; List keys; List output; @@ -96,7 +98,8 @@ public static long build( broadCastContext.hasMixedFiltCondition(), broadCastContext.isExistenceJoin(), toNameStruct(output).toByteArray(), - broadCastContext.isNullAwareAntiJoin()); + broadCastContext.isNullAwareAntiJoin(), + hasNullKeyValues); } /** create table named struct */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 61fed3f999a40..a83d349cdb329 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -44,6 +44,7 @@ private object CHRuleApi { def injectSpark(injector: SparkInjector): Unit = { // Regular Spark rules. injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply) + injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark)) injector.injectParser( (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark, parserInterface)) injector.injectParser( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index a8996c4d2e834..1108b8b3c5010 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -474,12 +474,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { numOutputRows: SQLMetric, dataSize: SQLMetric): BuildSideRelation = { - val buildKeys: Seq[Expression] = mode match { + val (buildKeys, isNullAware) = mode match { case mode1: HashedRelationBroadcastMode => - mode1.key + (mode1.key, mode1.isNullAware) case _ => // IdentityBroadcastMode - Seq.empty + (Seq.empty, false) } val (newChild, newOutput, newBuildKeys) = @@ -532,8 +532,27 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { } (newChild, (child.output ++ appendedProjections).map(_.toAttribute), preProjectionBuildKeys) } + + // find the key index in the output + val keyColumnIndex = if (isNullAware) { + def findKeyOrdinal(key: Expression, output: Seq[Attribute]): Int = { + key match { + case b: BoundReference => b.ordinal + case n: NamedExpression => + output.indexWhere(o => (o.name.equals(n.name) && o.exprId == n.exprId)) + case _ => throw new GlutenException(s"Cannot find $key in the child's output: $output") + } + } + if (newBuildKeys.isEmpty) { + findKeyOrdinal(buildKeys(0), newOutput) + } else { + findKeyOrdinal(newBuildKeys(0), newOutput) + } + } else { + 0 + } val countsAndBytes = - CHExecUtil.buildSideRDD(dataSize, newChild).collect + CHExecUtil.buildSideRDD(dataSize, newChild, isNullAware, keyColumnIndex).collect val batches = countsAndBytes.map(_._2) val totalBatchesSize = batches.map(_.length).sum @@ -548,8 +567,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { s" written bytes is correct.") } val rowCount = countsAndBytes.map(_._1).sum + val hasNullKeyValues = countsAndBytes.map(_._3).foldLeft[Boolean](false)((b, a) => { b || a }) numOutputRows += rowCount - ClickHouseBuildSideRelation(mode, newOutput, batches.flatten, rowCount, newBuildKeys) + ClickHouseBuildSideRelation( + mode, + newOutput, + batches.flatten, + rowCount, + newBuildKeys, + hasNullKeyValues) } /** Define backend specfic expression mappings. */ diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala new file mode 100644 index 0000000000000..fea2199eddded --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.utils.PhysicalPlanSelector + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.LeftAnti +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, LocalTableScanExec, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ClickHouseBuildSideRelation} + +case class CHAQEPropagateEmptyRelation(session: SparkSession) extends Rule[SparkPlan] { + + def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session, plan) { + plan.transform { + case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, left, right, isNullAwareAntiJoin) + if (joinType == LeftAnti) && isNullAwareAntiJoin => + right match { + case BroadcastQueryStageExec(_, plan: SparkPlan, _) => + val columnarBroadcast = plan match { + case c: ColumnarBroadcastExchangeExec => c + case ReusedExchangeExec(_, c: ColumnarBroadcastExchangeExec) => c + } + val chBuildSideRelation = columnarBroadcast.relationFuture.get().value + chBuildSideRelation match { + case c: ClickHouseBuildSideRelation if c.hasNullKeyValues => + LocalTableScanExec(bhj.output, Seq.empty) + case _ => bhj + } + case o => bhj + } + case other => other + } + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java new file mode 100644 index 0000000000000..e47454cbed55f --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/BlockStats.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.vectorized; + +public class BlockStats { + private final long blockRecordCount; + private final boolean hasNullKeyValues; + + public BlockStats(long blockRecordCount, boolean hasNullKeyValues) { + this.blockRecordCount = blockRecordCount; + this.hasNullKeyValues = hasNullKeyValues; + } + + public long getBlockRecordCount() { + return blockRecordCount; + } + + public boolean isHasNullKeyValues() { + return hasNullKeyValues; + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala index 862a8b4200f34..92887f16d70ab 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/joins/ClickHouseBuildSideRelation.scala @@ -35,7 +35,8 @@ case class ClickHouseBuildSideRelation( output: Seq[Attribute], batches: Array[Byte], numOfRows: Long, - newBuildKeys: Seq[Expression] = Seq.empty) + newBuildKeys: Seq[Expression] = Seq.empty, + hasNullKeyValues: Boolean = false) extends BuildSideRelation with Logging { @@ -58,7 +59,8 @@ case class ClickHouseBuildSideRelation( numOfRows, broadCastContext, newBuildKeys.asJava, - output.asJava) + output.asJava, + hasNullKeyValues) (hashTableData, this) } else { (StorageJoinBuilder.nativeCloneBuildHashTable(hashTableData), null) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 4496d893fcd7c..6d91108d22dae 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -58,10 +58,13 @@ object CHExecUtil extends Logging { def toBytes( dataSize: SQLMetric, iter: Iterator[ColumnarBatch], + isNullAware: Boolean = false, + keyColumnIndex: Int = 0, compressionCodec: Option[String] = Some("lz4"), compressionLevel: Option[Int] = None, - bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = { - var count = 0 + bufferSize: Int = 4 << 10): Iterator[(Long, Array[Byte], Boolean)] = { + var count = 0L + var hasNullKeyValues = false val bos = new ByteArrayOutputStream() val buffer = new Array[Byte](bufferSize) // 4K val level = compressionLevel.getOrElse(Int.MinValue) @@ -69,20 +72,35 @@ object CHExecUtil extends Logging { compressionCodec .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, bufferSize)) .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", level, bufferSize)) - while (iter.hasNext) { - val batch = iter.next() - count += batch.numRows - blockOutputStream.write(batch) + if (isNullAware) { + while (iter.hasNext) { + val batch = iter.next() + val blockStats = CHNativeBlock.fromColumnarBatch(batch).getBlockStats(keyColumnIndex) + count += blockStats.getBlockRecordCount + hasNullKeyValues = hasNullKeyValues || blockStats.isHasNullKeyValues + blockOutputStream.write(batch) + } + } else { + while (iter.hasNext) { + val batch = iter.next() + count += batch.numRows() + blockOutputStream.write(batch) + } } blockOutputStream.flush() blockOutputStream.close() - Iterator((count, bos.toByteArray)) + Iterator((count, bos.toByteArray, hasNullKeyValues)) } - def buildSideRDD(dataSize: SQLMetric, newChild: SparkPlan): RDD[(Int, Array[Byte])] = { + def buildSideRDD( + dataSize: SQLMetric, + newChild: SparkPlan, + isNullAware: Boolean, + keyColumnIndex: Int + ): RDD[(Long, Array[Byte], Boolean)] = { newChild .executeColumnar() - .mapPartitionsInternal(iter => toBytes(dataSize, iter)) + .mapPartitionsInternal(iter => toBytes(dataSize, iter, isNullAware, keyColumnIndex)) } private def buildRangePartitionSampleRDD( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 49697872e8aec..f7cf0de3762d0 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -2935,6 +2935,55 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr df => { checkBHJWithIsNullAwareAntiJoin(df) }) + + withSQLConf(("spark.sql.adaptive.enabled", "true")) { + def checkAQEBHJWithIsNullAwareAntiJoin(df: DataFrame, isNullAwareBhjCnt: Int = 1): Unit = { + val bhjs = collect(df.queryExecution.executedPlan) { + case bhj: CHBroadcastHashJoinExecTransformer if bhj.isNullAwareAntiJoin => true + } + assert(bhjs.size == isNullAwareBhjCnt) + } + + val sql6 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (null), (6) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql6, + true, + df => { + checkAQEBHJWithIsNullAwareAntiJoin(df, 0) + }) + + val sql7 = + s""" + |select * from partsupp + |where + |cast(ps_suppkey AS INT) NOT IN (SELECT suppkey FROM VALUES (null), (6) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql7, + true, + df => { + checkAQEBHJWithIsNullAwareAntiJoin(df, 0) + }) + + val sql8 = + s""" + |select * from partsupp + |where + |ps_suppkey NOT IN (SELECT suppkey FROM VALUES (5), (6) sub(suppkey)) + |""".stripMargin + compareResultsAgainstVanillaSpark( + sql8, + true, + df => { + checkAQEBHJWithIsNullAwareAntiJoin(df) + }) + } + } test("soundex") { diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala index 87c389a651c35..2c36b404ec267 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHHashBuildBenchmark.scala @@ -58,7 +58,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark w |select $scanSchema from parquet.`$parquetDir` | |""".stripMargin) - val rowCount: Int = chParquet.count().toInt + val rowCount = chParquet.count() val runs = Seq(1, 2, 4, 8, 16, 32, 64).reverse .map(num => rowCount / num) @@ -79,13 +79,14 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark w s"build hash table with $num rows with $iteration hash tables", executedCnt) { _ => - for (i <- 0 until iteration) { + for (i <- 0L until iteration) { val table = StorageJoinBuilder.build( bytes, num, relation, new util.ArrayList[Expression](), - new util.ArrayList[Attribute]()) + new util.ArrayList[Attribute](), + false) StorageJoinBuilder.nativeCleanBuildHashTable("", table) } } @@ -94,7 +95,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark w } private def createBroadcastRelation( - child: SparkPlan): (Array[Byte], Int, BroadCastHashJoinContext) = { + child: SparkPlan): (Array[Byte], Long, BroadCastHashJoinContext) = { val dataSize = SQLMetrics.createSizeMetric(spark.sparkContext, "size of files read") val countsAndBytes = child diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp index 0b88137b328bf..5f20c17bfb1f4 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.cpp @@ -116,7 +116,8 @@ std::shared_ptr buildJoin( bool has_mixed_join_condition, bool is_existence_join, const std::string & named_struct, - bool is_null_aware_anti_join) + bool is_null_aware_anti_join, + bool has_null_key_values) { auto join_key_list = Poco::StringTokenizer(join_keys, ","); Names key_names; @@ -193,7 +194,8 @@ std::shared_ptr buildJoin( ConstraintsDescription(), key, true, - is_null_aware_anti_join); + is_null_aware_anti_join, + has_null_key_values); } void init(JNIEnv * env) diff --git a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h index d089d7420a8af..52db2e7e75e35 100644 --- a/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h +++ b/cpp-ch/local-engine/Join/BroadCastJoinBuilder.h @@ -39,7 +39,8 @@ std::shared_ptr buildJoin( bool has_mixed_join_condition, bool is_existence_join, const std::string & named_struct, - bool is_null_aware_anti_join); + bool is_null_aware_anti_join, + bool has_null_key_values); void cleanBuildHashTable(const std::string & hash_table_id, jlong instance); std::shared_ptr getJoin(const std::string & hash_table_id); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp index 90baa754f31b3..834ba97815669 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.cpp @@ -74,9 +74,11 @@ StorageJoinFromReadBuffer::StorageJoinFromReadBuffer( const ConstraintsDescription & constraints, const String & comment, const bool overwrite_, - bool is_null_aware_anti_join_) - : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_) + bool is_null_aware_anti_join_, + bool has_null_key_values_) + : key_names(key_names_), use_nulls(use_nulls_), row_count(row_count_), overwrite(overwrite_), is_null_aware_anti_join(is_null_aware_anti_join_), has_null_key_value(has_null_key_values_) { + is_empty_hash_table = row_count < 1; storage_metadata.setColumns(columns); storage_metadata.setConstraints(constraints); storage_metadata.setComment(comment); @@ -106,36 +108,8 @@ void StorageJoinFromReadBuffer::buildJoin(Blocks & data, const Block header, std auto build_join = [&] { join = std::make_shared(analyzed_join, header, overwrite, row_count); - // only when is_null_aware_anti_join is true, it needs to check whether is null key value exists - if (is_null_aware_anti_join) - { - is_empty_hash_table = data.empty(); - size_t total_size = 0; - for (Block block : data) - { - for (size_t i = 0; i < block.columns(); ++i) - { - const auto & column = block.getByPosition(i); - if (column.name == key_names.at(0)) - { - if (const auto * nullable = checkAndGetColumn(column.column.get())) - { - const auto & null_map_data = nullable->getNullMapData(); - // check whether there is null key value - has_null_key_value = !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()); - } - } - } - total_size += block.rows(); - join->addBlockToJoin(std::move(block), true); - } - is_empty_hash_table = (total_size < 1); - } - else - { - for (Block block : data) - join->addBlockToJoin(std::move(block), true); - } + for (Block block : data) + join->addBlockToJoin(std::move(block), true); }; /// Record memory usage in Total Memory Tracker ThreadFromGlobalPoolNoTracingContextPropagation thread(build_join); diff --git a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h index 8c9416b544573..e000952a45999 100644 --- a/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h +++ b/cpp-ch/local-engine/Join/StorageJoinFromReadBuffer.h @@ -47,7 +47,8 @@ class StorageJoinFromReadBuffer const DB::ConstraintsDescription & constraints_, const String & comment, bool overwrite_, - bool is_null_aware_anti_join_); + bool is_null_aware_anti_join_, + bool has_null_key_values_); bool has_null_key_value = false; bool is_empty_hash_table = false; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 15c721be3489f..79c777eddcd08 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -110,6 +110,9 @@ static jmethodID block_stripes_constructor; static jclass split_result_class; static jmethodID split_result_constructor; +static jclass block_stats_class; +static jmethodID block_stats_constructor; + JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) { JNIEnv * env; @@ -127,6 +130,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) split_result_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/CHSplitResult;"); split_result_constructor = local_engine::GetMethodID(env, split_result_class, "", "(JJJJJJ[J[JJJJ)V"); + block_stats_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/BlockStats;"); + block_stats_constructor = local_engine::GetMethodID(env, block_stats_class, "", "(JZ)V"); + local_engine::ShuffleReader::input_stream_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/ShuffleInputStream;"); local_engine::NativeSplitter::iterator_class @@ -184,6 +190,7 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) env->DeleteGlobalRef(spark_row_info_class); env->DeleteGlobalRef(block_stripes_class); env->DeleteGlobalRef(split_result_class); + env->DeleteGlobalRef(block_stats_class); env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); @@ -332,8 +339,8 @@ Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, job else { const auto * nullable = checkAndGetColumn(&*col.column); - size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - return num_nulls < block->rows(); + const auto & null_map_data = nullable->getNullMapData(); + return !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()); } LOCAL_ENGINE_JNI_METHOD_END(env, false) } @@ -506,6 +513,35 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes LOCAL_ENGINE_JNI_METHOD_END(env, -1) } +JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, jobject obj, jlong block_address, jint column_position) +{ + LOCAL_ENGINE_JNI_METHOD_START + DB::Block * block = reinterpret_cast(block_address); + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) + { + jobject block_stats = env->NewObject( + block_stats_class, + block_stats_constructor, + block->rows(), + false); + return block_stats; + } + else + { + const auto * nullable = checkAndGetColumn(&*col.column); + const auto & null_map_data = nullable->getNullMapData(); + + jobject block_stats = env->NewObject( + block_stats_class, + block_stats_constructor, + block->rows(), + !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size())); + return block_stats; + } + LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) +} + JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes) { @@ -1065,7 +1101,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild jboolean has_mixed_join_condition, jboolean is_existence_join, jbyteArray named_struct, - jboolean is_null_aware_anti_join) + jboolean is_null_aware_anti_join, + jboolean has_null_key_values) { LOCAL_ENGINE_JNI_METHOD_START const auto hash_table_id = jstring2string(env, key); @@ -1086,7 +1123,8 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild has_mixed_join_condition, is_existence_join, struct_string, - is_null_aware_anti_join)); + is_null_aware_anti_join, + has_null_key_values)); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 6d5083dbe2957..e98705ebd19ad 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1161,7 +1161,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") - .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index de979ac274271..be66559961e10 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1182,7 +1182,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") - .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 89a44c602ecc2..2b4c9502b13e2 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,7 +1027,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") - .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 388036c558a43..3b8d1c3a58e48 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1027,7 +1027,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-31220, SPARK-32056: repartition by expression with AQE") .exclude("SPARK-31220, SPARK-32056: repartition by range with AQE") .exclude("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") - .exclude("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") .exclude("SPARK-32753: Only copy tags to node with no tags") .exclude("Logging plan changes for AQE") .exclude("SPARK-33551: Do not use AQE shuffle read for repartition")