Skip to content

Commit

Permalink
Fix Sort and Spill std::vector for rows memory not tracked causing OOM (
Browse files Browse the repository at this point in the history
#11129)

Summary:
Spark query failed by killed by yarn because the memory overhead exceeds the threshold.
std::vector for rows should be tracked by memory pool.
Need to refactor everywhere if the std::vector is allocated by rows, this is a first PR.
Reserve the memory for the std::vector for rows and prefix sort required buffer.

Pull Request resolved: #11129

Reviewed By: tanjialiang

Differential Revision: D65046657

Pulled By: xiaoxmeng

fbshipit-source-id: f05dee8893b1928fce97af60fb0f5c9afea5fca0
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Oct 28, 2024
1 parent fee04b8 commit 624a21c
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 29 deletions.
34 changes: 33 additions & 1 deletion velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,39 @@ void PrefixSort::extractRowToPrefix(char* row, char* prefix) {
getAddressFromPrefix(prefix) = row;
}

void PrefixSort::sortInternal(std::vector<char*>& rows) {
// static.
uint32_t PrefixSort::maxRequiredBytes(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config) {
if (rowContainer->numRows() < config.threshold) {
return 0;
}
VELOX_DCHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize);
if (sortLayout.noNormalizedKeys) {
return 0;
}

PrefixSort prefixSort(pool, rowContainer, sortLayout);
return prefixSort.maxRequiredBytes();
}

uint32_t PrefixSort::maxRequiredBytes() {
const auto numRows = rowContainer_->numRows();
const auto numPages =
memory::AllocationTraits::numPages(numRows * sortLayout_.entrySize);
// Prefix data size + swap buffer size.
return memory::AllocationTraits::pageBytes(numPages) +
pool_->preferredSize(checkedPlus<size_t>(
sortLayout_.entrySize, AlignedBuffer::kPaddedSize)) +
2 * pool_->alignment();
}

void PrefixSort::sortInternal(
std::vector<char*, memory::StlAllocator<char*>>& rows) {
const auto numRows = rows.size();
const auto entrySize = sortLayout_.entrySize;
memory::ContiguousAllocation prefixAllocation;
Expand Down
19 changes: 16 additions & 3 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace facebook::velox::exec {
namespace detail {

FOLLY_ALWAYS_INLINE void stdSort(
std::vector<char*>& rows,
std::vector<char*, memory::StlAllocator<char*>>& rows,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags) {
std::sort(
Expand Down Expand Up @@ -120,7 +120,7 @@ class PrefixSort {
/// @param rows The result of RowContainer::listRows(), assuming that the
/// caller (SortBuffer etc.) has already got the result.
FOLLY_ALWAYS_INLINE static void sort(
std::vector<char*>& rows,
std::vector<char*, memory::StlAllocator<char*>>& rows,
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
Expand All @@ -143,8 +143,21 @@ class PrefixSort {
prefixSort.sortInternal(rows);
}

/// The stdsort won't require bytes while prefixsort may require buffers
/// such as prefix data. The logic is similar to the above function
/// PrefixSort::sort but returns the maxmium buffer the sort may need.
static uint32_t maxRequiredBytes(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config);

private:
void sortInternal(std::vector<char*>& rows);
// Estimates the memory required for prefix sort such as prefix buffer and
// swap buffer.
uint32_t maxRequiredBytes();

void sortInternal(std::vector<char*, memory::StlAllocator<char*>>& rows);

int compareAllNormalizedKeys(char* left, char* right);

Expand Down
49 changes: 46 additions & 3 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ SortBuffer::SortBuffer(
nonReclaimableSection_(nonReclaimableSection),
prefixSortConfig_(prefixSortConfig),
spillConfig_(spillConfig),
spillStats_(spillStats) {
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
Expand Down Expand Up @@ -109,6 +110,9 @@ void SortBuffer::noMoreInput() {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::noMoreInput", this);
VELOX_CHECK(!noMoreInput_);
// It may trigger spill, make sure it's triggered before noMoreInput_ is set.
ensureSortFits();

noMoreInput_ = true;

// No data.
Expand Down Expand Up @@ -274,6 +278,42 @@ void SortBuffer::ensureOutputFits() {
<< ", reservation: " << succinctBytes(pool_->reservedBytes());
}

void SortBuffer::ensureSortFits() {
// Check if spilling is enabled or not.
if (spillConfig_ == nullptr) {
return;
}

// Test-only spill path.
if (testingTriggerSpill(pool_->name())) {
spill();
return;
}

if (numInputRows_ == 0 || spiller_ != nullptr) {
return;
}

// The memory for std::vector sorted rows and prefix sort required buffer.
uint64_t sortBufferToReserve =
numInputRows_ * sizeof(char*) +
PrefixSort::maxRequiredBytes(
pool_, data_.get(), sortCompareFlags_, prefixSortConfig_);
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(sortBufferToReserve)) {
return;
}
}

LOG(WARNING) << fmt::format(
"Failed to reserve {} for memory pool {}, usage: {}, reservation: {}",
succinctBytes(sortBufferToReserve),
pool_->name(),
succinctBytes(pool_->usedBytes()),
succinctBytes(pool_->reservedBytes()));
}

void SortBuffer::updateEstimatedOutputRowSize() {
const auto optionalRowSize = data_->estimateRowSize();
if (!optionalRowSize.has_value() || optionalRowSize.value() == 0) {
Expand Down Expand Up @@ -320,11 +360,14 @@ void SortBuffer::spillOutput() {
spillerStoreType_,
spillConfig_,
spillStats_);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
auto spillRows = Spiller::SpillRows(
sortedRows_.begin() + numOutputRows_,
sortedRows_.end(),
*memory::spillMemoryPool());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
sortedRows_.shrink_to_fit();
// Finish right after spilling as the output spiller only spills at most
// once.
finishSpill();
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class SortBuffer {
// Reserves memory for output processing. If reservation cannot be increased,
// spills enough to make output fit.
void ensureOutputFits();
// Reserves memory for sort. If reservation cannot be increased,
// spills enough to make output fit.
void ensureSortFits();
void updateEstimatedOutputRowSize();
// Invoked to initialize or reset the reusable output buffer to get output.
void prepareOutput(vector_size_t maxOutputRows);
Expand Down Expand Up @@ -113,7 +116,7 @@ class SortBuffer {
uint64_t numInputRows_ = 0;
// Used to store the input data in row format.
std::unique_ptr<RowContainer> data_;
std::vector<char*> sortedRows_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;

// The data type of the rows stored in 'data_' and spilled on disk. The
// sort key columns are stored first then the non-sorted data columns.
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ SortWindowBuild::SortWindowBuild(
compareFlags_{makeCompareFlags(numPartitionKeys_, node->sortingOrders())},
pool_(pool),
prefixSortConfig_(prefixSortConfig),
spillStats_(spillStats) {
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) {
VELOX_CHECK_NOT_NULL(pool_);
allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size());
allKeyInfo_.insert(
Expand Down Expand Up @@ -252,6 +253,7 @@ void SortWindowBuild::noMoreInput() {

void SortWindowBuild::loadNextPartitionFromSpill() {
sortedRows_.clear();
sortedRows_.shrink_to_fit();
data_->clear();

for (;;) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class SortWindowBuild : public WindowBuild {
// The rows are sorted by partitionKeys + sortKeys. This total
// ordering can be used to split partitions (with the correct
// order by) for the processing.
std::vector<char*> sortedRows_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;

// This is a vector that gives the index of the start row
// (in sortedRows_) of each partition in the RowContainer data_.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
checkEmptySpillRuns();
}

void Spiller::spill(std::vector<char*>& rows) {
void Spiller::spill(SpillRows& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
VELOX_CHECK(!rows.empty());
Expand Down Expand Up @@ -705,7 +705,7 @@ bool Spiller::fillSpillRuns(RowContainerIterator* iterator) {
return lastRun;
}

void Spiller::fillSpillRun(std::vector<char*>& rows) {
void Spiller::fillSpillRun(SpillRows& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeNs{0};
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Spiller {
/// 'kOrderByOutput' spiller type to spill during the order by
/// output processing. Similarly, the spilled rows still stays in the row
/// container. The caller needs to erase them from the row container.
void spill(std::vector<char*>& rows);
void spill(SpillRows& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
Expand Down Expand Up @@ -297,7 +297,7 @@ class Spiller {

// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*>& rows);
void fillSpillRun(SpillRows& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill(bool lastRun);
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/benchmarks/PrefixSortBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class PrefixSortBenchmark {
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags) {
// Copy rows to avoid sort rows already sorted.
std::vector<char*> sortedRows = rows;
auto sortedRows = std::vector<char*, memory::StlAllocator<char*>>(
rows.begin(), rows.end(), *pool_);
PrefixSort::sort(
sortedRows, pool_, rowContainer, compareFlags, kDefaultSortConfig);
}
Expand All @@ -153,7 +154,8 @@ class PrefixSortBenchmark {
const std::vector<char*>& rows,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags) {
std::vector<char*> sortedRows = rows;
auto sortedRows = std::vector<char*, memory::StlAllocator<char*>>(
rows.begin(), rows.end(), *pool_);
PrefixSort::sort(
sortedRows, pool_, rowContainer, compareFlags, kStdSortConfig);
}
Expand Down
26 changes: 19 additions & 7 deletions velox/exec/tests/PrefixSortTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace {

class PrefixSortTest : public exec::test::OperatorTestBase {
protected:
std::vector<char*>
std::vector<char*, memory::StlAllocator<char*>>
storeRows(int numRows, const RowVectorPtr& sortedRows, RowContainer* data);

static constexpr CompareFlags kAsc{
Expand Down Expand Up @@ -57,18 +57,30 @@ class PrefixSortTest : public exec::test::OperatorTestBase {
rowType->children().end()};

RowContainer rowContainer(keyTypes, payloadTypes, pool_.get());
std::vector<char*> rows = storeRows(numRows, data, &rowContainer);

auto rows = storeRows(numRows, data, &rowContainer);
const std::shared_ptr<memory::MemoryPool> sortPool =
rootPool_->addLeafChild("prefixsort");
const auto maxBytes = PrefixSort::maxRequiredBytes(
sortPool.get(),
&rowContainer,
compareFlags,
common::PrefixSortConfig{
1024,
// Set threshold to 0 to enable prefix-sort in small dataset.
0});
const auto beforeBytes = sortPool->peakBytes();
ASSERT_EQ(sortPool->peakBytes(), 0);
// Use PrefixSort to sort rows.
PrefixSort::sort(
rows,
pool_.get(),
sortPool.get(),
&rowContainer,
compareFlags,
common::PrefixSortConfig{
1024,
// Set threshold to 0 to enable prefix-sort in small dataset.
0});
ASSERT_GE(maxBytes, sortPool->peakBytes() - beforeBytes);

// Extract data from the RowContainer in order.
const RowVectorPtr actual =
Expand All @@ -89,11 +101,11 @@ class PrefixSortTest : public exec::test::OperatorTestBase {
const RowVectorPtr& sortedRows);
};

std::vector<char*> PrefixSortTest::storeRows(
std::vector<char*, memory::StlAllocator<char*>> PrefixSortTest::storeRows(
int numRows,
const RowVectorPtr& sortedRows,
RowContainer* data) {
std::vector<char*> rows;
std::vector<char*, memory::StlAllocator<char*>> rows(*pool());
SelectivityVector allRows(numRows);
rows.resize(numRows);
for (int row = 0; row < numRows; ++row) {
Expand All @@ -116,7 +128,7 @@ const RowVectorPtr PrefixSortTest::generateExpectedResult(
const auto rowType = asRowType(sortedRows->type());
const int numKeys = compareFlags.size();
RowContainer rowContainer(rowType->children(), pool_.get());
std::vector<char*> rows = storeRows(numRows, sortedRows, &rowContainer);
auto rows = storeRows(numRows, sortedRows, &rowContainer);

std::sort(
rows.begin(), rows.end(), [&](const char* leftRow, const char* rightRow) {
Expand Down
Loading

0 comments on commit 624a21c

Please sign in to comment.