Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Fix shuffle spill triggered by evicting buffers during stop #6422

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}

arrow::Status VeloxHashBasedShuffleWriter::stop() {
setSplitState(SplitState::kStopEvict);
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
PartitionBufferGuard guard(partitionBufferInUse_, pid);
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
}
}
Expand Down Expand Up @@ -970,10 +972,6 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashBasedShuffle
bool reuseBuffers) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);

if (partitionBufferBase_[partitionId] == 0) {
return std::vector<std::shared_ptr<arrow::Buffer>>{};
}

auto numRows = partitionBufferBase_[partitionId];
auto fixedWidthIdx = 0;
auto binaryIdx = 0;
Expand Down Expand Up @@ -1321,6 +1319,9 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
// Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) {
continue;
}
if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] > partitionBufferBase_[pid]) {
pidToSize.emplace_back(pid, partitionBufferSize_[pid] - partitionBufferBase_[pid]);
}
Expand Down Expand Up @@ -1348,6 +1349,7 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from
// shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_
VELOX_CHECK(!partitionBufferInUse_);
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
Expand Down Expand Up @@ -1375,10 +1377,12 @@ arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize
bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
// If OOM happens during SplitState::kSplit, it is triggered by binary buffers resize.
// Or during SplitState::kInit, it is triggered by other operators.
// Or during SplitState::kStopEvict, it is triggered by assembleBuffers allocating extra memory. In this case we use
// PartitionBufferGuard to prevent the target partition from being shrunk.
// The reclaim order is spill->shrink, because the partition buffers can be reused.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning != Partitioning::kSingle &&
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit || splitState_ == SplitState::kStopEvict);
}

bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
Expand All @@ -1391,7 +1395,7 @@ arrow::Result<uint32_t> VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShr
if (splitState_ == SplitState::kSplit) {
return partitionBufferBase_[partitionId] + partition2RowCount_[partitionId];
}
if (splitState_ == kInit || splitState_ == SplitState::kStop) {
if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) {
return partitionBufferBase_[partitionId];
}
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_));
Expand Down
19 changes: 18 additions & 1 deletion cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace gluten {

#endif // end of VELOX_SHUFFLE_WRITER_PRINT

enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum SplitState { kInit, kPreAlloc, kSplit, kStopEvict, kStop };

struct BinaryArrayResizeState {
bool inResize;
Expand Down Expand Up @@ -303,6 +303,21 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {

arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

// Scoped marker for the partition whose buffers are currently in use.
//
// On construction the guard stores `partitionId` into the referenced optional; on destruction it
// resets that optional to std::nullopt. Code that inspects the optional (e.g. the shrink loop that
// skips the partition equal to `*partitionBufferInUse_`) can thereby leave the marked partition alone
// for the lifetime of the guard.
//
// The constructor overwrites whatever value the optional held and the destructor clears it
// unconditionally, so guards on the same optional must not be nested.
class PartitionBufferGuard {
 public:
  // @param partitionInUse Optional that receives the marker; it must outlive the guard because only
  //                       a reference to it is stored.
  // @param partitionId    Partition to mark as in use.
  PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t partitionId)
      : partitionBufferInUse_(partitionInUse) {
    partitionBufferInUse_ = partitionId;
  }

  // Clears the marker.
  ~PartitionBufferGuard() {
    partitionBufferInUse_ = std::nullopt;
  }

  // Non-copyable and non-movable: a second guard bound to the same optional would clear the marker
  // in its destructor while the original guard is still alive.
  PartitionBufferGuard(const PartitionBufferGuard&) = delete;
  PartitionBufferGuard& operator=(const PartitionBufferGuard&) = delete;
  PartitionBufferGuard(PartitionBufferGuard&&) = delete;
  PartitionBufferGuard& operator=(PartitionBufferGuard&&) = delete;

 private:
  // Refers to the owner's marker; never owned by the guard.
  std::optional<uint32_t>& partitionBufferInUse_;
};

BinaryArrayResizeState binaryArrayResizeState_{};

bool hasComplexType_ = false;
Expand Down Expand Up @@ -401,6 +416,8 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
facebook::velox::serializer::presto::PrestoVectorSerde serde_;

SplitState splitState_{kInit};

std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
}; // class VeloxHashBasedShuffleWriter

} // namespace gluten
66 changes: 25 additions & 41 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,50 +207,13 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
std::vector<VectorPtr> children = {
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeRowVector({
makeFlatVector<int32_t>({1, 3}),
makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
}),
makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
{1, 2, 3},
}),
makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
};
auto dataVector = makeRowVector(children);
auto children = childrenComplex_;
children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
auto vector = makeRowVector(children);
auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
auto secondBlock = takeRows({inputVectorComplex_}, {{0}});

auto firstBlock = makeRowVector({
makeConstant<int32_t>(1, 1),
makeRowVector({
makeConstant<int32_t>(3, 1),
makeFlatVector<velox::StringView>({"de"}),
}),
makeFlatVector<StringView>({"10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3},
}),
makeMapVector<int32_t, StringView>({{{3, "str3000"}, {4, "str4000"}}}),
});

auto secondBlock = makeRowVector({
makeNullConstant(TypeKind::INTEGER, 1),
makeRowVector({
makeConstant<int32_t>(1, 1),
makeNullableFlatVector<velox::StringView>({std::nullopt}),
}),
makeNullableFlatVector<StringView>({std::nullopt}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
}),
makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}}),
});

testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataVector->type(), {{firstBlock}, {secondBlock}});
testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
}

TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
Expand Down Expand Up @@ -631,6 +594,27 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}

// Exercises stop() on complex-typed input while the writer is registered as evictable on a
// SelfEvictedMemoryPool, and expects pool.checkEvict(...) to report true for the stop() call.
TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
  ASSERT_NOT_OK(initShuffleWriterOptions());
  // Tiny buffer size for this test (NOTE(review): unit is presumably rows per partition buffer — confirm).
  shuffleWriterOptions_.bufferSize = 4;
  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
  auto shuffleWriter = createShuffleWriter(&pool);

  // Register the writer with the pool before feeding input.
  pool.setEvictable(shuffleWriter.get());
  for (int i = 0; i < 3; ++i) {
    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
  }

  // Reclaim from PartitionWriter to free cached bytes.
  auto payloadSize = shuffleWriter->cachedPayloadSize();
  // Initialized so the comparison below never reads an indeterminate value.
  int64_t evicted = 0;
  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
  ASSERT_EQ(evicted, payloadSize);

  // When evicting partitioning buffers in stop, spill will be triggered by complex types allocating extra memory.
  ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
}

TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
Expand Down
17 changes: 17 additions & 0 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,27 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048, std::nullopt)),
};
childrenComplex_ = {
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeRowVector({
makeFlatVector<int32_t>({1, 3}),
makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "de"}),
}),
makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 I'm not inline string"}),
makeArrayVector<int64_t>({
{1, 2, 3, 4, 5},
{1, 2, 3},
}),
makeMapVector<int32_t, facebook::velox::StringView>(
{{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
};

inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
inputVectorComplex_ = makeRowVector(childrenComplex_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
Expand Down Expand Up @@ -217,6 +232,7 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::vector<facebook::velox::VectorPtr> childrenNoNull_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
std::vector<facebook::velox::VectorPtr> childrenComplex_;

facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
Expand All @@ -225,6 +241,7 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::string largeString2_;
facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
facebook::velox::RowVectorPtr inputVectorComplex_;
};

class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
Expand Down
Loading