Skip to content

Commit

Permalink
[VL] Minor follow-ups for PRs (#6693)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Aug 6, 2024
1 parent d746f0f commit 0625a75
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -900,12 +900,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla

test("combine small batches before shuffle") {
val minBatchSize = 15
val maxBatchSize = 100
withSQLConf(
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true",
"spark.gluten.sql.columnar.maxBatchSize" -> "2",
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
s"$minBatchSize~$maxBatchSize"
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize" ->
s"$minBatchSize"
) {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
Expand All @@ -921,16 +920,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
assert(metrics("numOutputRows").value == 27)
assert(metrics("numOutputBatches").value == 2)
}
}

test("split small batches before shuffle") {
val minBatchSize = 1
val maxBatchSize = 4
withSQLConf(
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true",
"spark.gluten.sql.columnar.maxBatchSize" -> "100",
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
s"$minBatchSize~$maxBatchSize"
"spark.gluten.sql.columnar.maxBatchSize" -> "2"
) {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
Expand All @@ -939,12 +932,12 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
val ops = collect(df.queryExecution.executedPlan) { case p: VeloxResizeBatchesExec => p }
assert(ops.size == 1)
val op = ops.head
assert(op.minOutputBatchSize == minBatchSize)
assert(op.minOutputBatchSize == 1)
val metrics = op.metrics
assert(metrics("numInputRows").value == 27)
assert(metrics("numInputBatches").value == 1)
assert(metrics("numInputBatches").value == 14)
assert(metrics("numOutputRows").value == 27)
assert(metrics("numOutputBatches").value == 7)
assert(metrics("numOutputBatches").value == 14)
}
}

Expand Down
10 changes: 7 additions & 3 deletions cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
# TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
add_velox_test(
velox_operators_test SOURCES VeloxColumnarToRowTest.cc
VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc
VeloxColumnarBatchTest.cc)
velox_operators_test
SOURCES
VeloxColumnarToRowTest.cc
VeloxRowToColumnarTest.cc
VeloxColumnarBatchSerializerTest.cc
VeloxColumnarBatchTest.cc
VeloxBatchResizerTest.cc)
add_velox_test(
velox_plan_conversion_test
SOURCES
Expand Down
85 changes: 85 additions & 0 deletions cpp/velox/tests/VeloxBatchResizerTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <limits>

#include "utils/VeloxBatchResizer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;

namespace gluten {
class ColumnarBatchArray : public ColumnarBatchIterator {
public:
explicit ColumnarBatchArray(const std::vector<std::shared_ptr<ColumnarBatch>> batches)
: batches_(std::move(batches)) {}

std::shared_ptr<ColumnarBatch> next() override {
if (cursor_ >= batches_.size()) {
return nullptr;
}
return batches_[cursor_++];
}

private:
const std::vector<std::shared_ptr<ColumnarBatch>> batches_;
int32_t cursor_ = 0;
};

/// Fixture for VeloxBatchResizer tests. Provides helpers to build input batches
/// of given row counts and to compare the row counts the resizer emits.
class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBase {
 protected:
  /// Installs the memory manager test instance (brace-initialized options)
  /// once for the whole test case.
  static void SetUpTestCase() {
    memory::MemoryManager::testingSetInstance({});
  }

  /// Builds a row vector of `numRows` rows with one INTEGER column, whose child
  /// is the vector returned by makeConstant(1, numRows).
  RowVectorPtr newVector(size_t numRows) {
    auto child = makeConstant(1, numRows);
    return std::make_shared<RowVector>(pool(), ROW({INTEGER()}), nullptr, numRows, std::vector<VectorPtr>{child});
  }

  /// Feeds batches with row counts `inSizes` through a VeloxBatchResizer
  /// configured with (`min`, `max`) and asserts that the row counts of the
  /// produced batches equal `outSizes`, in order.
  void checkResize(int32_t min, int32_t max, std::vector<int32_t> inSizes, std::vector<int32_t> outSizes) {
    std::vector<std::shared_ptr<ColumnarBatch>> inputs;
    for (const int32_t rows : inSizes) {
      inputs.push_back(std::make_shared<VeloxColumnarBatch>(newVector(rows)));
    }

    VeloxBatchResizer resizer(pool(), min, max, std::make_unique<ColumnarBatchArray>(std::move(inputs)));

    // Drain the resizer until it signals exhaustion with nullptr.
    std::vector<int32_t> actualOutSizes;
    for (auto produced = resizer.next(); produced != nullptr; produced = resizer.next()) {
      actualOutSizes.push_back(produced->numRows());
    }
    ASSERT_EQ(actualOutSizes, outSizes);
  }
};

// Table-style checks of the batch sizes produced by VeloxBatchResizer.
// Each call is checkResize(min, max, input row counts, expected output row counts).
TEST_F(VeloxBatchResizerTest, sanity) {
// Practically unbounded max: small inputs are combined until the minimum is reached
// (30 + 50 + 30 = 110 >= 100); the leftover 40 + 30 = 70 rows form the last batch.
checkResize(100, std::numeric_limits<int32_t>::max(), {30, 50, 30, 40, 30}, {110, 70});
// min = 1: nothing is combined; only the 50-row input exceeds max = 40 and is split into 40 + 10.
checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30});
// max = 39: both the 50-row and the 40-row inputs are split (39 + 11 and 39 + 1).
checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 30});
// min == max == 40: 10 + 20 are combined into 30, emitted as-is because adding the
// next input would exceed max; the 50-row input is then split into 40 + 10.
checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30});
// Same as above with min == max == 39, so the 40-row input is split as well (39 + 1).
checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30});
// A 5-row batch followed by an oversized one: the 5 rows are emitted alone, the
// 900-row input is sliced into 4 x 200 + 100, and the trailing 50 rows are emitted last.
checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50});
// As above, but the two trailing small inputs are combined (30 + 80 = 110).
checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110});
// Two oversized inputs in a row: each is sliced independently (4 x 200 + 100, then 3 x 200 + 100).
checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 200, 100});
// Zero for both bounds is expected to be rejected with an exception.
ASSERT_ANY_THROW(checkResize(0, 0, {}, {}));
}
} // namespace gluten
15 changes: 11 additions & 4 deletions cpp/velox/utils/VeloxBatchResizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ namespace {
class SliceRowVector : public ColumnarBatchIterator {
public:
SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in)
: maxOutputBatchSize_(maxOutputBatchSize), in_(in) {
GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state");
}
: maxOutputBatchSize_(maxOutputBatchSize), in_(in) {}

std::shared_ptr<ColumnarBatch> next() override {
int32_t remainingLength = in_->size() - cursor_;
Expand Down Expand Up @@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer(
: pool_(pool),
minOutputBatchSize_(minOutputBatchSize),
maxOutputBatchSize_(maxOutputBatchSize),
in_(std::move(in)) {}
in_(std::move(in)) {
GLUTEN_CHECK(
minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0,
"Either minOutputBatchSize or maxOutputBatchSize should be larger than 0");
}

std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
if (next_) {
Expand All @@ -82,6 +84,11 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
auto nextRv = nextVb->getRowVector();
if (buffer->size() + nextRv->size() > maxOutputBatchSize_) {
GLUTEN_CHECK(next_ == nullptr, "Invalid state");
next_ = std::make_unique<SliceRowVector>(maxOutputBatchSize_, nextRv);
return std::make_shared<VeloxColumnarBatch>(buffer);
}
buffer->append(nextRv.get());
if (buffer->size() >= minOutputBatchSize_) {
// Buffer is full.
Expand Down
27 changes: 12 additions & 15 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def columnarShuffleCompressionThreshold: Int =
conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)

// FIXME: Not clear: MIN or MAX ?
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)

def columnarToRowMemThreshold: Long =
Expand Down Expand Up @@ -329,12 +328,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def veloxResizeBatchesShuffleInputRange: ResizeRange = {
val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
val defaultRange: ResizeRange =
ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize)
conf
.getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE)
.map(ResizeRange.parse)
.getOrElse(defaultRange)
val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
val minSize = conf
.getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
.getOrElse(defaultMinSize)
ResizeRange(minSize, Int.MaxValue)
}

def chColumnarShuffleSpillThreshold: Long = {
Expand Down Expand Up @@ -1492,17 +1490,16 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range")
val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.internal()
.doc(
s"The minimum and maximum batch sizes for shuffle. If the batch size is " +
s"smaller / bigger than minimum / maximum value, it will be combined with other " +
s"batches / split before sending to shuffle. Only functions when " +
s"The minimum batch size for shuffle. If size of an input batch is " +
s"smaller than the value, it will be combined with other " +
s"batches before sending to shuffle. Only functions when " +
s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true. " +
s"A valid value for the option is min~max. " +
s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000")
.stringConf
s"Default value: 0.25 * <max batch size>")
.intConf
.createOptional

val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.integration;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Resolves the default JVM module options shipped with the Spark launcher on the classpath, if
 * any, via reflection so that no compile-time dependency on the Spark class is required.
 */
public class SparkJvmOptions {
  private static final String MODULE_OPTIONS_CLASS_NAME = "org.apache.spark.launcher.JavaModuleOptions";

  /**
   * Reflectively invokes the static, no-argument {@code defaultModuleOptions} method of
   * {@code org.apache.spark.launcher.JavaModuleOptions}.
   *
   * @return the string returned by that method, or an empty string when the class is not on the
   *     classpath
   * @throws RuntimeException if the class exists but the method is missing, inaccessible, or
   *     throws
   */
  public static String read() {
    try {
      // Use the shared constant rather than repeating the class name literal.
      final Class<?> clazz = Class.forName(MODULE_OPTIONS_CLASS_NAME);
      final Method method = clazz.getMethod("defaultModuleOptions");
      // A null receiver is used because the target method is invoked as a static method.
      return (String) method.invoke(null);
    } catch (ClassNotFoundException e) {
      // Could happen in Spark 3.2 which doesn't have this class yet.
      return "";
    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }

  /** Prints the result of {@link #read()} to standard output. */
  public static void main(String[] args) {
    System.out.println(read());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public String getSparkMasterUrl() {
@Override
public Map<String, String> extraSparkConf() {
final Map<String, String> extras = new HashMap<>();
extras.put(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS, "-Dio.netty.tryReflectionSetAccessible=true");
extras.put(SparkLauncher.EXECUTOR_CORES, String.valueOf(resourceEnumeration.lcExecutorCores()));
extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm", resourceEnumeration.lcExecutorHeapMem()));
extras.put("spark.memory.offHeap.enabled", "true");
Expand Down
31 changes: 11 additions & 20 deletions tools/gluten-it/sbin/gluten-it.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

set -euf

GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G -XX:ErrorFile=/var/log/java/hs_err_pid%p.log"}

BASEDIR=$(dirname $0)

LIB_DIR=$BASEDIR/../package/target/lib
Expand All @@ -28,32 +26,25 @@ fi

JAR_PATH=$LIB_DIR/*

SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH org.apache.gluten.integration.SparkJvmOptions)

EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home

# We temporarily disallow setting these two variables by caller.
SPARK_HOME=""
SPARK_SCALA_VERSION=""
export SPARK_HOME=${SPARK_HOME:-$EMBEDDED_SPARK_HOME}
export SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-'2.12'}

echo "SPARK_HOME set at [$SPARK_HOME]."
echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]."

$JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"}

# Launch the integration-test CLI.
# SPARK_JVM_OPTIONS and GLUTEN_IT_JVM_ARGS are deliberately left unquoted so that
# they split into individual JVM flags. Every other expansion is quoted so that
# paths and caller-supplied arguments containing whitespace reach the JVM intact;
# the classpath wildcard is passed through literally for the JVM to expand.
"$JAVA_HOME/bin/java" \
  $SPARK_JVM_OPTIONS \
  $GLUTEN_IT_JVM_ARGS \
  -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \
  -Dio.netty.tryReflectionSetAccessible=true \
  -cp "$JAR_PATH" \
  org.apache.gluten.integration.Cli "$@"

0 comments on commit 0625a75

Please sign in to comment.