diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 03e5aaa538a9..cea30ae284d7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -309,8 +309,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { condition: Option[Expression], left: SparkPlan, right: SparkPlan, - isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase = - CHShuffledHashJoinExecTransformer( + isSkewJoin: Boolean, + logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase = { + val res = CHShuffledHashJoinExecTransformer( leftKeys, rightKeys, joinType, @@ -319,6 +320,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { left, right, isSkewJoin) + res.setLogicalLink(logicalLink.getOrElse(null)) + res + } /** Generate BroadcastHashJoinExecTransformer. */ def genBroadcastHashJoinExecTransformer( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala index 15e21681b1c6..252b9bc03fd6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala @@ -26,6 +26,7 @@ import org.apache.spark.rpc.GlutenDriverEndpoint import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive._ @@ -135,24 +136,35 @@ case class CHShuffledHashJoinExecTransformer( .append("isExistenceJoin=") .append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0) .append("\n") - .append("leftRowCount=") - .append(leftStats.rowCount.getOrElse(-1)) - .append("\n") - .append("leftNumPartitions=") - .append(leftStats.numPartitions) - .append("\n") - .append("leftNumMappers=") - .append(leftStats.numMappers) - .append("\n") - .append("rightRowCount=") - .append(rightStats.rowCount.getOrElse(-1)) - .append("\n") - .append("rightNumPartitions=") - .append(rightStats.numPartitions) - .append("\n") - .append("rightNumMappers=") - .append(rightStats.numMappers) - .append("\n") + logicalLink match { + case Some(join: Join) => + val leftRowCount = + if (needSwitchChildren) join.left.stats.rowCount else join.right.stats.rowCount + val rightRowCount = + if (needSwitchChildren) join.right.stats.rowCount else join.left.stats.rowCount + val leftSizeInBytes = + if (needSwitchChildren) join.left.stats.sizeInBytes else join.right.stats.sizeInBytes + val rightSizeInBytes = + if (needSwitchChildren) join.right.stats.sizeInBytes else join.left.stats.sizeInBytes + val numPartitions = outputPartitioning.numPartitions + joinParametersStr + .append("leftRowCount=") + .append(leftRowCount.getOrElse(-1)) + .append("\n") + .append("leftSizeInBytes=") + .append(leftSizeInBytes) + .append("\n") + .append("rightRowCount=") + .append(rightRowCount.getOrElse(-1)) + .append("\n") + .append("rightSizeInBytes=") + .append(rightSizeInBytes) + .append("\n") + .append("numPartitions=") + .append(numPartitions) + .append("\n") + case _ => + } val message = StringValue .newBuilder() .setValue(joinParametersStr.toString) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 89d29781c079..cc813daf1ff6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -437,7 +437,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { condition: Option[Expression], left: SparkPlan, right: SparkPlan, - isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase = + isSkewJoin: Boolean, + logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase = ShuffledHashJoinExecTransformer( leftKeys, rightKeys, diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index f8f0f41fe21a..84744dab21b8 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -96,19 +96,19 @@ struct JoinConfig { /// If the join condition is like `t1.k = t2.k and (t1.id1 = t2.id2 or t1.id2 = t2.id2)`, try to join with multi /// join on clauses `(t1.k = t2.k and t1.id1 = t2.id2) or (t1.k = t2.k or t1.id2 = t2.id2)` - inline static const String PREFER_INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES = "prefer_inequal_join_to_multi_join_on_clauses"; + inline static const String PREFER_MULTI_JOIN_ON_CLAUSES = "prefer_multi_join_on_clauses"; /// Only hash join supports multi join on clauses, the right table cannot be too large. If the row number of right /// table is larger then this limit, this transform will not work. - inline static const String INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES_ROWS_LIMIT = "inequal_join_to_multi_join_on_clauses_row_limit"; + inline static const String MULTI_JOIN_ON_CLAUSES_BUILD_SIDE_ROWS_LIMIT = "multi_join_on_clauses_build_side_row_limit"; - bool prefer_inequal_join_to_multi_join_on_clauses = true; - size_t inequal_join_to_multi_join_on_clauses_rows_limit = 10000000; + bool prefer_multi_join_on_clauses = true; + size_t multi_join_on_clauses_build_side_rows_limit = 10000000; static JoinConfig loadFromContext(DB::ContextPtr context) { JoinConfig config; - config.prefer_inequal_join_to_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES, true); - config.inequal_join_to_multi_join_on_clauses_rows_limit = context->getConfigRef().getUInt64(INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES_ROWS_LIMIT, 10000000); + config.prefer_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_MULTI_JOIN_ON_CLAUSES, true); + config.multi_join_on_clauses_build_side_rows_limit = context->getConfigRef().getUInt64(MULTI_JOIN_ON_CLAUSES_BUILD_SIDE_ROWS_LIMIT, 10000000); return config; } }; diff --git a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp index 59de24de5275..344bf939fe09 100644 --- a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp +++ b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp @@ -132,11 +132,10 @@ JoinOptimizationInfo JoinOptimizationInfo::parse(const String & advance) tryAssign(kvs, "isNullAwareAntiJoin", info.is_null_aware_anti_join); tryAssign(kvs, "isExistenceJoin", info.is_existence_join); tryAssign(kvs, "leftRowCount", info.left_table_rows); - tryAssign(kvs, "leftNumPartitions", info.left_table_partitions_num); - tryAssign(kvs, "leftNumMappers", info.left_table_mappers_num); + tryAssign(kvs, "leftSizeInBytes", info.left_table_bytes); tryAssign(kvs, "rightRowCount", info.right_table_rows); - tryAssign(kvs, "rightNumPartitions", info.right_table_partitions_num); - tryAssign(kvs, "rightNumMappers", info.right_table_mappers_num); + tryAssign(kvs, "rightSizeInBytes", info.right_table_bytes); + tryAssign(kvs, "numPartitions", info.partitions_num); return info; } } diff --git a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h index 08bd520760d7..5f6fe6d256e3 100644 --- a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h +++ b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h @@ -30,11 +30,10 @@ struct JoinOptimizationInfo bool is_null_aware_anti_join = false; bool is_existence_join = false; Int64 left_table_rows = -1; - Int64 left_table_partitions_num = -1; - Int64 left_table_mappers_num = -1; + Int64 left_table_bytes = -1; Int64 right_table_rows = -1; - Int64 right_table_partitions_num = -1; - Int64 right_table_mappers_num = -1; + Int64 right_table_bytes = -1; + Int64 partitions_num = -1; String storage_join_key; static JoinOptimizationInfo parse(const String & advance); diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index 02691fceb7ba..fbab2609412a 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -316,10 +316,10 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q table_join->addDisjunct(); bool is_multi_join_on_clauses = isJoinWithMultiJoinOnClauses(table_join->getOnlyClause(), join_on_clauses, join, left_header, right_header); - if (is_multi_join_on_clauses && join_config.prefer_inequal_join_to_multi_join_on_clauses && join_opt_info.right_table_rows > 0 - && join_opt_info.right_table_mappers_num > 0 - && join_opt_info.right_table_rows / join_opt_info.right_table_mappers_num - < join_config.inequal_join_to_multi_join_on_clauses_rows_limit) + if (is_multi_join_on_clauses && join_config.prefer_multi_join_on_clauses && join_opt_info.right_table_rows > 0 + && join_opt_info.partitions_num > 0 + && join_opt_info.right_table_rows / join_opt_info.partitions_num + < join_config.multi_join_on_clauses_build_side_rows_limit) { query_plan = buildMultiOnClauseHashJoin(table_join, std::move(left), std::move(right), join_on_clauses); } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 69392a353d7f..b11102aea907 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -114,7 +114,8 @@ trait SparkPlanExecApi { condition: Option[Expression], left: SparkPlan, right: SparkPlan, - isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase + isSkewJoin: Boolean, + logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase /** Generate BroadcastHashJoinExecTransformer. */ def genBroadcastHashJoinExecTransformer( diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala index f9eaa4179c67..c6392d13c70d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala @@ -392,7 +392,8 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] { plan.condition, plan.left, plan.right, - plan.isSkewJoin) + plan.isSkewJoin, + plan.logicalLink) transformer.doValidate().tagOnFallback(plan) case plan: BroadcastExchangeExec => val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index a8cc791286b2..a75f8f64de09 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -142,7 +142,8 @@ case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { plan.condition, left, right, - plan.isSkewJoin) + plan.isSkewJoin, + plan.logicalLink) case plan: SortMergeJoinExec => val left = plan.left val right = plan.right