diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteSortMergeJoinToHashJoinRule.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteSortMergeJoinToHashJoinRule.scala index 8c5ada043fbb6..0c463bd779af9 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteSortMergeJoinToHashJoinRule.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteSortMergeJoinToHashJoinRule.scala @@ -86,6 +86,7 @@ case class RewriteSortMergeJoinToHashJoinRule(session: SparkSession) logError(s"Validation failed for ShuffledHashJoinExec: ${validateResult.reason()}") return smj } + logDebug(s"Applied SortMergeJoin to ShuffledHashJoin") hashJoin } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHJoinValidateUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHJoinValidateUtil.scala index aabdae0951a7c..b86482720c7c8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHJoinValidateUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHJoinValidateUtil.scala @@ -54,8 +54,12 @@ object CHJoinValidateUtil extends Logging { condition.isDefined && hasTwoTableColumn(leftOutputSet, rightOutputSet, condition.get) val shouldFallback = joinStrategy match { case SortMergeJoinStrategy(joinType) => - joinType.sql.contains("SEMI") || joinType.sql.contains("ANTI") || joinType.toString - .contains("ExistenceJoin") || hasMixedFilterCondition + if (!joinType.isInstanceOf[ExistenceJoin] && joinType.sql.contains("INNER")) { + false + } else { + joinType.sql.contains("SEMI") || joinType.sql.contains("ANTI") || joinType.toString + .contains("ExistenceJoin") || hasMixedFilterCondition + } case UnknownJoinStrategy(joinType) => throw new IllegalArgumentException(s"Unknown join type $joinStrategy") case _ => false diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala index b9d580c7249cd..4c49cc2d9f46c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala @@ -119,7 +119,7 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala index 10e5c7534d352..e5da78de3fd60 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala @@ -163,7 +163,7 @@ class GlutenClickHouseColumnarShuffleAQESuite } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala index dd997832d3e3d..17bd9912b032c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2ColumnarShuffleSuite.scala @@ -168,7 +168,7 @@ class GlutenClickHouseDSV2ColumnarShuffleSuite extends GlutenClickHouseTPCHAbstr } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala index 08393ccfe7748..a58b6b1c1292d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDSV2Suite.scala @@ -126,7 +126,7 @@ class GlutenClickHouseDSV2Suite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala index cf1bdd296c013..bd831e64bf38c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala @@ -343,7 +343,7 @@ class GlutenClickHouseDecimalSuite decimalTPCHTables.foreach { dt => { - val fallBack = (sql_num == 16 || sql_num == 21) + val fallBack = (sql_num == 16) val compareResult = !dt._2.contains(sql_num) val native = if (fallBack) "fallback" else "native" val compare = if (compareResult) "compare" else "noCompare" diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala index c5f67f45d5771..5f9aa0dbda60f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableColumnarShuffleSuite.scala @@ -171,7 +171,7 @@ class GlutenClickHouseTPCHNullableColumnarShuffleSuite extends GlutenClickHouseT } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala index 7f62c69931577..e0e4d33804504 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala @@ -174,7 +174,7 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index 1c09449c817fb..f25a1313255ff 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -175,7 +175,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala index 1fd8983f5876a..3e1507bf17aab 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala @@ -239,6 +239,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite | LIMIT 100 ; |""".stripMargin // There are some BroadcastHashJoin with NOT condition - compareResultsAgainstVanillaSpark(sql, true, { df => }, false) + compareResultsAgainstVanillaSpark(sql, true, { df => }) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala index 509c830545c60..30fdddbf1d714 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala @@ -49,8 +49,11 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC .set("spark.shuffle.manager", "sort") .set("spark.io.compression.codec", "snappy") .set("spark.sql.shuffle.partitions", "5") - .set("spark.sql.autoBroadcastJoinThreshold", "10MB") - .set("spark.memory.offHeap.size", "8g") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.memory.offHeap.size", "12g") + .set( + "spark.gluten.sql.columnar.backend.ch.runtime_config.extra_memory_hard_limit", + "2147483648") .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index c2e2f9f5565f2..39dc7baf96950 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -275,7 +275,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { + runTPCHQuery(21) { df => val plans = collect(df.queryExecution.executedPlan) { case scanExec: BasicScanExecTransformer => scanExec diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala index 8c706f6836399..3847d802f5830 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala @@ -75,6 +75,10 @@ class GlutenClickHouseTPCHParquetAQEConcurrentSuite createNotNullTPCHTablesInParquet(tablesPath) } + test("TPCH Q21") { + runTPCHQuery(21) { df => } + } + test("fix race condition at the global variable of ColumnarOverrideRules::isAdaptiveContext") { val queries = ParVector((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22): _*) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala index 1d8389b481439..2aadac05d348c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala @@ -209,7 +209,7 @@ class GlutenClickHouseTPCHParquetAQESuite } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("TPCH Q22") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 0efc1414ce338..e21df203dac03 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -335,7 +335,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr } test("TPCH Q21") { - runTPCHQuery(21, noFallBack = false) { df => } + runTPCHQuery(21) { df => } } test("GLUTEN-2115: Fix wrong number of records shuffle written") { diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index e5f5dd5dccdb8..44f7066ed0679 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -27,6 +27,9 @@ #include #include +#include +#include + namespace DB { @@ -70,6 +73,7 @@ int64_t QueryContextManager::initializeQuery() query_context->thread_group->memory_tracker.setSoftLimit(memory_limit); query_context->thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit); + LOG_INFO(getLogger("QueryContextManager"), "xxx memory limit: {} {}", memory_limit, config.extra_memory_hard_limit); int64_t id = reinterpret_cast(query_context->thread_group.get()); query_map.insert(id, query_context); return id; @@ -167,4 +171,4 @@ double currentThreadGroupMemoryUsageRatio() } return static_cast(CurrentThread::getGroup()->memory_tracker.get()) / CurrentThread::getGroup()->memory_tracker.getSoftLimit(); } -} \ No newline at end of file +} diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index ef19e007d4398..7efdde84a7de1 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -62,6 +62,7 @@ std::shared_ptr createDefaultTableJoin(substrait::JoinRel_JoinTyp std::pair kind_and_strictness = JoinUtil::getJoinKindAndStrictness(join_type, is_existence_join); table_join->setKind(kind_and_strictness.first); table_join->setStrictness(kind_and_strictness.second); + LOG_ERROR(getLogger("JoinRelParser"), "xxx join type: {} {}", table_join->kind(), table_join->strictness()); return table_join; } @@ -205,6 +206,8 @@ void JoinRelParser::renamePlanColumns(DB::QueryPlan & left, DB::QueryPlan & righ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::QueryPlanPtr left, DB::QueryPlanPtr right) { + LOG_ERROR(getLogger("JoinRelParser"), "xxx left header: {}", left->getCurrentDataStream().header.dumpStructure()); + LOG_ERROR(getLogger("JoinRelParser"), "xxx right header: {}", right->getCurrentDataStream().header.dumpStructure()); auto join_config = JoinConfig::loadFromContext(getContext()); google::protobuf::StringValue optimization_info; optimization_info.ParseFromString(join.advanced_extension().optimization().value()); diff --git a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala index 95391a2c42f56..c57536dce76a0 100644 --- a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala +++ b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala @@ -40,7 +40,8 @@ case class Table(name: String, partitionColumns: Seq[String]) abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSparkSession - with AdaptiveSparkPlanHelper { + with AdaptiveSparkPlanHelper + with Logging { protected val resourcePath: String protected val fileFormat: String @@ -120,6 +121,9 @@ abstract class WholeStageTransformerSuite val queryResultStr = Arm.withResource(Source.fromFile(new File(queriesResults + "/" + sqlNum + ".out"), "UTF-8"))( _.mkString) + if (!queryResultStr.equals(resultStr.toString())) { + logError(s"Results are mismatched. $sqlNum \n$queryResultStr vs. \n${resultStr.toString()}") + } assert(queryResultStr.equals(resultStr.toString)) }