Skip to content

Commit

Permalink
[VL] Fix shuffle spill triggered by evicting buffers during stop (#6422)
Browse files Browse the repository at this point in the history
In VeloxHashBasedShuffleWriter::stop, evicting partition buffers can allocate extra memory and trigger a spill. In this case the target partition buffer should not be shrunk.

Added UT for this case.
  • Loading branch information
marin-ma authored Jul 12, 2024
1 parent 997c6e3 commit e95f32a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 48 deletions.
16 changes: 10 additions & 6 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}

arrow::Status VeloxHashBasedShuffleWriter::stop() {
setSplitState(SplitState::kStopEvict);
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
PartitionBufferGuard guard(partitionBufferInUse_, pid);
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
}
}
Expand Down Expand Up @@ -970,10 +972,6 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashBasedShuffle
bool reuseBuffers) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);

if (partitionBufferBase_[partitionId] == 0) {
return std::vector<std::shared_ptr<arrow::Buffer>>{};
}

auto numRows = partitionBufferBase_[partitionId];
auto fixedWidthIdx = 0;
auto binaryIdx = 0;
Expand Down Expand Up @@ -1321,6 +1319,9 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
// Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) {
continue;
}
if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] > partitionBufferBase_[pid]) {
pidToSize.emplace_back(pid, partitionBufferSize_[pid] - partitionBufferBase_[pid]);
}
Expand Down Expand Up @@ -1348,6 +1349,7 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from
// shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_
VELOX_CHECK(!partitionBufferInUse_);
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
Expand Down Expand Up @@ -1375,10 +1377,12 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize
bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
// If OOM happens during SplitState::kSplit, it is triggered by binary buffers resize.
// Or during SplitState::kInit, it is triggered by other operators.
// Or during SplitState::kStopEvict, it is triggered by assembleBuffers allocating extra memory. In this case we use
// PartitionBufferGuard to prevent the target partition from being shrunk.
// The reclaim order is spill->shrink, because the partition buffers can be reused.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning != Partitioning::kSingle &&
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit || splitState_ == SplitState::kStopEvict);
}

bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
Expand All @@ -1391,7 +1395,7 @@ arrow::Result<uint32_t> VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShr
if (splitState_ == SplitState::kSplit) {
return partitionBufferBase_[partitionId] + partition2RowCount_[partitionId];
}
if (splitState_ == kInit || splitState_ == SplitState::kStop) {
if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) {
return partitionBufferBase_[partitionId];
}
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_));
Expand Down
19 changes: 18 additions & 1 deletion cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace gluten {

#endif // end of VELOX_SHUFFLE_WRITER_PRINT

enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum SplitState { kInit, kPreAlloc, kSplit, kStopEvict, kStop };

struct BinaryArrayResizeState {
bool inResize;
Expand Down Expand Up @@ -303,6 +303,21 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {

arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

// Scope guard that marks one partition's buffers as "in use" for the guard's lifetime.
//
// On construction the referenced optional is set to `partitionId`; on destruction it is
// reset to std::nullopt. Code that inspects the optional (e.g. the partition buffer
// shrink path) can thereby skip the partition that is currently being evicted.
//
// The guard only stores a reference, so the referenced optional must outlive it.
class PartitionBufferGuard {
 public:
  // Records `partitionId` in `partitionInUse` until this guard is destroyed.
  PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t partitionId)
      : partitionBufferInUse_(partitionInUse) {
    partitionBufferInUse_ = partitionId;
  }

  // Clears the marker unconditionally.
  ~PartitionBufferGuard() {
    partitionBufferInUse_ = std::nullopt;
  }

  // Non-copyable (and therefore non-movable): destroying a copy would clear the marker
  // while the original guard is still alive, leaving the partition unprotected.
  PartitionBufferGuard(const PartitionBufferGuard&) = delete;
  PartitionBufferGuard& operator=(const PartitionBufferGuard&) = delete;

 private:
  std::optional<uint32_t>& partitionBufferInUse_;
};

BinaryArrayResizeState binaryArrayResizeState_{};

bool hasComplexType_ = false;
Expand Down Expand Up @@ -401,6 +416,8 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
facebook::velox::serializer::presto::PrestoVectorSerde serde_;

SplitState splitState_{kInit};

std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
}; // class VeloxHashBasedShuffleWriter

} // namespace gluten
66 changes: 25 additions & 41 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,50 +207,13 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
std::vector<VectorPtr> children = {
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeRowVector({
makeFlatVector<int32_t>({1, 3}),
makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
}),
makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
{1, 2, 3},
}),
makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
};
auto dataVector = makeRowVector(children);
auto children = childrenComplex_;
children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
auto vector = makeRowVector(children);
auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
auto secondBlock = takeRows({inputVectorComplex_}, {{0}});

