
Commit

fixup
zhztheplayer committed Jun 7, 2024
1 parent e9b364e commit 626e837
Showing 2 changed files with 7 additions and 5 deletions.
VeloxBatchAppender.java

@@ -28,7 +28,8 @@
 import java.util.Iterator;

 public final class VeloxBatchAppender {
-  public static ColumnarBatchOutIterator create(int minOutputBatchSize, Iterator<ColumnarBatch> in) {
+  public static ColumnarBatchOutIterator create(
+      int minOutputBatchSize, Iterator<ColumnarBatch> in) {
     final Runtime runtime = Runtimes.contextInstance();
     final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender");
     long outHandle =
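
For context: judging from the code shown above, create() is the JNI-backed factory behind Gluten's batch-coalescing operator; it wraps an upstream iterator of ColumnarBatch in a native appender. The Scala sketch below is illustrative only. The import path, the coalescing behavior described in the comments, and the assumption that the returned ColumnarBatchOutIterator can be consumed as a java.util.Iterator are not confirmed by this diff.

// Hypothetical usage sketch, not code from the Gluten repository. Assumes:
//  - VeloxBatchAppender lives at org.apache.gluten.utils (path not shown in the diff);
//  - ColumnarBatchOutIterator is consumable as a java.util.Iterator[ColumnarBatch];
//  - we are inside a Spark task, since create() fetches context instances
//    (Runtimes.contextInstance(), NativeMemoryManagers.contextInstance) internally.
import org.apache.gluten.utils.VeloxBatchAppender
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._

def coalesceBatches(
    in: Iterator[ColumnarBatch],
    minOutputBatchSize: Int): Iterator[ColumnarBatch] = {
  // The native appender buffers upstream batches until at least
  // `minOutputBatchSize` rows have accumulated, then emits one larger batch.
  VeloxBatchAppender.create(minOutputBatchSize, in.asJava).asScala
}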
TestOperator.scala

@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.datasource.ArrowCSVFileFormat
 import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
 import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType}

 import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters

 class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
@@ -707,14 +709,13 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
     withSQLConf(
       "spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle" -> "true",
       "spark.gluten.sql.columnar.maxBatchSize" -> "2",
-      "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" -> s"$minBatchSize") {
+      "spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle" -> s"$minBatchSize"
+    ) {
       val df = runQueryAndCompare(
         "select l_orderkey, sum(l_partkey) as sum from lineitem " +
           "where l_orderkey < 100 group by l_orderkey") { _ => }
       checkLengthAndPlan(df, 27)
-      val ops = collect(df.queryExecution.executedPlan) {
-        case p: VeloxAppendBatchesExec => p
-      }
+      val ops = collect(df.queryExecution.executedPlan) { case p: VeloxAppendBatchesExec => p }
       assert(ops.size == 1)
       val op = ops.head
       assert(op.minOutputBatchSize == minBatchSize)
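
The last hunk reformats a test that asserts exactly one VeloxAppendBatchesExec with the configured minimum batch size appears in the executed plan. The collect call comes from the AdaptiveSparkPlanHelper mixin visible in the class declaration. Below is a self-contained sketch of that idiom, with Gluten's VeloxAppendBatchesExec swapped for a stock Spark node so the sketch compiles against Spark alone; the object and method names are invented for illustration.

// Plan-inspection idiom used by the test, shown against plain Spark.
// ShuffleExchangeLike stands in for VeloxAppendBatchesExec so nothing
// beyond Spark itself is needed on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike

object PlanInspection extends AdaptiveSparkPlanHelper {
  // Counts shuffle nodes in the executed plan of `query`, the same way the
  // test counts VeloxAppendBatchesExec nodes.
  def countShuffles(spark: SparkSession, query: String): Int = {
    val plan = spark.sql(query).queryExecution.executedPlan
    val shuffles = collect(plan) { case s: ShuffleExchangeLike => s }
    shuffles.size
  }
}

Unlike SparkPlan's own collect, the helper's collect also descends into adaptive query stages, which is why the suite mixes in AdaptiveSparkPlanHelper in the first place.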
