[VL] Allow specifying maximum batch size for batch resizing (#6670)
zhztheplayer authored Aug 2, 2024
1 parent 3fbf488 commit 94b79c7
Showing 266 changed files with 3,287 additions and 3,175 deletions.
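This commit renames the Velox batch appender to a batch resizer and adds a maximum output batch size next to the existing minimum: undersized batches are coalesced by appending later batches to earlier ones, and oversized batches are split into smaller ones. Below is a rough, standalone Scala sketch of that intended behaviour in terms of row counts only; the real work happens in native code behind VeloxBatchResizerJniWrapper, and the exact splitting policy (including how a remainder chunk is emitted) is an assumption here, not taken from the commit.

object ResizeSemanticsSketch {
  // Coalesce consecutive small batches until at least `min` rows have been
  // accumulated, then split anything larger than `max` rows into `max`-sized
  // chunks plus a remainder (remainder handling is assumed, not from the commit).
  def resize(rowCounts: Iterator[Int], min: Int, max: Int): Iterator[Int] = {
    val coalesced = new Iterator[Int] {
      def hasNext: Boolean = rowCounts.hasNext
      def next(): Int = {
        var acc = rowCounts.next()
        while (acc < min && rowCounts.hasNext) acc += rowCounts.next()
        acc
      }
    }
    coalesced.flatMap { n =>
      if (n <= max) Iterator.single(n)
      else Iterator.fill(n / max)(max) ++ Iterator(n % max).filter(_ > 0)
    }
  }
}

// Example with min = 100, max = 500:
//   input row counts  30, 40, 50, 1200  ->  output row counts 120, 500, 500, 200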
@@ -25,13 +25,13 @@

 import java.util.Iterator;

-public final class VeloxBatchAppender {
+public final class VeloxBatchResizer {
   public static ColumnarBatchOutIterator create(
-      int minOutputBatchSize, Iterator<ColumnarBatch> in) {
-    final Runtime runtime = Runtimes.contextInstance("VeloxBatchAppender");
+      int minOutputBatchSize, int maxOutputBatchSize, Iterator<ColumnarBatch> in) {
+    final Runtime runtime = Runtimes.contextInstance("VeloxBatchResizer");
     long outHandle =
-        VeloxBatchAppenderJniWrapper.create(runtime)
-            .create(minOutputBatchSize, new ColumnarBatchInIterator(in));
+        VeloxBatchResizerJniWrapper.create(runtime)
+            .create(minOutputBatchSize, maxOutputBatchSize, new ColumnarBatchInIterator(in));
     return new ColumnarBatchOutIterator(runtime, outHandle);
   }
 }
@@ -20,21 +20,22 @@
 import org.apache.gluten.exec.RuntimeAware;
 import org.apache.gluten.vectorized.ColumnarBatchInIterator;

-public class VeloxBatchAppenderJniWrapper implements RuntimeAware {
+public class VeloxBatchResizerJniWrapper implements RuntimeAware {
   private final Runtime runtime;

-  private VeloxBatchAppenderJniWrapper(Runtime runtime) {
+  private VeloxBatchResizerJniWrapper(Runtime runtime) {
     this.runtime = runtime;
   }

-  public static VeloxBatchAppenderJniWrapper create(Runtime runtime) {
-    return new VeloxBatchAppenderJniWrapper(runtime);
+  public static VeloxBatchResizerJniWrapper create(Runtime runtime) {
+    return new VeloxBatchResizerJniWrapper(runtime);
   }

   @Override
   public long handle() {
     return runtime.getHandle();
   }

-  public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
+  public native long create(
+      int minOutputBatchSize, int maxOutputBatchSize, ColumnarBatchInIterator itr);
 }
@@ -352,9 +352,10 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi
     plan match {
       case shuffle: ColumnarShuffleExchangeExec
           if !shuffle.useSortBasedShuffle &&
-            GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle =>
+            GlutenConfig.getConf.veloxResizeBatchesShuffleInput =>
+        val range = GlutenConfig.getConf.veloxResizeBatchesShuffleInputRange
         val appendBatches =
-          VeloxAppendBatchesExec(shuffle.child, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
+          VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
         shuffle.withNewChildren(Seq(appendBatches))
       case _ => plan
     }
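The plan rule above now keys on veloxResizeBatchesShuffleInput (replacing veloxCoalesceBatchesBeforeShuffle) and reads a min/max pair from veloxResizeBatchesShuffleInputRange instead of the single veloxMinBatchSizeForShuffle. The shape of that range value is not part of this diff; a minimal sketch of what such a type could look like, purely as an illustrative assumption:

// Hypothetical shape of the value returned by
// GlutenConfig.getConf.veloxResizeBatchesShuffleInputRange (not shown in this commit).
final case class ResizeRange(min: Int, max: Int) {
  require(min <= max, s"min ($min) must not exceed max ($max)")
}

// The rule then passes it straight through to the operator:
//   VeloxResizeBatchesExec(shuffle.child, range.min, range.max)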
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution

 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.utils.VeloxBatchAppender
+import org.apache.gluten.utils.VeloxBatchResizer
 import org.apache.gluten.utils.iterator.Iterators

 import org.apache.spark.rdd.RDD
@@ -33,10 +33,13 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._

 /**
- * An operator to coalesce input batches by appending the later batches to the one that comes
- * earlier.
+ * An operator to resize input batches by appending the later batches to the one that comes earlier,
+ * or splitting one batch to smaller ones.
  */
-case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
+case class VeloxResizeBatchesExec(
+    override val child: SparkPlan,
+    minOutputBatchSize: Int,
+    maxOutputBatchSize: Int)
   extends GlutenPlan
   with UnaryExecNode {

@@ -45,7 +48,7 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
     "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
-    "appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append batches")
+    "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append / split batches")
   )

   override def supportsColumnar: Boolean = true
@@ -56,15 +59,15 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
     val numInputBatches = longMetric("numInputBatches")
     val numOutputRows = longMetric("numOutputRows")
     val numOutputBatches = longMetric("numOutputBatches")
-    val appendTime = longMetric("appendTime")
+    val selfTime = longMetric("selfTime")

     child.executeColumnar().mapPartitions {
       in =>
         // Append millis = Out millis - In millis.
         val appendMillis = new AtomicLong(0L)

-        val appender = VeloxBatchAppender.create(
+        val appender = VeloxBatchResizer.create(
           minOutputBatchSize,
+          maxOutputBatchSize,
           Iterators
             .wrap(in)
             .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
@@ -84,7 +87,7 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
           .recyclePayload(_.close())
           .recycleIterator {
             appender.close()
-            appendTime += appendMillis.get()
+            selfTime += appendMillis.get()
           }
           .create()
           .map {
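The selfTime metric above follows the comment in the operator (Append millis = Out millis - In millis): time spent pulling input batches is subtracted and time observed on the output side is added, so what remains approximates the resizer's own cost. A self-contained Scala sketch of that accounting pattern; in Gluten, Iterators.wrap(...).collectReadMillis(...) plays the role of the timed helper assumed here:

import java.util.concurrent.atomic.AtomicLong

object SelfTimeSketch {
  // Wrap an iterator so that wall-clock time spent in next() is accumulated
  // into `acc` with the given sign.
  def timed[A](it: Iterator[A], acc: AtomicLong, sign: Long): Iterator[A] = new Iterator[A] {
    def hasNext: Boolean = it.hasNext
    def next(): A = {
      val start = System.currentTimeMillis()
      try it.next()
      finally acc.addAndGet(sign * (System.currentTimeMillis() - start))
    }
  }

  def main(args: Array[String]): Unit = {
    val selfMillis = new AtomicLong(0L)
    val upstream = Iterator.fill(1000)(Array.fill(64)(0)) // stand-in for input batches
    val in = timed(upstream, selfMillis, sign = -1L) // subtract time spent reading input
    val out = timed(in.map(identity), selfMillis, sign = 1L) // add time observed on output
    out.foreach(_ => ()) // drain; the residual in selfMillis is the "self" cost
    println(s"self time ~= ${selfMillis.get()} ms")
  }
}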
@@ -77,7 +77,7 @@ case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkPlan]

   private def canPropagate(plan: SparkPlan): Boolean = plan match {
     case _: ProjectExecTransformer => true
-    case _: VeloxAppendBatchesExec => true
+    case _: VeloxResizeBatchesExec => true
     case _ => false
   }
 }

Some generated files are not rendered by default.
