Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Allow specifying maximum batch size for batch resizing #6670

Merged
merged 6 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

import java.util.Iterator;

public final class VeloxBatchAppender {
public final class VeloxBatchResizer {
public static ColumnarBatchOutIterator create(
int minOutputBatchSize, Iterator<ColumnarBatch> in) {
final Runtime runtime = Runtimes.contextInstance("VeloxBatchAppender");
int minOutputBatchSize, int maxOutputBatchSize, Iterator<ColumnarBatch> in) {
final Runtime runtime = Runtimes.contextInstance("VeloxBatchResizer");
long outHandle =
VeloxBatchAppenderJniWrapper.create(runtime)
.create(minOutputBatchSize, new ColumnarBatchInIterator(in));
VeloxBatchResizerJniWrapper.create(runtime)
.create(minOutputBatchSize, maxOutputBatchSize, new ColumnarBatchInIterator(in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
import org.apache.gluten.exec.RuntimeAware;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;

public class VeloxBatchAppenderJniWrapper implements RuntimeAware {
public class VeloxBatchResizerJniWrapper implements RuntimeAware {
private final Runtime runtime;

private VeloxBatchAppenderJniWrapper(Runtime runtime) {
private VeloxBatchResizerJniWrapper(Runtime runtime) {
this.runtime = runtime;
}

public static VeloxBatchAppenderJniWrapper create(Runtime runtime) {
return new VeloxBatchAppenderJniWrapper(runtime);
public static VeloxBatchResizerJniWrapper create(Runtime runtime) {
return new VeloxBatchResizerJniWrapper(runtime);
}

@Override
public long handle() {
return runtime.getHandle();
}

public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
public native long create(
int minOutputBatchSize, int maxOutputBatchSize, ColumnarBatchInIterator itr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,10 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
plan match {
case shuffle: ColumnarShuffleExchangeExec
if !shuffle.useSortBasedShuffle &&
GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle =>
GlutenConfig.getConf.veloxResizeBatchesShuffleInput =>
val range = GlutenConfig.getConf.veloxResizeBatchesShuffleInputRange
val appendBatches =
VeloxAppendBatchesExec(shuffle.child, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
shuffle.withNewChildren(Seq(appendBatches))
case _ => plan
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.execution

import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.utils.VeloxBatchAppender
import org.apache.gluten.utils.VeloxBatchResizer
import org.apache.gluten.utils.iterator.Iterators

import org.apache.spark.rdd.RDD
Expand All @@ -33,10 +33,13 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._

/**
* An operator to coalesce input batches by appending the later batches to the one that comes
* earlier.
* An operator to resize input batches by appending the later batches to the one that comes earlier,
* or splitting one batch to smaller ones.
*/
case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchSize: Int)
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
minOutputBatchSize: Int,
maxOutputBatchSize: Int)
extends GlutenPlan
with UnaryExecNode {

Expand All @@ -45,7 +48,7 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
"appendTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append batches")
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append / split batches")
)

override def supportsColumnar: Boolean = true
Expand All @@ -56,15 +59,15 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
val numInputBatches = longMetric("numInputBatches")
val numOutputRows = longMetric("numOutputRows")
val numOutputBatches = longMetric("numOutputBatches")
val appendTime = longMetric("appendTime")
val selfTime = longMetric("selfTime")

child.executeColumnar().mapPartitions {
in =>
// Append millis = Out millis - In millis.
val appendMillis = new AtomicLong(0L)

val appender = VeloxBatchAppender.create(
val appender = VeloxBatchResizer.create(
minOutputBatchSize,
maxOutputBatchSize,
Iterators
.wrap(in)
.collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
Expand All @@ -84,7 +87,7 @@ case class VeloxAppendBatchesExec(override val child: SparkPlan, minOutputBatchS
.recyclePayload(_.close())
.recycleIterator {
appender.close()
appendTime += appendMillis.get()
selfTime += appendMillis.get()
}
.create()
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ case class FlushableHashAggregateRule(session: SparkSession) extends Rule[SparkP

private def canPropagate(plan: SparkPlan): Boolean = plan match {
case _: ProjectExecTransformer => true
case _: VeloxAppendBatchesExec => true
case _: VeloxResizeBatchesExec => true
case _ => false
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading