Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 15, 2024
1 parent ed40790 commit 8081b62
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase =
CHShuffledHashJoinExecTransformer(
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase = {
val res = CHShuffledHashJoinExecTransformer(
leftKeys,
rightKeys,
joinType,
Expand All @@ -319,6 +320,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left,
right,
isSkewJoin)
res.setLogicalLink(logicalLink.getOrElse(null))
res
}

/** Generate BroadcastHashJoinExecTransformer. */
def genBroadcastHashJoinExecTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive._
Expand Down Expand Up @@ -135,24 +136,35 @@ case class CHShuffledHashJoinExecTransformer(
.append("isExistenceJoin=")
.append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0)
.append("\n")
.append("leftRowCount=")
.append(leftStats.rowCount.getOrElse(-1))
.append("\n")
.append("leftNumPartitions=")
.append(leftStats.numPartitions)
.append("\n")
.append("leftNumMappers=")
.append(leftStats.numMappers)
.append("\n")
.append("rightRowCount=")
.append(rightStats.rowCount.getOrElse(-1))
.append("\n")
.append("rightNumPartitions=")
.append(rightStats.numPartitions)
.append("\n")
.append("rightNumMappers=")
.append(rightStats.numMappers)
.append("\n")
logicalLink match {
case Some(join: Join) =>
val leftRowCount =
if (needSwitchChildren) join.left.stats.rowCount else join.right.stats.rowCount
val rightRowCount =
if (needSwitchChildren) join.right.stats.rowCount else join.left.stats.rowCount
val leftSizeInBytes =
if (needSwitchChildren) join.left.stats.sizeInBytes else join.right.stats.sizeInBytes
val rightSizeInBytes =
if (needSwitchChildren) join.right.stats.sizeInBytes else join.left.stats.sizeInBytes
val numPartitions = outputPartitioning.numPartitions
joinParametersStr
.append("leftRowCount=")
.append(leftRowCount.getOrElse(-1))
.append("\n")
.append("leftSizeInBytes=")
.append(leftSizeInBytes)
.append("\n")
.append("rightRowCount=")
.append(rightRowCount.getOrElse(-1))
.append("\n")
.append("rightSizeInBytes=")
.append(rightSizeInBytes)
.append("\n")
.append("numPartitions=")
.append(numPartitions)
.append("\n")
case _ =>
}
val message = StringValue
.newBuilder()
.setValue(joinParametersStr.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase =
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase =
ShuffledHashJoinExecTransformer(
leftKeys,
rightKeys,
Expand Down
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,19 @@ struct JoinConfig
{
/// If the join condition is like `t1.k = t2.k and (t1.id1 = t2.id2 or t1.id2 = t2.id2)`, try to join with multi
/// join on clauses `(t1.k = t2.k and t1.id1 = t2.id2) or (t1.k = t2.k or t1.id2 = t2.id2)`
inline static const String PREFER_INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES = "prefer_inequal_join_to_multi_join_on_clauses";
inline static const String PREFER_MULTI_JOIN_ON_CLAUSES = "prefer_multi_join_on_clauses";
/// Only hash join supports multi join on clauses, the right table cannot be too large. If the row number of right
/// table is larger then this limit, this transform will not work.
inline static const String INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES_ROWS_LIMIT = "inequal_join_to_multi_join_on_clauses_row_limit";
inline static const String MULTI_JOIN_ON_CLAUSES_BUILD_SIDE_ROWS_LIMIT = "multi_join_on_clauses_build_side_row_limit";

bool prefer_inequal_join_to_multi_join_on_clauses = true;
size_t inequal_join_to_multi_join_on_clauses_rows_limit = 10000000;
bool prefer_multi_join_on_clauses = true;
size_t multi_join_on_clauses_build_side_rows_limit = 10000000;

static JoinConfig loadFromContext(DB::ContextPtr context)
{
JoinConfig config;
config.prefer_inequal_join_to_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES, true);
config.inequal_join_to_multi_join_on_clauses_rows_limit = context->getConfigRef().getUInt64(INEQUAL_JOIN_TO_MULTI_JOIN_ON_CLAUSES_ROWS_LIMIT, 10000000);
config.prefer_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_MULTI_JOIN_ON_CLAUSES, true);
config.multi_join_on_clauses_build_side_rows_limit = context->getConfigRef().getUInt64(MULTI_JOIN_ON_CLAUSES_BUILD_SIDE_ROWS_LIMIT, 10000000);
return config;
}
};
Expand Down
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ JoinOptimizationInfo JoinOptimizationInfo::parse(const String & advance)
tryAssign(kvs, "isNullAwareAntiJoin", info.is_null_aware_anti_join);
tryAssign(kvs, "isExistenceJoin", info.is_existence_join);
tryAssign(kvs, "leftRowCount", info.left_table_rows);
tryAssign(kvs, "leftNumPartitions", info.left_table_partitions_num);
tryAssign(kvs, "leftNumMappers", info.left_table_mappers_num);
tryAssign(kvs, "leftSizeInBytes", info.left_table_bytes);
tryAssign(kvs, "rightRowCount", info.right_table_rows);
tryAssign(kvs, "rightNumPartitions", info.right_table_partitions_num);
tryAssign(kvs, "rightNumMappers", info.right_table_mappers_num);
tryAssign(kvs, "rightSizeInBytes", info.right_table_bytes);
tryAssign(kvs, "numPartitions", info.partitions_num);
return info;
}
}
Expand Down
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ struct JoinOptimizationInfo
bool is_null_aware_anti_join = false;
bool is_existence_join = false;
Int64 left_table_rows = -1;
Int64 left_table_partitions_num = -1;
Int64 left_table_mappers_num = -1;
Int64 left_table_bytes = -1;
Int64 right_table_rows = -1;
Int64 right_table_partitions_num = -1;
Int64 right_table_mappers_num = -1;
Int64 right_table_bytes = -1;
Int64 partitions_num = -1;
String storage_join_key;

static JoinOptimizationInfo parse(const String & advance);
Expand Down
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/Parser/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
table_join->addDisjunct();
bool is_multi_join_on_clauses
= isJoinWithMultiJoinOnClauses(table_join->getOnlyClause(), join_on_clauses, join, left_header, right_header);
if (is_multi_join_on_clauses && join_config.prefer_inequal_join_to_multi_join_on_clauses && join_opt_info.right_table_rows > 0
&& join_opt_info.right_table_mappers_num > 0
&& join_opt_info.right_table_rows / join_opt_info.right_table_mappers_num
< join_config.inequal_join_to_multi_join_on_clauses_rows_limit)
if (is_multi_join_on_clauses && join_config.prefer_multi_join_on_clauses && join_opt_info.right_table_rows > 0
&& join_opt_info.partitions_num > 0
&& join_opt_info.right_table_rows / join_opt_info.partitions_num
< join_config.multi_join_on_clauses_build_side_rows_limit)
{
query_plan = buildMultiOnClauseHashJoin(table_join, std::move(left), std::move(right), join_on_clauses);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ trait SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase

/** Generate BroadcastHashJoinExecTransformer. */
def genBroadcastHashJoinExecTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] {
plan.condition,
plan.left,
plan.right,
plan.isSkewJoin)
plan.isSkewJoin,
plan.logicalLink)
transformer.doValidate().tagOnFallback(plan)
case plan: BroadcastExchangeExec =>
val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil {
plan.condition,
left,
right,
plan.isSkewJoin)
plan.isSkewJoin,
plan.logicalLink)
case plan: SortMergeJoinExec =>
val left = plan.left
val right = plan.right
Expand Down

0 comments on commit 8081b62

Please sign in to comment.