[VL] Provide options to combine small batches before sending to shuffle #6009

Merged · 5 commits · Jun 11, 2024 · Changes from all commits
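This PR adds two Velox backend options, exercised by the new test at the end of this diff. As a hedged sketch of how a user would turn the feature on (the session setup and the 4096-row threshold are illustrative, not defaults taken from this PR):

// Sketch: enabling batch coalescing before shuffle via SQL configuration.
// The two velox.* keys come from the test added below; values are examples.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Wrap each shuffle input with the VeloxAppendBatchesExec operator added by this PR.
  .config("spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle", "true")
  // Keep appending input batches until at least this many rows are buffered.
  .config("spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle", "4096")
  .getOrCreate()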
CHSparkPlanExecApi.scala
@@ -271,13 +271,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       }
     }
 
-  override def genColumnarShuffleExchange(
-      shuffle: ShuffleExchangeExec,
-      child: SparkPlan): SparkPlan = {
+  override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = {
+    val child = shuffle.child
     if (
-      BackendsApiManager.getSettings.supportShuffleWithProject(
-        shuffle.outputPartitioning,
-        shuffle.child)
+      BackendsApiManager.getSettings.supportShuffleWithProject(shuffle.outputPartitioning, child)
     ) {
       val (projectColumnNumber, newPartitioning, newChild) =
         addProjectionForShuffleExchange(shuffle)
VeloxBatchAppender.java (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils;

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
import org.apache.gluten.memory.nmm.NativeMemoryManager;
import org.apache.gluten.memory.nmm.NativeMemoryManagers;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
import org.apache.gluten.vectorized.ColumnarBatchOutIterator;

import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.Iterator;

public final class VeloxBatchAppender {
  public static ColumnarBatchOutIterator create(
      int minOutputBatchSize, Iterator<ColumnarBatch> in) {
    final Runtime runtime = Runtimes.contextInstance();
    final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender");
    long outHandle =
        VeloxBatchAppenderJniWrapper.forRuntime(runtime)
            .create(
                nmm.getNativeInstanceHandle(), minOutputBatchSize, new ColumnarBatchInIterator(in));
    return new ColumnarBatchOutIterator(runtime, outHandle, nmm);
  }
}
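For orientation, a minimal sketch of driving the appender directly. The helper below is hypothetical (the real caller added by this PR is VeloxAppendBatchesExec further down), and it assumes the calling thread has an active Gluten runtime, since create() binds to the context Runtime and memory manager:

// Hypothetical helper: drain `upstream` through the native appender, count
// the rows seen, and release both the coalesced batches and the appender.
import org.apache.gluten.utils.VeloxBatchAppender
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._

def drainCoalesced(upstream: Iterator[ColumnarBatch], minRows: Int): Long = {
  val appender = VeloxBatchAppender.create(minRows, upstream.asJava)
  try {
    appender.asScala.map {
      batch =>
        val rows = batch.numRows().toLong
        batch.close() // a real consumer would hand the batch downstream instead
        rows
    }.sum
  } finally {
    appender.close() // release the native appender's resources
  }
}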
VeloxBatchAppenderJniWrapper.java (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils;

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.RuntimeAware;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;

public class VeloxBatchAppenderJniWrapper implements RuntimeAware {
  private final Runtime runtime;

  private VeloxBatchAppenderJniWrapper(Runtime runtime) {
    this.runtime = runtime;
  }

  public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) {
    return new VeloxBatchAppenderJniWrapper(runtime);
  }

  @Override
  public long handle() {
    return runtime.getHandle();
  }

  public native long create(
      long memoryManagerHandle, int minOutputBatchSize, ColumnarBatchInIterator itr);
}
VeloxIteratorApi.scala
@@ -203,7 +203,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           resIter.close()
         }
         .recyclePayload(batch => batch.close())
-        .addToPipelineTime(pipelineTime)
+        .collectLifeMillis(millis => pipelineTime += millis)
         .asInterruptible(context)
         .create()
   }
@@ -246,7 +246,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           nativeResultIterator.close()
         }
         .recyclePayload(batch => batch.close())
-        .addToPipelineTime(pipelineTime)
+        .collectLifeMillis(millis => pipelineTime += millis)
         .create()
   }
   // scalastyle:on argcount
VeloxSparkPlanExecApi.scala
@@ -320,9 +320,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper =
     HashAggregateExecPullOutHelper(aggregateExpressions, aggregateAttributes)
 
-  override def genColumnarShuffleExchange(
-      shuffle: ShuffleExchangeExec,
-      newChild: SparkPlan): SparkPlan = {
+  override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = {
     def allowHashOnMap[T](f: => T): T = {
       val originalAllowHash = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_HASH_ON_MAPTYPE)
       try {
@@ -333,20 +331,28 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       }
     }
 
+    def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
+      if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
+        VeloxAppendBatchesExec(plan, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
+      } else {
+        plan
+      }
+    }
+
+    val child = shuffle.child
+
     shuffle.outputPartitioning match {
       case HashPartitioning(exprs, _) =>
        val hashExpr = new Murmur3Hash(exprs)
-        val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output
-        val projectTransformer = ProjectExecTransformer(projectList, newChild)
+        val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
+        val projectTransformer = ProjectExecTransformer(projectList, child)
        val validationResult = projectTransformer.doValidate()
        if (validationResult.isValid) {
-          ColumnarShuffleExchangeExec(
-            shuffle,
-            projectTransformer,
-            projectTransformer.output.drop(1))
+          val newChild = maybeAddAppendBatchesExec(projectTransformer)
+          ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1))
        } else {
          TransformHints.tagNotTransformable(shuffle, validationResult)
-          shuffle.withNewChildren(newChild :: Nil)
+          shuffle.withNewChildren(child :: Nil)
        }
      case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 =>
        // scalastyle:off line.size.limit
@@ -357,19 +363,20 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
        allowHashOnMap {
          // Velox hash expression does not support null type and we also do not need to sort
          // null type since the value always be null.
-          val columnsForHash = newChild.output.filterNot(_.dataType == NullType)
+          val columnsForHash = child.output.filterNot(_.dataType == NullType)
          if (columnsForHash.isEmpty) {
+            val newChild = maybeAddAppendBatchesExec(child)
            ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
          } else {
            val hashExpr = new Murmur3Hash(columnsForHash)
-            val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ newChild.output
-            val projectTransformer = ProjectExecTransformer(projectList, newChild)
+            val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
+            val projectTransformer = ProjectExecTransformer(projectList, child)
            val projectBeforeSortValidationResult = projectTransformer.doValidate()
            // Make sure we support offload hash expression
            val projectBeforeSort = if (projectBeforeSortValidationResult.isValid) {
              projectTransformer
            } else {
-              val project = ProjectExec(projectList, newChild)
+              val project = ProjectExec(projectList, child)
              TransformHints.tagNotTransformable(project, projectBeforeSortValidationResult)
              project
            }
@@ -380,17 +387,16 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
              ProjectExecTransformer(projectList.drop(1), sortByHashCode)
            val validationResult = dropSortColumnTransformer.doValidate()
            if (validationResult.isValid) {
-              ColumnarShuffleExchangeExec(
-                shuffle,
-                dropSortColumnTransformer,
-                dropSortColumnTransformer.output)
+              val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer)
+              ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
            } else {
              TransformHints.tagNotTransformable(shuffle, validationResult)
-              shuffle.withNewChildren(newChild :: Nil)
+              shuffle.withNewChildren(child :: Nil)
            }
          }
        }
      case _ =>
+        val newChild = maybeAddAppendBatchesExec(child)
        ColumnarShuffleExchangeExec(shuffle, newChild, null)
    }
  }
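Net effect of the changes above: with the option enabled, every branch of genColumnarShuffleExchange routes the shuffle input through maybeAddAppendBatchesExec, inserting one operator between the exchange and its child. Schematically (illustrative, not literal explain output), for the hash-partitioning branch:

ColumnarShuffleExchangeExec
+- VeloxAppendBatchesExec(minOutputBatchSize)
   +- ProjectExecTransformer [hash_partition_key, ...]
      +- <upstream plan>

The operator itself is defined in the new file that follows.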
VeloxAppendBatchesExec.scala (new file)
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.utils.{Iterators, VeloxBatchAppender}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

/**
 * An operator that coalesces input batches by appending later batches to the
 * ones that came earlier.
 */
case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
  extends GlutenPlan
  with UnaryExecNode {

  override lazy val metrics: Map[String, SQLMetric] = Map(
    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
    "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append batches")
  )

  override def supportsColumnar: Boolean = true
  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numInputRows = longMetric("numInputRows")
    val numInputBatches = longMetric("numInputBatches")
    val numOutputRows = longMetric("numOutputRows")
    val numOutputBatches = longMetric("numOutputBatches")
    val appendTime = longMetric("appendTime")

    child.executeColumnar().mapPartitions {
      in =>
        // Append millis = Out millis - In millis.
        val appendMillis = new AtomicLong(0L)

        val appender = VeloxBatchAppender.create(
          minOutputBatchSize,
          Iterators
            .wrap(in)
            .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
            .create()
            .map {
              inBatch =>
                numInputRows += inBatch.numRows()
                numInputBatches += 1
                inBatch
            }
            .asJava
        )

        val out = Iterators
          .wrap(appender.asScala)
          .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
          .recyclePayload(_.close())
          .recycleIterator {
            appender.close()
            appendTime += appendMillis.get()
          }
          .create()
          .map {
            outBatch =>
              numOutputRows += outBatch.numRows()
              numOutputBatches += 1
              outBatch
          }

        out
    }
  }

  override def output: Seq[Attribute] = child.output
  override def outputPartitioning: Partitioning = child.outputPartitioning
  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
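The appendTime accounting above is worth a second look: the wall time measured while draining the appender's output includes the time the appender itself spends pulling from upstream, so the input-side read time is subtracted out. A toy check with made-up numbers:

// Illustrative numbers only: 10 ms observed on the output side, of which
// 7 ms were spent reading upstream input; pure append work is the difference.
val outMillis = 10L
val inMillis = 7L
val appendMillis = outMillis - inMillis // mirrors the AtomicLong add/subtract above
assert(appendMillis == 3L)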
TestOperator.scala
@@ -24,6 +24,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters
 
-class TestOperator extends VeloxWholeStageTransformerSuite {
+class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
 
   protected val rootPath: String = getClass.getResource("/").getPath
   override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -703,6 +704,29 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
     }
   }
 
+  test("combine small batches before shuffle") {
+    val minBatchSize = 15
+    withSQLConf(
+      "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle" -> "true",
+      "spark.gluten.sql.columnar.maxBatchSize" -> "2",
+      "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" -> s"$minBatchSize"
+    ) {
+      val df = runQueryAndCompare(
+        "select l_orderkey, sum(l_partkey) as sum from lineitem " +
+          "where l_orderkey < 100 group by l_orderkey") { _ => }
+      checkLengthAndPlan(df, 27)
+      val ops = collect(df.queryExecution.executedPlan) { case p: VeloxAppendBatchesExec => p }
+      assert(ops.size == 1)
+      val op = ops.head
+      assert(op.minOutputBatchSize == minBatchSize)
+      val metrics = op.metrics
+      assert(metrics("numInputRows").value == 27)
+      assert(metrics("numInputBatches").value == 14)
+      assert(metrics("numOutputRows").value == 27)
+      assert(metrics("numOutputBatches").value == 2)
+    }
+  }
+
   test("test OneRowRelation") {
     val df = sql("SELECT 1")
     checkAnswer(df, Row(1))
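The expected metric values follow from the configuration, assuming the appender flushes as soon as the buffered row count reaches the threshold: 27 qualifying rows read with maxBatchSize = 2 arrive as ceil(27 / 2) = 14 input batches; after 8 of them the appender holds 16 >= 15 rows and emits, and the remaining 11 rows are emitted at end of input, giving 2 output batches.

// Sanity check of the expected metrics (assumes flush-at-threshold behavior).
val rows = 27
val inputBatches = (rows + 1) / 2 // ceil(27 / 2) = 14 with maxBatchSize = 2
val firstFlush = 8 * 2 // 16 rows: the first prefix of batches reaching >= 15
val secondFlush = rows - firstFlush // 11 rows emitted at end of input
assert(inputBatches == 14 && firstFlush + secondFlush == rows)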