diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala index 5ca5087d9ef4..2a2671926994 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala @@ -900,12 +900,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla test("combine small batches before shuffle") { val minBatchSize = 15 - val maxBatchSize = 100 withSQLConf( "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true", "spark.gluten.sql.columnar.maxBatchSize" -> "2", - "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" -> - s"$minBatchSize~$maxBatchSize" + "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize" -> + s"$minBatchSize" ) { val df = runQueryAndCompare( "select l_orderkey, sum(l_partkey) as sum from lineitem " + @@ -921,16 +920,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla assert(metrics("numOutputRows").value == 27) assert(metrics("numOutputBatches").value == 2) } - } - test("split small batches before shuffle") { - val minBatchSize = 1 - val maxBatchSize = 4 withSQLConf( "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true", - "spark.gluten.sql.columnar.maxBatchSize" -> "100", - "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" -> - s"$minBatchSize~$maxBatchSize" + "spark.gluten.sql.columnar.maxBatchSize" -> "2" ) { val df = runQueryAndCompare( "select l_orderkey, sum(l_partkey) as sum from lineitem " + @@ -939,12 +932,12 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla val ops = collect(df.queryExecution.executedPlan) { case p: VeloxResizeBatchesExec => p } assert(ops.size == 1) val op = ops.head - assert(op.minOutputBatchSize == minBatchSize) + assert(op.minOutputBatchSize == 1) val metrics = op.metrics assert(metrics("numInputRows").value == 27) - assert(metrics("numInputBatches").value == 1) + assert(metrics("numInputBatches").value == 14) assert(metrics("numOutputRows").value == 27) - assert(metrics("numOutputBatches").value == 7) + assert(metrics("numOutputBatches").value == 14) } } diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index f7bc1cb13ee7..b8ea12e944aa 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -39,9 +39,13 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc) add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc) # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc) add_velox_test( - velox_operators_test SOURCES VeloxColumnarToRowTest.cc - VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc - VeloxColumnarBatchTest.cc) + velox_operators_test + SOURCES + VeloxColumnarToRowTest.cc + VeloxRowToColumnarTest.cc + VeloxColumnarBatchSerializerTest.cc + VeloxColumnarBatchTest.cc + VeloxBatchResizerTest.cc) add_velox_test( velox_plan_conversion_test SOURCES diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc new file mode 100644 index 000000000000..aecd52f927cc --- /dev/null +++ b/cpp/velox/tests/VeloxBatchResizerTest.cc @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "utils/VeloxBatchResizer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; + +namespace gluten { +class ColumnarBatchArray : public ColumnarBatchIterator { + public: + explicit ColumnarBatchArray(const std::vector> batches) + : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if (cursor_ >= batches_.size()) { + return nullptr; + } + return batches_[cursor_++]; + } + + private: + const std::vector> batches_; + int32_t cursor_ = 0; +}; + +class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + RowVectorPtr newVector(size_t numRows) { + auto constant = makeConstant(1, numRows); + auto out = + std::make_shared(pool(), ROW({INTEGER()}), nullptr, numRows, std::vector{constant}); + return out; + } + + void checkResize(int32_t min, int32_t max, std::vector inSizes, std::vector outSizes) { + auto inBatches = std::vector>(); + for (const auto& size : inSizes) { + inBatches.push_back(std::make_shared(newVector(size))); + } + VeloxBatchResizer resizer(pool(), min, max, std::make_unique(std::move(inBatches))); + auto actualOutSizes = std::vector(); + while (true) { + auto next = resizer.next(); + if (next == nullptr) { + break; + } + actualOutSizes.push_back(next->numRows()); + } + ASSERT_EQ(actualOutSizes, outSizes); + } +}; + +TEST_F(VeloxBatchResizerTest, sanity) { + checkResize(100, std::numeric_limits::max(), {30, 50, 30, 40, 30}, {110, 70}); + checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30}); + checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 30}); + checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30}); + checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30}); + checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50}); + checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110}); + checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 200, 100}); + ASSERT_ANY_THROW(checkResize(0, 0, {}, {})); +} +} // namespace gluten diff --git a/cpp/velox/utils/VeloxBatchResizer.cc b/cpp/velox/utils/VeloxBatchResizer.cc index 7b51463068c9..56429299464a 100644 --- a/cpp/velox/utils/VeloxBatchResizer.cc +++ b/cpp/velox/utils/VeloxBatchResizer.cc @@ -23,9 +23,7 @@ namespace { class SliceRowVector : public ColumnarBatchIterator { public: SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in) - : maxOutputBatchSize_(maxOutputBatchSize), in_(in) { - GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state"); - } + : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {} std::shared_ptr next() override { int32_t remainingLength = in_->size() - cursor_; @@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer( : pool_(pool), minOutputBatchSize_(minOutputBatchSize), maxOutputBatchSize_(maxOutputBatchSize), - in_(std::move(in)) {} + in_(std::move(in)) { + GLUTEN_CHECK( + minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0, + "Either minOutputBatchSize or maxOutputBatchSize should be larger than 0"); +} std::shared_ptr VeloxBatchResizer::next() { if (next_) { @@ -82,6 +84,11 @@ std::shared_ptr VeloxBatchResizer::next() { for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) { auto nextVb = VeloxColumnarBatch::from(pool_, nextCb); auto nextRv = nextVb->getRowVector(); + if (buffer->size() + nextRv->size() > maxOutputBatchSize_) { + GLUTEN_CHECK(next_ == nullptr, "Invalid state"); + next_ = std::make_unique(maxOutputBatchSize_, nextRv); + return std::make_shared(buffer); + } buffer->append(nextRv.get()); if (buffer->size() >= minOutputBatchSize_) { // Buffer is full. diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index e3f6f1d984ed..147345a87647 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { def columnarShuffleCompressionThreshold: Int = conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD) - // FIXME: Not clear: MIN or MAX ? def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) def shuffleWriterBufferSize: Int = conf @@ -326,12 +325,11 @@ class GlutenConfig(conf: SQLConf) extends Logging { def veloxResizeBatchesShuffleInputRange: ResizeRange = { val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) - val defaultRange: ResizeRange = - ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize) - conf - .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE) - .map(ResizeRange.parse) - .getOrElse(defaultRange) + val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1) + val minSize = conf + .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE) + .getOrElse(defaultMinSize) + ResizeRange(minSize, Int.MaxValue) } def chColumnarShuffleSpillThreshold: Long = { @@ -1479,17 +1477,16 @@ object GlutenConfig { .booleanConf .createWithDefault(true) - val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE = - buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range") + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = + buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .internal() .doc( - s"The minimum and maximum batch sizes for shuffle. If the batch size is " + - s"smaller / bigger than minimum / maximum value, it will be combined with other " + - s"batches / split before sending to shuffle. Only functions when " + + s"The minimum batch size for shuffle. If size of an input batch is " + + s"smaller than the value, it will be combined with other " + + s"batches before sending to shuffle. Only functions when " + s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true. " + - s"A valid value for the option is min~max. " + - s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000") - .stringConf + s"Default value: 0.25 * ") + .intConf .createOptional val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD = diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java new file mode 100644 index 000000000000..85c0912fd7c7 --- /dev/null +++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.integration; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +public class SparkJvmOptions { + private static final String MODULE_OPTIONS_CLASS_NAME = "org.apache.spark.launcher.JavaModuleOptions"; + + public static String read() { + try { + final Class clazz = Class.forName("org.apache.spark.launcher.JavaModuleOptions"); + final Method method = clazz.getMethod("defaultModuleOptions"); + return (String) method.invoke(null); + } catch (ClassNotFoundException e) { + // Could happen in Spark 3.2 which doesn't have this class yet. + return ""; + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) { + System.out.println(read()); + } +} diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java index 6750b90e9e49..d186b5d0b1d6 100644 --- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java +++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java @@ -332,6 +332,7 @@ public String getSparkMasterUrl() { @Override public Map extraSparkConf() { final Map extras = new HashMap<>(); + extras.put(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS, "-Dio.netty.tryReflectionSetAccessible=true"); extras.put(SparkLauncher.EXECUTOR_CORES, String.valueOf(resourceEnumeration.lcExecutorCores())); extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm", resourceEnumeration.lcExecutorHeapMem())); extras.put("spark.memory.offHeap.enabled", "true"); diff --git a/tools/gluten-it/sbin/gluten-it.sh b/tools/gluten-it/sbin/gluten-it.sh index 00ff78e34997..8c1a6413b5ec 100755 --- a/tools/gluten-it/sbin/gluten-it.sh +++ b/tools/gluten-it/sbin/gluten-it.sh @@ -16,8 +16,6 @@ set -euf -GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G -XX:ErrorFile=/var/log/java/hs_err_pid%p.log"} - BASEDIR=$(dirname $0) LIB_DIR=$BASEDIR/../package/target/lib @@ -28,32 +26,25 @@ fi JAR_PATH=$LIB_DIR/* +SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH org.apache.gluten.integration.SparkJvmOptions) + EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home +# We temporarily disallow setting these two variables by caller. +SPARK_HOME="" +SPARK_SCALA_VERSION="" export SPARK_HOME=${SPARK_HOME:-$EMBEDDED_SPARK_HOME} export SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-'2.12'} echo "SPARK_HOME set at [$SPARK_HOME]." echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]." -$JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \ - -XX:+IgnoreUnrecognizedVMOptions \ - --add-opens=java.base/java.lang=ALL-UNNAMED \ - --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \ - --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ - --add-opens=java.base/java.io=ALL-UNNAMED \ - --add-opens=java.base/java.net=ALL-UNNAMED \ - --add-opens=java.base/java.nio=ALL-UNNAMED \ - --add-opens=java.base/java.util=ALL-UNNAMED \ - --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \ - --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \ - --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \ - --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \ - --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \ - --add-opens=java.base/sun.nio.cs=ALL-UNNAMED \ - --add-opens=java.base/sun.security.action=ALL-UNNAMED \ - --add-opens=java.base/sun.util.calendar=ALL-UNNAMED \ - -Djdk.reflect.useDirectMethodHandle=false \ +GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"} + +$JAVA_HOME/bin/java \ + $SPARK_JVM_OPTIONS \ + $GLUTEN_IT_JVM_ARGS \ + -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \ -Dio.netty.tryReflectionSetAccessible=true \ -cp $JAR_PATH \ org.apache.gluten.integration.Cli $@