From b420cd6b470ab0c88474e5197c5991a693aa2913 Mon Sep 17 00:00:00 2001 From: yan ma Date: Mon, 15 Jul 2024 18:03:17 +0800 Subject: [PATCH] Revert "[VL] Fix shuffle spill triggered by evicting buffers during stop (#6422)" This reverts commit e95f32ab613dc39f07118072e0f403057ffce49a. --- .../shuffle/VeloxHashBasedShuffleWriter.cc | 16 ++--- .../shuffle/VeloxHashBasedShuffleWriter.h | 19 +----- cpp/velox/tests/VeloxShuffleWriterTest.cc | 66 ++++++++++++------- .../utils/tests/VeloxShuffleWriterTestBase.h | 17 ----- 4 files changed, 48 insertions(+), 70 deletions(-) diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc index f9cd2780c3269..3bd1a2fbc6cce 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc @@ -329,10 +329,8 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo } arrow::Status VeloxHashBasedShuffleWriter::stop() { - setSplitState(SplitState::kStopEvict); if (options_.partitioning != Partitioning::kSingle) { for (auto pid = 0; pid < numPartitions_; ++pid) { - PartitionBufferGuard guard(partitionBufferInUse_, pid); RETURN_NOT_OK(evictPartitionBuffers(pid, false)); } } @@ -972,6 +970,10 @@ arrow::Result>> VeloxHashBasedShuffle bool reuseBuffers) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]); + if (partitionBufferBase_[partitionId] == 0) { + return std::vector>{}; + } + auto numRows = partitionBufferBase_[partitionId]; auto fixedWidthIdx = 0; auto binaryIdx = 0; @@ -1319,9 +1321,6 @@ arrow::Result VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz // Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_) std::vector> pidToSize; for (auto pid = 0; pid < numPartitions_; ++pid) { - if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) { - continue; - } if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] > partitionBufferBase_[pid]) { pidToSize.emplace_back(pid, partitionBufferSize_[pid] - partitionBufferBase_[pid]); } @@ -1349,7 +1348,6 @@ arrow::Result VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz arrow::Result VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) { // Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_ - VELOX_CHECK(!partitionBufferInUse_); int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); int64_t evicted = 0; std::vector> pidToSize; @@ -1377,12 +1375,10 @@ arrow::Result VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const { // If OOM happens during SplitState::kSplit, it is triggered by binary buffers resize. // Or during SplitState::kInit, it is triggered by other operators. - // Or during SplitState::kStopEvict, it is triggered by assembleBuffers allocating extra memory. In this case we use - // PartitionBufferGuard to prevent the target partition from being shrunk. // The reclaim order is spill->shrink, because the partition buffers can be reused. // SinglePartitioning doesn't maintain partition buffers. return options_.partitioning != Partitioning::kSingle && - (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit || splitState_ == SplitState::kStopEvict); + (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit); } bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const { @@ -1395,7 +1391,7 @@ arrow::Result VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShr if (splitState_ == SplitState::kSplit) { return partitionBufferBase_[partitionId] + partition2RowCount_[partitionId]; } - if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) { + if (splitState_ == kInit || splitState_ == SplitState::kStop) { return partitionBufferBase_[partitionId]; } return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_)); diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h index f78211980a865..a11f84e952a61 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h @@ -87,7 +87,7 @@ namespace gluten { #endif // end of VELOX_SHUFFLE_WRITER_PRINT -enum SplitState { kInit, kPreAlloc, kSplit, kStopEvict, kStop }; +enum SplitState { kInit, kPreAlloc, kSplit, kStop }; struct BinaryArrayResizeState { bool inResize; @@ -303,21 +303,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); - class PartitionBufferGuard { - public: - PartitionBufferGuard(std::optional& partitionInUse, uint32_t partitionId) - : partitionBufferInUse_(partitionInUse) { - partitionBufferInUse_ = partitionId; - } - - ~PartitionBufferGuard() { - partitionBufferInUse_ = std::nullopt; - } - - private: - std::optional& partitionBufferInUse_; - }; - BinaryArrayResizeState binaryArrayResizeState_{}; bool hasComplexType_ = false; @@ -416,8 +401,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { facebook::velox::serializer::presto::PrestoVectorSerde serde_; SplitState splitState_{kInit}; - - std::optional partitionBufferInUse_{std::nullopt}; }; // class VeloxHashBasedShuffleWriter } // namespace gluten diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 957657eeb5c19..1c1be6fc1b6f3 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -207,13 +207,50 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) { TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) { ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); - auto children = childrenComplex_; + std::vector children = { + makeNullableFlatVector({std::nullopt, 1}), + makeRowVector({ + makeFlatVector({1, 3}), + makeNullableFlatVector({std::nullopt, "de"}), + }), + makeNullableFlatVector({std::nullopt, "10 I'm not inline string"}), + makeArrayVector({ + {1, 2, 3, 4, 5}, + {1, 2, 3}, + }), + makeMapVector({{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}), + }; + auto dataVector = makeRowVector(children); children.insert((children.begin()), makeFlatVector({1, 2})); auto vector = makeRowVector(children); - auto firstBlock = takeRows({inputVectorComplex_}, {{1}}); - auto secondBlock = takeRows({inputVectorComplex_}, {{0}}); - testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, inputVectorComplex_->type(), {{firstBlock}, {secondBlock}}); + auto firstBlock = makeRowVector({ + makeConstant(1, 1), + makeRowVector({ + makeConstant(3, 1), + makeFlatVector({"de"}), + }), + makeFlatVector({"10 I'm not inline string"}), + makeArrayVector({ + {1, 2, 3}, + }), + makeMapVector({{{3, "str3000"}, {4, "str4000"}}}), + }); + + auto secondBlock = makeRowVector({ + makeNullConstant(TypeKind::INTEGER, 1), + makeRowVector({ + makeConstant(1, 1), + makeNullableFlatVector({std::nullopt}), + }), + makeNullableFlatVector({std::nullopt}), + makeArrayVector({ + {1, 2, 3, 4, 5}, + }), + makeMapVector({{{1, "str1000"}, {2, "str2000"}}}), + }); + + testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataVector->type(), {{firstBlock}, {secondBlock}}); } TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) { @@ -594,27 +631,6 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) { } } -TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) { - ASSERT_NOT_OK(initShuffleWriterOptions()); - shuffleWriterOptions_.bufferSize = 4; - auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); - auto shuffleWriter = createShuffleWriter(&pool); - - pool.setEvictable(shuffleWriter.get()); - for (int i = 0; i < 3; ++i) { - ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_)); - } - - // Reclaim from PartitionWriter to free cached bytes. - auto payloadSize = shuffleWriter->cachedPayloadSize(); - int64_t evicted; - ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted)); - ASSERT_EQ(evicted, payloadSize); - - // When evicting partitioning buffers in stop, spill will be triggered by complex types allocating extra memory. - ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); })); -} - TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) { ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferSize = 4; diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 41b7f31e02d7e..fd3ae3d547fee 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -172,27 +172,12 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase makeNullableFlatVector( std::vector>(2048, std::nullopt)), }; - childrenComplex_ = { - makeNullableFlatVector({std::nullopt, 1}), - makeRowVector({ - makeFlatVector({1, 3}), - makeNullableFlatVector({std::nullopt, "de"}), - }), - makeNullableFlatVector({std::nullopt, "10 I'm not inline string"}), - makeArrayVector({ - {1, 2, 3, 4, 5}, - {1, 2, 3}, - }), - makeMapVector( - {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}), - }; inputVector1_ = makeRowVector(children1_); inputVector2_ = makeRowVector(children2_); inputVectorNoNull_ = makeRowVector(childrenNoNull_); inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_); inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_); - inputVectorComplex_ = makeRowVector(childrenComplex_); } arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) { @@ -232,7 +217,6 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase std::vector childrenNoNull_; std::vector childrenLargeBinary1_; std::vector childrenLargeBinary2_; - std::vector childrenComplex_; facebook::velox::RowVectorPtr inputVector1_; facebook::velox::RowVectorPtr inputVector2_; @@ -241,7 +225,6 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase std::string largeString2_; facebook::velox::RowVectorPtr inputVectorLargeBinary1_; facebook::velox::RowVectorPtr inputVectorLargeBinary2_; - facebook::velox::RowVectorPtr inputVectorComplex_; }; class VeloxShuffleWriterTest : public ::testing::TestWithParam, public VeloxShuffleWriterTestBase {