diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index 5d58ee9b032e..ad075bdacf36 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -215,7 +215,39 @@ void PrefixSort::extractRowToPrefix(char* row, char* prefix) { getAddressFromPrefix(prefix) = row; } -void PrefixSort::sortInternal(std::vector& rows) { +// static. +uint32_t PrefixSort::maxRequiredBytes( + memory::MemoryPool* pool, + RowContainer* rowContainer, + const std::vector& compareFlags, + const velox::common::PrefixSortConfig& config) { + if (rowContainer->numRows() < config.threshold) { + return 0; + } + VELOX_DCHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); + const auto sortLayout = PrefixSortLayout::makeSortLayout( + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + if (sortLayout.noNormalizedKeys) { + return 0; + } + + PrefixSort prefixSort(pool, rowContainer, sortLayout); + return prefixSort.maxRequiredBytes(); +} + +uint32_t PrefixSort::maxRequiredBytes() { + const auto numRows = rowContainer_->numRows(); + const auto numPages = + memory::AllocationTraits::numPages(numRows * sortLayout_.entrySize); + // Prefix data size + swap buffer size. + return memory::AllocationTraits::pageBytes(numPages) + + pool_->preferredSize(checkedPlus( + sortLayout_.entrySize, AlignedBuffer::kPaddedSize)) + + 2 * pool_->alignment(); +} + +void PrefixSort::sortInternal( + std::vector>& rows) { const auto numRows = rows.size(); const auto entrySize = sortLayout_.entrySize; memory::ContiguousAllocation prefixAllocation; diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 0f860c777ded..a565694d1c9d 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -26,7 +26,7 @@ namespace facebook::velox::exec { namespace detail { FOLLY_ALWAYS_INLINE void stdSort( - std::vector& rows, + std::vector>& rows, RowContainer* rowContainer, const std::vector& compareFlags) { std::sort( @@ -120,7 +120,7 @@ class PrefixSort { /// @param rows The result of RowContainer::listRows(), assuming that the /// caller (SortBuffer etc.) has already got the result. FOLLY_ALWAYS_INLINE static void sort( - std::vector& rows, + std::vector>& rows, memory::MemoryPool* pool, RowContainer* rowContainer, const std::vector& compareFlags, @@ -143,8 +143,21 @@ class PrefixSort { prefixSort.sortInternal(rows); } + /// The stdsort won't require bytes while prefixsort may require buffers + /// such as prefix data. The logic is similar to the above function + /// PrefixSort::sort but returns the maxmium buffer the sort may need. + static uint32_t maxRequiredBytes( + memory::MemoryPool* pool, + RowContainer* rowContainer, + const std::vector& compareFlags, + const velox::common::PrefixSortConfig& config); + private: - void sortInternal(std::vector& rows); + // Estimates the memory required for prefix sort such as prefix buffer and + // swap buffer. + uint32_t maxRequiredBytes(); + + void sortInternal(std::vector>& rows); int compareAllNormalizedKeys(char* left, char* right); diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index 000583fadf07..f8cd69496172 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -34,7 +34,8 @@ SortBuffer::SortBuffer( nonReclaimableSection_(nonReclaimableSection), prefixSortConfig_(prefixSortConfig), spillConfig_(spillConfig), - spillStats_(spillStats) { + spillStats_(spillStats), + sortedRows_(0, memory::StlAllocator(*pool)) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); VELOX_CHECK_GT(sortCompareFlags_.size(), 0); VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); @@ -109,6 +110,9 @@ void SortBuffer::noMoreInput() { velox::common::testutil::TestValue::adjust( "facebook::velox::exec::SortBuffer::noMoreInput", this); VELOX_CHECK(!noMoreInput_); + // It may trigger spill, make sure it's triggered before noMoreInput_ is set. + ensureSortFits(); + noMoreInput_ = true; // No data. @@ -274,6 +278,42 @@ void SortBuffer::ensureOutputFits() { << ", reservation: " << succinctBytes(pool_->reservedBytes()); } +void SortBuffer::ensureSortFits() { + // Check if spilling is enabled or not. + if (spillConfig_ == nullptr) { + return; + } + + // Test-only spill path. + if (testingTriggerSpill(pool_->name())) { + spill(); + return; + } + + if (numInputRows_ == 0 || spiller_ != nullptr) { + return; + } + + // The memory for std::vector sorted rows and prefix sort required buffer. + uint64_t sortBufferToReserve = + numInputRows_ * sizeof(char*) + + PrefixSort::maxRequiredBytes( + pool_, data_.get(), sortCompareFlags_, prefixSortConfig_); + { + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); + if (pool_->maybeReserve(sortBufferToReserve)) { + return; + } + } + + LOG(WARNING) << fmt::format( + "Failed to reserve {} for memory pool {}, usage: {}, reservation: {}", + succinctBytes(sortBufferToReserve), + pool_->name(), + succinctBytes(pool_->usedBytes()), + succinctBytes(pool_->reservedBytes())); +} + void SortBuffer::updateEstimatedOutputRowSize() { const auto optionalRowSize = data_->estimateRowSize(); if (!optionalRowSize.has_value() || optionalRowSize.value() == 0) { @@ -320,11 +360,14 @@ void SortBuffer::spillOutput() { spillerStoreType_, spillConfig_, spillStats_); - auto spillRows = std::vector( - sortedRows_.begin() + numOutputRows_, sortedRows_.end()); + auto spillRows = Spiller::SpillRows( + sortedRows_.begin() + numOutputRows_, + sortedRows_.end(), + *memory::spillMemoryPool()); spiller_->spill(spillRows); data_->clear(); sortedRows_.clear(); + sortedRows_.shrink_to_fit(); // Finish right after spilling as the output spiller only spills at most // once. finishSpill(); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 3791bcf71258..72ebfced76ce 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -74,6 +74,9 @@ class SortBuffer { // Reserves memory for output processing. If reservation cannot be increased, // spills enough to make output fit. void ensureOutputFits(); + // Reserves memory for sort. If reservation cannot be increased, + // spills enough to make output fit. + void ensureSortFits(); void updateEstimatedOutputRowSize(); // Invoked to initialize or reset the reusable output buffer to get output. void prepareOutput(vector_size_t maxOutputRows); @@ -113,7 +116,7 @@ class SortBuffer { uint64_t numInputRows_ = 0; // Used to store the input data in row format. std::unique_ptr data_; - std::vector sortedRows_; + std::vector> sortedRows_; // The data type of the rows stored in 'data_' and spilled on disk. The // sort key columns are stored first then the non-sorted data columns. diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 5107758ccca9..325a0ede0091 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -51,7 +51,8 @@ SortWindowBuild::SortWindowBuild( compareFlags_{makeCompareFlags(numPartitionKeys_, node->sortingOrders())}, pool_(pool), prefixSortConfig_(prefixSortConfig), - spillStats_(spillStats) { + spillStats_(spillStats), + sortedRows_(0, memory::StlAllocator(*pool)) { VELOX_CHECK_NOT_NULL(pool_); allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); allKeyInfo_.insert( @@ -252,6 +253,7 @@ void SortWindowBuild::noMoreInput() { void SortWindowBuild::loadNextPartitionFromSpill() { sortedRows_.clear(); + sortedRows_.shrink_to_fit(); data_->clear(); for (;;) { diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 7f995fc35ac3..5801738b589d 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -104,7 +104,7 @@ class SortWindowBuild : public WindowBuild { // The rows are sorted by partitionKeys + sortKeys. This total // ordering can be used to split partitions (with the correct // order by) for the processing. - std::vector sortedRows_; + std::vector> sortedRows_; // This is a vector that gives the index of the start row // (in sortedRows_) of each partition in the RowContainer data_. diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 746c1edd4730..9f1f1513ba1c 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -581,7 +581,7 @@ void Spiller::spill(const RowContainerIterator* startRowIter) { checkEmptySpillRuns(); } -void Spiller::spill(std::vector& rows) { +void Spiller::spill(SpillRows& rows) { CHECK_NOT_FINALIZED(); VELOX_CHECK_EQ(type_, Type::kOrderByOutput); VELOX_CHECK(!rows.empty()); @@ -705,7 +705,7 @@ bool Spiller::fillSpillRuns(RowContainerIterator* iterator) { return lastRun; } -void Spiller::fillSpillRun(std::vector& rows) { +void Spiller::fillSpillRun(SpillRows& rows) { VELOX_CHECK_EQ(bits_.numPartitions(), 1); checkEmptySpillRuns(); uint64_t execTimeNs{0}; diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 621c37f3a2e2..33bbcb1f5c66 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -127,7 +127,7 @@ class Spiller { /// 'kOrderByOutput' spiller type to spill during the order by /// output processing. Similarly, the spilled rows still stays in the row /// container. The caller needs to erase them from the row container. - void spill(std::vector& rows); + void spill(SpillRows& rows); /// Append 'spillVector' into the spill file of given 'partition'. It is now /// only used by the spilling operator which doesn't need data sort, such as @@ -297,7 +297,7 @@ class Spiller { // Prepares spill run of a single partition for the spillable data from the // rows. - void fillSpillRun(std::vector& rows); + void fillSpillRun(SpillRows& rows); // Writes out all the rows collected in spillRuns_. void runSpill(bool lastRun); diff --git a/velox/exec/benchmarks/PrefixSortBenchmark.cpp b/velox/exec/benchmarks/PrefixSortBenchmark.cpp index 0af6a9f90bdf..aae331be3f0f 100644 --- a/velox/exec/benchmarks/PrefixSortBenchmark.cpp +++ b/velox/exec/benchmarks/PrefixSortBenchmark.cpp @@ -144,7 +144,8 @@ class PrefixSortBenchmark { RowContainer* rowContainer, const std::vector& compareFlags) { // Copy rows to avoid sort rows already sorted. - std::vector sortedRows = rows; + auto sortedRows = std::vector>( + rows.begin(), rows.end(), *pool_); PrefixSort::sort( sortedRows, pool_, rowContainer, compareFlags, kDefaultSortConfig); } @@ -153,7 +154,8 @@ class PrefixSortBenchmark { const std::vector& rows, RowContainer* rowContainer, const std::vector& compareFlags) { - std::vector sortedRows = rows; + auto sortedRows = std::vector>( + rows.begin(), rows.end(), *pool_); PrefixSort::sort( sortedRows, pool_, rowContainer, compareFlags, kStdSortConfig); } diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 13f3ccf96fbf..4c97adb2d800 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -24,7 +24,7 @@ namespace { class PrefixSortTest : public exec::test::OperatorTestBase { protected: - std::vector + std::vector> storeRows(int numRows, const RowVectorPtr& sortedRows, RowContainer* data); static constexpr CompareFlags kAsc{ @@ -57,18 +57,30 @@ class PrefixSortTest : public exec::test::OperatorTestBase { rowType->children().end()}; RowContainer rowContainer(keyTypes, payloadTypes, pool_.get()); - std::vector rows = storeRows(numRows, data, &rowContainer); - + auto rows = storeRows(numRows, data, &rowContainer); + const std::shared_ptr sortPool = + rootPool_->addLeafChild("prefixsort"); + const auto maxBytes = PrefixSort::maxRequiredBytes( + sortPool.get(), + &rowContainer, + compareFlags, + common::PrefixSortConfig{ + 1024, + // Set threshold to 0 to enable prefix-sort in small dataset. + 0}); + const auto beforeBytes = sortPool->peakBytes(); + ASSERT_EQ(sortPool->peakBytes(), 0); // Use PrefixSort to sort rows. PrefixSort::sort( rows, - pool_.get(), + sortPool.get(), &rowContainer, compareFlags, common::PrefixSortConfig{ 1024, // Set threshold to 0 to enable prefix-sort in small dataset. 0}); + ASSERT_GE(maxBytes, sortPool->peakBytes() - beforeBytes); // Extract data from the RowContainer in order. const RowVectorPtr actual = @@ -89,11 +101,11 @@ class PrefixSortTest : public exec::test::OperatorTestBase { const RowVectorPtr& sortedRows); }; -std::vector PrefixSortTest::storeRows( +std::vector> PrefixSortTest::storeRows( int numRows, const RowVectorPtr& sortedRows, RowContainer* data) { - std::vector rows; + std::vector> rows(*pool()); SelectivityVector allRows(numRows); rows.resize(numRows); for (int row = 0; row < numRows; ++row) { @@ -116,7 +128,7 @@ const RowVectorPtr PrefixSortTest::generateExpectedResult( const auto rowType = asRowType(sortedRows->type()); const int numKeys = compareFlags.size(); RowContainer rowContainer(rowType->children(), pool_.get()); - std::vector rows = storeRows(numRows, sortedRows, &rowContainer); + auto rows = storeRows(numRows, sortedRows, &rowContainer); std::sort( rows.begin(), rows.end(), [&](const char* leftRow, const char* rightRow) { diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index c09b1c724927..1ea0390ed5c2 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -75,6 +75,13 @@ class SortBufferTest : public OperatorTestBase { {"c3", REAL()}, {"c4", DOUBLE()}, {"c5", VARCHAR()}}); + const RowTypePtr nonPrefixSortInputType_ = ROW( + {{"c0", VARCHAR()}, + {"c1", VARCHAR()}, + {"c2", VARCHAR()}, + {"c3", VARCHAR()}, + {"c4", VARCHAR()}, + {"c5", VARCHAR()}}); // Specifies the sort columns ["c4", "c1"]. std::vector sortColumnIndices_{4, 1}; std::vector sortCompareFlags_{ @@ -565,7 +572,7 @@ DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringOutput) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) { +DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySortGetOutput) { for (bool spillEnabled : {false, true}) { SCOPED_TRACE(fmt::format("spillEnabled {}", spillEnabled)); @@ -611,13 +618,64 @@ DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) { // Sets an extreme large value to get output once to avoid test flakiness. sortBuffer->getOutput(1'000'000); if (spillEnabled) { - ASSERT_EQ(numReserves, 1); + // Reserve memory for sort and getOutput. + ASSERT_EQ(numReserves, 2); } else { ASSERT_EQ(numReserves, 0); } } } +DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemorySort) { + struct { + bool usePrefixSort; + bool spillEnabled; + } testSettings[] = {{false, true}, {true, false}, {true, true}}; + + for (const auto [usePrefixSort, spillEnabled] : testSettings) { + SCOPED_TRACE(fmt::format( + "usePrefixSort: {}, spillEnabled: {}, ", usePrefixSort, spillEnabled)); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto spillConfig = getSpillConfig(spillDirectory->getPath()); + folly::Synchronized spillStats; + const RowTypePtr inputType = + usePrefixSort ? inputType_ : nonPrefixSortInputType_; + auto sortBuffer = std::make_unique( + inputType, + sortColumnIndices_, + sortCompareFlags_, + pool_.get(), + &nonReclaimableSection_, + prefixSortConfig_, + spillEnabled ? &spillConfig : nullptr, + &spillStats); + + const std::shared_ptr spillSource = + memory::memoryManager()->addLeafPool("spillSource"); + VectorFuzzer fuzzer({.vectorSize = 100}, spillSource.get()); + + TestScopedSpillInjection scopedSpillInjection(0); + sortBuffer->addInput(fuzzer.fuzzRow(inputType)); + + std::atomic_bool hasReserveMemory = false; + // Reserve memory for sort. + SCOPED_TESTVALUE_SET( + "facebook::velox::common::memory::MemoryPoolImpl::maybeReserve", + std::function( + ([&](memory::MemoryPoolImpl* pool) { + hasReserveMemory.store(true); + }))); + + sortBuffer->noMoreInput(); + if (spillEnabled) { + // Reserve memory for sort. + ASSERT_TRUE(hasReserveMemory); + } else { + ASSERT_FALSE(hasReserveMemory); + } + } +} + TEST_F(SortBufferTest, emptySpill) { const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("emptySpillSource"); diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index f243ed364ad3..f842c39f4976 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -1176,7 +1176,7 @@ TEST_P(AllTypes, nonSortedSpillFunctions) { if (type_ == Spiller::Type::kOrderByOutput) { RowContainerIterator rowIter; - std::vector rows(5'000); + std::vector> rows(5'000, *pool_); int numListedRows{0}; numListedRows = rowContainer_->listRows(&rowIter, 5000, rows.data()); ASSERT_EQ(numListedRows, 5000); @@ -1548,11 +1548,11 @@ TEST_P(OrderByOutputOnly, basic) { "Unexpected spiller type: ORDER_BY_OUTPUT"); } { - std::vector emptyRows; + Spiller::SpillRows emptyRows(*pool_); VELOX_ASSERT_THROW(spiller_->spill(emptyRows), ""); } auto spillRows = - std::vector(rows.begin(), rows.begin() + numListedRows); + Spiller::SpillRows(rows.begin(), rows.begin() + numListedRows, *pool_); spiller_->spill(spillRows); ASSERT_EQ(rowContainer_->numRows(), numRows); rowContainer_->clear(); @@ -1630,7 +1630,7 @@ TEST_P(MaxSpillRunTest, basic) { testData.maxSpillRunRows); if (type_ == Spiller::Type::kOrderByOutput) { RowContainerIterator rowIter; - std::vector rows(numRows); + Spiller::SpillRows rows(numRows, *pool_); int numListedRows{0}; numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data()); ASSERT_EQ(numListedRows, numRows);