Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 5, 2024
1 parent d9a523c commit 1ce002c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -900,12 +900,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla

test("combine small batches before shuffle") {
val minBatchSize = 15
val maxBatchSize = 100
withSQLConf(
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "true",
"spark.gluten.sql.columnar.maxBatchSize" -> "2",
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
s"$minBatchSize~$maxBatchSize"
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.min" -> s"$minBatchSize"
) {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
add_velox_test(
velox_operators_test SOURCES VeloxColumnarToRowTest.cc
VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc
VeloxColumnarBatchTest.cc)
VeloxColumnarBatchTest.cc VeloxBatchResizerTest.cc)
add_velox_test(
velox_plan_conversion_test
SOURCES
Expand Down
85 changes: 85 additions & 0 deletions cpp/velox/tests/VeloxBatchResizerTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <limits>

#include "utils/VeloxBatchResizer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox;

namespace gluten {
class ColumnarBatchArray : public ColumnarBatchIterator {
public:
explicit ColumnarBatchArray(const std::vector<std::shared_ptr<ColumnarBatch>> batches)
: batches_(std::move(batches)) {}

std::shared_ptr<ColumnarBatch> next() override {
if (cursor_ >= batches_.size()) {
return nullptr;
}
return batches_[cursor_++];
}

private:
const std::vector<std::shared_ptr<ColumnarBatch>> batches_;
int32_t cursor_ = 0;
};

class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
}

RowVectorPtr newVector(size_t numRows) {
auto constant = makeConstant(1, numRows);
auto out =
std::make_shared<RowVector>(pool(), ROW({INTEGER()}), nullptr, numRows, std::vector<VectorPtr>{constant});
return out;
}

void checkResize(int32_t min, int32_t max, std::vector<int32_t> inSizes, std::vector<int32_t> outSizes) {
auto inBatches = std::vector<std::shared_ptr<ColumnarBatch>>();
for (const auto& size : inSizes) {
inBatches.push_back(std::make_shared<VeloxColumnarBatch>(newVector(size)));
}
VeloxBatchResizer resizer(pool(), min, max, std::make_unique<ColumnarBatchArray>(std::move(inBatches)));
auto actualOutSizes = std::vector<int32_t>();
while (true) {
auto next = resizer.next();
if (next == nullptr) {
break;
}
actualOutSizes.push_back(next->numRows());
}
ASSERT_EQ(actualOutSizes, outSizes);
}
};

TEST_F(VeloxBatchResizerTest, sanity) {
checkResize(100, std::numeric_limits<int32_t>::max(), {30, 50, 30, 40, 30}, {110, 70});
checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30});
checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 30});
checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30});
checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30});
checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50});
checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110});
checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 200, 100});
ASSERT_ANY_THROW(checkResize(0, 0, {}, {}));
}
} // namespace gluten
15 changes: 11 additions & 4 deletions cpp/velox/utils/VeloxBatchResizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ namespace {
class SliceRowVector : public ColumnarBatchIterator {
public:
SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in)
: maxOutputBatchSize_(maxOutputBatchSize), in_(in) {
GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state");
}
: maxOutputBatchSize_(maxOutputBatchSize), in_(in) {}

std::shared_ptr<ColumnarBatch> next() override {
int32_t remainingLength = in_->size() - cursor_;
Expand Down Expand Up @@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer(
: pool_(pool),
minOutputBatchSize_(minOutputBatchSize),
maxOutputBatchSize_(maxOutputBatchSize),
in_(std::move(in)) {}
in_(std::move(in)) {
GLUTEN_CHECK(
minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0,
"Either minOutputBatchSize or maxOutputBatchSize should be larger than 0");
}

std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
if (next_) {
Expand All @@ -82,6 +84,11 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
auto nextRv = nextVb->getRowVector();
if (buffer->size() + nextRv->size() > maxOutputBatchSize_) {
GLUTEN_CHECK(next_ == nullptr, "Invalid state");
next_ = std::make_unique<SliceRowVector>(maxOutputBatchSize_, nextRv);
return std::make_shared<VeloxColumnarBatch>(buffer);
}
buffer->append(nextRv.get());
if (buffer->size() >= minOutputBatchSize_) {
// Buffer is full.
Expand Down
14 changes: 6 additions & 8 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def columnarShuffleCompressionThreshold: Int =
conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)

// FIXME: Not clear: MIN or MAX ?
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)

def shuffleWriterBufferSize: Int = conf
Expand Down Expand Up @@ -327,7 +326,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxResizeBatchesShuffleInputRange: ResizeRange = {
val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
val defaultRange: ResizeRange =
ResizeRange((0.25 * standardSize).toInt.max(1), (1.5 * standardSize).toInt.max(1))
ResizeRange((0.25 * standardSize).toInt.max(1), Int.MaxValue)
conf
.getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE)
.map(ResizeRange.parse)
Expand Down Expand Up @@ -1480,15 +1479,14 @@ object GlutenConfig {
.createWithDefault(true)

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range")
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.internal()
.doc(
s"The minimum and maximum batch sizes for shuffle. If the batch size is " +
s"smaller / bigger than minimum / maximum value, it will be combined with other " +
s"batches / split before sending to shuffle. Only functions when " +
s"The minimum batch size for shuffle. If size of an input batch is " +
s"smaller than the value, it will be combined with other " +
s"batches before sending to shuffle. Only functions when " +
s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true. " +
s"A valid value for the option is min~max. " +
s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000")
s"Default value: 0.25 * <max batch size>")
.stringConf
.createOptional

Expand Down

0 comments on commit 1ce002c

Please sign in to comment.