From 005b52bea48301a8c15b348e0c5e5bd41ea744c6 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 21 Oct 2024 22:37:10 -0700 Subject: [PATCH] Fix spill related issues in TopNRowNumber operator (#11310) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11310 This change fixes two spilling-related issues in TopNRowNumber: (1) the close method of TopNRowNumber is not re-entrant if the operator fails in the middle of processing, because the close method does in-place destruction to free memory back to the HSA. This can cause problems in the query abort code path, as the aborted operator's close method could be called twice even though there is no concurrency problem there. This PR fixes the issue by clearing the used hash table, row container and HSA on the first close. This is verified with a unit test and the failed LBM query. Note that this was found in an LBM stress test. (2) free the hash table after spill to release more memory and make spilling more efficient; this is verified with a unit test that checks that the operator's memory usage goes to zero after spill. Reviewed By: bikramSingh91, oerling Differential Revision: D64654069 fbshipit-source-id: fb4bead42f4002071dcce8879dcf83f35ff6e6ea --- velox/exec/HashBuild.cpp | 2 +- velox/exec/HashTable.h | 4 +- velox/exec/TopNRowNumber.cpp | 43 +++++---- velox/exec/tests/TopNRowNumberTest.cpp | 117 +++++++++++++++++++++++++ 4 files changed, 146 insertions(+), 20 deletions(-) diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 498f4569c627..42da1fbbd6fd 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -1120,7 +1120,7 @@ void HashBuild::reclaim( memory::createAsyncMemoryReclaimTask([buildOp]() { try { buildOp->spiller_->spill(); - buildOp->table_->clear(); + buildOp->table_->clear(true); // Release the minimum reserved memory. 
buildOp->pool()->release(); return std::make_unique(nullptr); diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index d70fc1bf8722..7f504c58e42b 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -289,7 +289,7 @@ class BaseHashTable { /// Deletes any content of 'this'. If 'freeTable' is false, then hash table is /// not freed which can be used for flushing a partial group by, for example. - virtual void clear(bool freeTable = false) = 0; + virtual void clear(bool freeTable) = 0; /// Returns the capacity of the internal hash table which is number of rows /// it can stores in a group by or hash join build. @@ -529,7 +529,7 @@ class HashTable : public BaseHashTable { int32_t maxRows, char** rows) override; - void clear(bool freeTable = false) override; + void clear(bool freeTable) override; int64_t allocatedBytes() const override { // For each row: sizeof(char*) per table entry + memory diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index a85bf2688185..26ecbd149631 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -428,7 +428,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { if (remainingRowsInPartition_ > 0) { auto& partition = currentPartition(); auto start = partition.rows.size() - remainingRowsInPartition_; - auto numRows = + const auto numRows = std::min(outputBatchSize_, remainingRowsInPartition_); appendPartitionRows(partition, start, numRows, offset, rowNumbers); offset += numRows; @@ -461,7 +461,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { if (offset == 0) { data_->clear(); if (table_ != nullptr) { - table_->clear(); + table_->clear(true); } pool()->release(); return nullptr; @@ -503,7 +503,7 @@ void TopNRowNumber::setupNextOutput( int32_t rowNumber) { nextRowNumber_ = rowNumber; - auto lookAhead = merge_->next(); + auto* lookAhead = merge_->next(); if (lookAhead == nullptr) { nextRowNumber_ = 0; return; @@ -521,7 +521,7 @@ void 
TopNRowNumber::setupNextOutput( // Skip remaining rows for this partition. lookAhead->pop(); - while (auto next = merge_->next()) { + while (auto* next = merge_->next()) { if (isNewPartition(output, output->size(), next)) { nextRowNumber_ = 0; return; @@ -616,18 +616,27 @@ bool TopNRowNumber::isFinished() { void TopNRowNumber::close() { Operator::close(); - if (table_) { - partitionIt_.reset(); - partitions_.resize(1000); - while (auto numPartitions = table_->listAllRows( - &partitionIt_, - partitions_.size(), - RowContainer::kUnlimited, - partitions_.data())) { - for (auto i = 0; i < numPartitions; ++i) { - std::destroy_at( - reinterpret_cast(partitions_[i] + partitionOffset_)); - } + SCOPE_EXIT { + table_.reset(); + singlePartition_.reset(); + data_.reset(); + allocator_.reset(); + }; + + if (table_ == nullptr) { + return; + } + + partitionIt_.reset(); + partitions_.resize(1'000); + while (auto numPartitions = table_->listAllRows( + &partitionIt_, + partitions_.size(), + RowContainer::kUnlimited, + partitions_.data())) { + for (auto i = 0; i < numPartitions; ++i) { + std::destroy_at( + reinterpret_cast(partitions_[i] + partitionOffset_)); } } } @@ -729,7 +738,7 @@ void TopNRowNumber::spill() { updateEstimatedOutputRowSize(); spiller_->spill(); - table_->clear(); + table_->clear(true); data_->clear(); pool()->release(); } diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index a7518accc739..10f2fffca4e0 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -380,5 +380,122 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { } } +// This test verifies that TopNRowNumber operator reclaim all the memory after +// spill. 
+DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { + std::atomic_int inputCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "TopNRowNumber") { + return; + } + // Inject spill in the middle of aggregation input processing. + if (++inputCount != 3) { + return; + } + testingRunArbitration(op->pool()); + ASSERT_EQ(op->pool()->usedBytes(), 0); + ASSERT_EQ(op->pool()->reservedBytes(), 0); + }))); + + const vector_size_t size = 10'000; + auto data = split( + makeRowVector( + {"d", "s", "p"}, + { + // Data. + makeFlatVector( + size, [](auto row) { return row; }, nullEvery(11)), + // Sorting key. + makeFlatVector( + size, + [](auto row) { return (size - row) * 10; }, + [](auto row) { return row == 123; }), + // Partitioning key. Make sure to spread rows from the same + // partition across multiple batches to trigger de-dup logic when + // reading back spilled data. 
+ makeFlatVector( + size, [](auto row) { return row % 5'000; }, nullEvery(7)), + }), + 10); + + createDuckDbTable(data); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + core::PlanNodeId topNRowNumberId; + auto plan = PlanBuilder() + .values(data) + .topNRowNumber({"p"}, {"s"}, 1'000, true) + .capturePlanNodeId(topNRowNumberId) + .planNode(); + + const auto sql = + "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000"; + auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") + .spillDirectory(spillDirectory->getPath()) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(topNRowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); +} + +// This test verifies that TopNRowNumber operator can be closed twice which +// might be triggered by memory pool abort. +DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { + const std::string errorMessage("doubleClose"); + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::noMoreInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "TopNRowNumber") { + return; + } + op->close(); + VELOX_FAIL(errorMessage); + }))); + + const vector_size_t size = 10'000; + auto data = split( + makeRowVector( + {"d", "s", "p"}, + { + // Data. + makeFlatVector( + size, [](auto row) { return row; }, nullEvery(11)), + // Sorting key. + makeFlatVector( + size, + [](auto row) { return (size - row) * 10; }, + [](auto row) { return row == 123; }), + // Partitioning key. Make sure to spread rows from the same + // partition across multiple batches to trigger de-dup logic when + // reading back spilled data. 
+ makeFlatVector( + size, [](auto row) { return row % 5'000; }, nullEvery(7)), + }), + 10); + + core::PlanNodeId topNRowNumberId; + auto plan = PlanBuilder() + .values(data) + .topNRowNumber({"p"}, {"s"}, 1'000, true) + .capturePlanNodeId(topNRowNumberId) + .planNode(); + + const auto sql = + "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000"; + + VELOX_ASSERT_THROW(assertQuery(plan, sql), errorMessage); +} } // namespace } // namespace facebook::velox::exec