Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Aug 16, 2024
1 parent 1376873 commit 3b923af
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase = {
val res = CHShuffledHashJoinExecTransformer(
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase = {
CHShuffledHashJoinExecTransformer(
leftKeys,
rightKeys,
joinType,
Expand All @@ -320,8 +319,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left,
right,
isSkewJoin)
res.setLogicalLink(logicalLink.getOrElse(null))
res
}

/** Generate BroadcastHashJoinExecTransformer. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -69,8 +68,6 @@ object JoinTypeTransform {
}
}

case class ShuffleStageStaticstics(numPartitions: Int, numMappers: Int, rowCount: Option[BigInt])

case class CHShuffledHashJoinExecTransformer(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
Expand Down Expand Up @@ -131,33 +128,34 @@ case class CHShuffledHashJoinExecTransformer(
.append("isExistenceJoin=")
.append(if (joinType.isInstanceOf[ExistenceJoin]) 1 else 0)
.append("\n")
logicalLink match {
case Some(join: Join) =>
val left = if (!needSwitchChildren) join.left else join.right
val right = if (!needSwitchChildren) join.right else join.left
val leftRowCount = left.stats.rowCount
val rightRowCount = right.stats.rowCount
val leftSizeInBytes = left.stats.sizeInBytes
val rightSizeInBytes = right.stats.sizeInBytes
val numPartitions = outputPartitioning.numPartitions

CHAQEUtil.getShuffleQueryStageStats(streamedPlan) match {
case Some(stats) =>
joinParametersStr
.append("leftRowCount=")
.append(leftRowCount.getOrElse(-1))
.append(stats.rowCount.getOrElse(-1))
.append("\n")
.append("leftSizeInBytes=")
.append(leftSizeInBytes)
.append(stats.sizeInBytes)
.append("\n")
case _ =>
}
CHAQEUtil.getShuffleQueryStageStats(buildPlan) match {
case Some(stats) =>
joinParametersStr
.append("rightRowCount=")
.append(rightRowCount.getOrElse(-1))
.append(stats.rowCount.getOrElse(-1))
.append("\n")
.append("rightSizeInBytes=")
.append(rightSizeInBytes)
.append("\n")
.append("numPartitions=")
.append(numPartitions)
.append(stats.sizeInBytes)
.append("\n")
case _ =>
}
joinParametersStr
.append("numPartitions=")
.append(outputPartitioning.numPartitions)
.append("\n")

val message = StringValue
.newBuilder()
.setValue(joinParametersStr.toString)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._

object CHAQEUtil {

  /**
   * Finds the runtime statistics of the nearest shuffle query stage at or below `plan`.
   *
   * Per the original author's note, the TransformSupport nodes have lost their logicalLink, so
   * the physical plan itself is walked to reach the first [[ShuffleQueryStageExec]] and read its
   * runtime statistics.
   *
   * The walk only follows nodes that have exactly one child. It stops with `None` as soon as it
   * reaches a node that is not a shuffle query stage and has zero or several children.
   *
   * @param plan
   *   the physical plan node to start searching from
   * @return
   *   `Some` runtime statistics of the first shuffle query stage found, otherwise `None`
   */
  @scala.annotation.tailrec
  def getShuffleQueryStageStats(plan: SparkPlan): Option[Statistics] =
    plan match {
      case shuffleStage: ShuffleQueryStageExec =>
        Some(shuffleStage.getRuntimeStatistics)
      case other =>
        other.children match {
          // Exactly one child: keep descending along the single-child chain.
          case Seq(onlyChild) => getShuffleQueryStageStats(onlyChild)
          // Leaf or multi-child node: there is no unambiguous stage to report.
          case _ => None
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase =
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase =
ShuffledHashJoinExecTransformer(
leftKeys,
rightKeys,
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void tryAssign<Int64>(const std::unordered_map<String, String> & kvs, const Stri
catch (...)
{
LOG_ERROR(getLogger("tryAssign"), "Invalid number: {}", it->second);
throw;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ trait SparkPlanExecApi {
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean,
logicalLink: Option[LogicalPlan]): ShuffledHashJoinExecTransformerBase
isSkewJoin: Boolean): ShuffledHashJoinExecTransformerBase

/** Generate BroadcastHashJoinExecTransformer. */
def genBroadcastHashJoinExecTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ case class AddFallbackTagRule() extends Rule[SparkPlan] {
plan.condition,
plan.left,
plan.right,
plan.isSkewJoin,
plan.logicalLink)
plan.isSkewJoin)
transformer.doValidate().tagOnFallback(plan)
case plan: BroadcastExchangeExec =>
val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil {
plan.condition,
left,
right,
plan.isSkewJoin,
plan.logicalLink)
plan.isSkewJoin)
case plan: SortMergeJoinExec =>
val left = plan.left
val right = plan.right
Expand Down

0 comments on commit 3b923af

Please sign in to comment.