auto firstBlock = makeRowVector({
makeConstant<int32_t>(1, 1),
makeRowVector({
makeConstant<int32_t>(3, 1),
makeFlatVector<velox::StringView>({"de"}),
}),
makeFlatVector<StringView>({"10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3},
}),
makeMapVector<int32_t, StringView>({{{3, "str3000"}, {4, "str4000"}}}),
});

auto secondBlock = makeRowVector({
makeNullConstant(TypeKind::INTEGER, 1),
makeRowVector({
makeConstant<int32_t>(1, 1),
makeNullableFlatVector<velox::StringView>({std::nullopt}),
}),
makeNullableFlatVector<StringView>({std::nullopt}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
}),
makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}}),
});

testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataVector->type(), {{firstBlock}, {secondBlock}});
testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
}

TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
Expand Down Expand Up @@ -631,6 +594,27 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}

// Checks that stop() succeeds when evicting partition buffers during stop() itself
// triggers a spill. The input is inputVectorComplex_, and the writer allocates from a
// SelfEvictedMemoryPool.
TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
ASSERT_NOT_OK(initShuffleWriterOptions());
// Use a very small partition buffer size.
// NOTE(review): presumably this keeps per-partition buffers tiny so little memory is held there - confirm.
shuffleWriterOptions_.bufferSize = 4;
// NOTE(review): the meaning of the second constructor argument (false) is not visible here - confirm.
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
auto shuffleWriter = createShuffleWriter(&pool);

// Hand the writer to the pool.
// NOTE(review): presumably the pool asks this writer to reclaim memory when it needs to evict - confirm.
pool.setEvictable(shuffleWriter.get());
// Split the same complex-typed input three times.
for (int i = 0; i < 3; ++i) {
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
}

// Reclaim from PartitionWriter to free cached bytes.
auto payloadSize = shuffleWriter->cachedPayloadSize();
// Output parameter written by reclaimFixedSize below.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
// The reclaim must report exactly the cached payload size.
ASSERT_EQ(evicted, payloadSize);

// When evicting partitioning buffers in stop, spill will be triggered by complex types allocating extra memory.
// stop() runs inside the callback given to checkEvict and must return OK.
// NOTE(review): checkEvict presumably returns true only if an eviction happened while the callback ran - confirm.
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
}

TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
Expand Down
17 changes: 17 additions & 0 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,27 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, std::nullopt)),
};
childrenComplex_ = {
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeRowVector({
makeFlatVector<int32_t>({1, 3}),
makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "de"}),
}),
makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
{1, 2, 3},
}),
makeMapVector<int32_t, facebook::velox::StringView>(
{{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
};

inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
inputVectorComplex_ = makeRowVector(childrenComplex_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
Expand Down Expand Up @@ -217,6 +232,7 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::vector<facebook::velox::VectorPtr> childrenNoNull_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
std::vector<facebook::velox::VectorPtr> childrenComplex_;

facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
Expand All @@ -225,6 +241,7 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::string largeString2_;
facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
facebook::velox::RowVectorPtr inputVectorComplex_;
};

class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
Expand Down

0 comments on commit e95f32a

Please sign in to comment.