Commit

Merge branch 'main' into sort
yaooqinn authored Jul 11, 2024
2 parents 5bc17d0 + c344a34 commit c9a1d53
Showing 147 changed files with 3,156 additions and 1,184 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/velox_docker_cache.yml
@@ -20,6 +20,9 @@ on:
branches:
- 'main'

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

concurrency:
group: ${{ github.repository }}-${{ github.workflow }}
cancel-in-progress: false
@@ -126,4 +129,4 @@ jobs:
# - uses: actions/cache/save@v3
# with:
# path: '${{ env.CCACHE_DIR }}'
# key: ccache-centos-release-default
# key: ccache-centos-release-default
@@ -74,12 +74,20 @@ public static long build(
return converter.genColumnNameWithExprId(attr);
})
.collect(Collectors.joining(","));

int joinType;
if (broadCastContext.buildHashTableId().startsWith("BuiltBNLJBroadcastTable-")) {
joinType = SubstraitUtil.toCrossRelSubstrait(broadCastContext.joinType()).ordinal();
} else {
joinType = SubstraitUtil.toSubstrait(broadCastContext.joinType()).ordinal();
}

return nativeBuild(
broadCastContext.buildHashTableId(),
batches,
rowCount,
joinKey,
SubstraitUtil.toSubstrait(broadCastContext.joinType()).ordinal(),
joinType,
broadCastContext.hasMixedFiltCondition(),
toNameStruct(output).toByteArray());
}
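
The branch above lets one native build path serve both hash joins and broadcast nested loop joins: build ids produced for the latter carry the "BuiltBNLJBroadcastTable-" prefix, so their join type must go through the cross-relation mapping rather than the hash-join mapping. A minimal sketch of that routing in Scala (identifiers follow the Java code above; it is an illustration, not the committed implementation):

// Illustrative only: restates the Java branch above in Scala.
// Build ids starting with "BuiltBNLJBroadcastTable-" come from broadcast
// nested loop join builds and need the cross-rel join-type mapping.
val joinTypeOrdinal =
  if (broadCastContext.buildHashTableId.startsWith("BuiltBNLJBroadcastTable-")) {
    SubstraitUtil.toCrossRelSubstrait(broadCastContext.joinType).ordinal
  } else {
    SubstraitUtil.toSubstrait(broadCastContext.joinType).ordinal
  }
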
@@ -237,7 +237,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

wExpression.windowFunction match {
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: NTile =>
case _: RowNumber | _: AggregateExpression | _: Rank | _: DenseRank | _: PercentRank |
_: NTile =>
allSupported = allSupported
case l: Lag =>
checkLagOrLead(l.third)
@@ -297,4 +298,5 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true

}
@@ -348,16 +348,29 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)

override def genNestedLoopJoinTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] = {
throw new UnsupportedOperationException(
s"NestedLoopJoinTransformer metrics update is not supported in CH backend")
}
sparkContext: SparkContext): Map[String, SQLMetric] = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
"outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
"postProjectTime" ->
SQLMetrics.createTimingMetric(sparkContext, "time of postProjection"),
"probeTime" ->
SQLMetrics.createTimingMetric(sparkContext, "time of probe"),
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time"),
"fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
sparkContext,
"filling right join side time"),
"conditionTime" -> SQLMetrics.createTimingMetric(sparkContext, "join condition time")
)

override def genNestedLoopJoinTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = {
throw new UnsupportedOperationException(
s"NestedLoopJoinTransformer metrics update is not supported in CH backend")
}
metrics: Map[String, SQLMetric]): MetricsUpdater = new BroadcastNestedLoopJoinMetricsUpdater(
metrics)

override def genSampleTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
throw new UnsupportedOperationException(
@@ -373,8 +373,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
throw new GlutenNotSupportException(
"BroadcastNestedLoopJoinExecTransformer is not supported in ch backend.")
CHBroadcastNestedLoopJoinExecTransformer(
left,
right,
buildSide,
joinType,
condition
)

override def genSampleExecTransformer(
lowerBound: Double,
@@ -460,16 +465,23 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
val hashedRelationBroadcastMode = mode.asInstanceOf[HashedRelationBroadcastMode]

val buildKeys: Seq[Expression] = mode match {
case mode1: HashedRelationBroadcastMode =>
mode1.key
case _ =>
// IdentityBroadcastMode
Seq.empty
}

val (newChild, newOutput, newBuildKeys) =
if (
hashedRelationBroadcastMode.key
buildKeys
.forall(k => k.isInstanceOf[AttributeReference] || k.isInstanceOf[BoundReference])
) {
(child, child.output, Seq.empty[Expression])
} else {
// pre projection in case of expression join keys
val buildKeys = hashedRelationBroadcastMode.key
val appendedProjections = new ArrayBuffer[NamedExpression]()
val preProjectionBuildKeys = buildKeys.zipWithIndex.map {
case (e, idx) =>
@@ -704,7 +716,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val columnName = s"${aliasExpr.name}_${aliasExpr.exprId.id}"
val wExpression = aliasExpr.child.asInstanceOf[WindowExpression]
wExpression.windowFunction match {
case wf @ (RowNumber() | Rank(_) | DenseRank(_)) =>
case wf @ (RowNumber() | Rank(_) | DenseRank(_) | PercentRank(_)) =>
val aggWindowFunc = wf.asInstanceOf[AggregateWindowFunction]
val frame = aggWindowFunc.frame.asInstanceOf[SpecifiedWindowFrame]
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
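
With PercentRank added both to the validation whitelist in CHBackendSettings and to the window-function builder above, queries using percent_rank alongside row_number, rank, and dense_rank can stay on the native path. A hedged usage sketch (assumes an existing SparkSession `spark` with the ClickHouse backend enabled and a hypothetical registered table `t(k, v)`):

// Hypothetical example; `spark` and table `t` are assumptions, not part of this commit.
// percent_rank() over a window is now handled like row_number/rank/dense_rank
// instead of forcing a fallback.
val df = spark.sql(
  "SELECT k, v, percent_rank() OVER (PARTITION BY k ORDER BY v) AS pr FROM t")
df.show()
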
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult

import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftSemi}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.protobuf.{Any, StringValue}

case class CHBroadcastNestedLoopJoinExecTransformer(
left: SparkPlan,
right: SparkPlan,
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression])
extends BroadcastNestedLoopJoinExecTransformer(
left,
right,
buildSide,
joinType,
condition
) {

override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
val streamedRDD = getColumnarInputRDDs(streamedPlan)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
if (executionId != null) {
GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
} else {
logWarning(
s"Can't not trace broadcast table data $buildBroadcastTableId" +
s" because execution id is null." +
s" Will clean up until expire time.")
}
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
val context =
BroadCastHashJoinContext(Seq.empty, joinType, false, buildPlan.output, buildBroadcastTableId)
val broadcastRDD = CHBroadcastBuildSideRDD(sparkContext, broadcast, context)
streamedRDD :+ broadcastRDD
}

override protected def withNewChildrenInternal(
newLeft: SparkPlan,
newRight: SparkPlan): CHBroadcastNestedLoopJoinExecTransformer =
copy(left = newLeft, right = newRight)

def isMixedCondition(cond: Option[Expression]): Boolean = {
val res = if (cond.isDefined) {
val leftOutputSet = left.outputSet
val rightOutputSet = right.outputSet
val allReferences = cond.get.references
!(allReferences.subsetOf(leftOutputSet) || allReferences.subsetOf(rightOutputSet))
} else {
false
}
res
}

override def genJoinParameters(): Any = {
// for ch
val joinParametersStr = new StringBuffer("JoinParameters:")
joinParametersStr
.append("buildHashTableId=")
.append(buildBroadcastTableId)
.append("\n")
val message = StringValue
.newBuilder()
.setValue(joinParametersStr.toString)
.build()
BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
}

override def validateJoinTypeAndBuildSide(): ValidationResult = {
joinType match {
case _: InnerLike =>
case _ =>
if (joinType == LeftSemi || condition.isDefined) {
return ValidationResult.notOk(
s"Broadcast Nested Loop join is not supported join type $joinType with conditions")
}
}

ValidationResult.ok
}

}
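
CHBroadcastNestedLoopJoinExecTransformer gives the ClickHouse backend an offloadable replacement for Spark's BroadcastNestedLoopJoinExec, which Spark plans when a join has no equi-join keys. A hedged sketch of a query shape that would typically exercise it (hypothetical tables `t1(a)` and `t2(b)` and an existing SparkSession `spark`; inner-like joins with conditions pass validateJoinTypeAndBuildSide above):

// Hypothetical example; `spark`, `t1`, and `t2` are assumptions, not part of this commit.
// A non-equi inner join has no hash keys, so Spark plans a broadcast nested loop join,
// which the new transformer can now offload instead of falling back to vanilla Spark.
val df = spark.sql(
  "SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2 ON t1.a > t2.b")
df.collect()
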
(Diffs for the remaining changed files are not shown.)

0 comments on commit c9a1d53
