diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 498f4569c627..42da1fbbd6fd 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -1120,7 +1120,7 @@ void HashBuild::reclaim( memory::createAsyncMemoryReclaimTask([buildOp]() { try { buildOp->spiller_->spill(); - buildOp->table_->clear(); + buildOp->table_->clear(true); // Release the minimum reserved memory. buildOp->pool()->release(); return std::make_unique(nullptr); diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index d70fc1bf8722..7f504c58e42b 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -289,7 +289,7 @@ class BaseHashTable { /// Deletes any content of 'this'. If 'freeTable' is false, then hash table is /// not freed which can be used for flushing a partial group by, for example. - virtual void clear(bool freeTable = false) = 0; + virtual void clear(bool freeTable) = 0; /// Returns the capacity of the internal hash table which is number of rows /// it can stores in a group by or hash join build. 
@@ -529,7 +529,7 @@ class HashTable : public BaseHashTable { int32_t maxRows, char** rows) override; - void clear(bool freeTable = false) override; + void clear(bool freeTable) override; int64_t allocatedBytes() const override { // For each row: sizeof(char*) per table entry + memory diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index a85bf2688185..26ecbd149631 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -428,7 +428,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { if (remainingRowsInPartition_ > 0) { auto& partition = currentPartition(); auto start = partition.rows.size() - remainingRowsInPartition_; - auto numRows = + const auto numRows = std::min(outputBatchSize_, remainingRowsInPartition_); appendPartitionRows(partition, start, numRows, offset, rowNumbers); offset += numRows; @@ -461,7 +461,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { if (offset == 0) { data_->clear(); if (table_ != nullptr) { - table_->clear(); + table_->clear(true); } pool()->release(); return nullptr; @@ -503,7 +503,7 @@ void TopNRowNumber::setupNextOutput( int32_t rowNumber) { nextRowNumber_ = rowNumber; - auto lookAhead = merge_->next(); + auto* lookAhead = merge_->next(); if (lookAhead == nullptr) { nextRowNumber_ = 0; return; @@ -521,7 +521,7 @@ void TopNRowNumber::setupNextOutput( // Skip remaining rows for this partition. 
lookAhead->pop(); - while (auto next = merge_->next()) { + while (auto* next = merge_->next()) { if (isNewPartition(output, output->size(), next)) { nextRowNumber_ = 0; return; @@ -616,18 +616,27 @@ bool TopNRowNumber::isFinished() { void TopNRowNumber::close() { Operator::close(); - if (table_) { - partitionIt_.reset(); - partitions_.resize(1000); - while (auto numPartitions = table_->listAllRows( - &partitionIt_, - partitions_.size(), - RowContainer::kUnlimited, - partitions_.data())) { - for (auto i = 0; i < numPartitions; ++i) { - std::destroy_at( - reinterpret_cast(partitions_[i] + partitionOffset_)); - } + SCOPE_EXIT { + table_.reset(); + singlePartition_.reset(); + data_.reset(); + allocator_.reset(); + }; + + if (table_ == nullptr) { + return; + } + + partitionIt_.reset(); + partitions_.resize(1'000); + while (auto numPartitions = table_->listAllRows( + &partitionIt_, + partitions_.size(), + RowContainer::kUnlimited, + partitions_.data())) { + for (auto i = 0; i < numPartitions; ++i) { + std::destroy_at( + reinterpret_cast(partitions_[i] + partitionOffset_)); } } } @@ -729,7 +738,7 @@ void TopNRowNumber::spill() { updateEstimatedOutputRowSize(); spiller_->spill(); - table_->clear(); + table_->clear(true); data_->clear(); pool()->release(); } diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index a7518accc739..10f2fffca4e0 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -380,5 +380,122 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { } } +// This test verifies that TopNRowNumber operator reclaims all the memory after +// spill. 
+DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { + std::atomic_int inputCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "TopNRowNumber") { + return; + } + // Inject spill in the middle of TopNRowNumber input processing. + if (++inputCount != 3) { + return; + } + testingRunArbitration(op->pool()); + ASSERT_EQ(op->pool()->usedBytes(), 0); + ASSERT_EQ(op->pool()->reservedBytes(), 0); + }))); + + const vector_size_t size = 10'000; + auto data = split( + makeRowVector( + {"d", "s", "p"}, + { + // Data. + makeFlatVector( + size, [](auto row) { return row; }, nullEvery(11)), + // Sorting key. + makeFlatVector( + size, + [](auto row) { return (size - row) * 10; }, + [](auto row) { return row == 123; }), + // Partitioning key. Make sure to spread rows from the same + // partition across multiple batches to trigger de-dup logic when + // reading back spilled data. 
+ makeFlatVector( + size, [](auto row) { return row % 5'000; }, nullEvery(7)), + }), + 10); + + createDuckDbTable(data); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + core::PlanNodeId topNRowNumberId; + auto plan = PlanBuilder() + .values(data) + .topNRowNumber({"p"}, {"s"}, 1'000, true) + .capturePlanNodeId(topNRowNumberId) + .planNode(); + + const auto sql = + "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000"; + auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") + .spillDirectory(spillDirectory->getPath()) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(topNRowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); +} + +// This test verifies that TopNRowNumber operator can be closed twice, which +// might be triggered by memory pool abort. +DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { + const std::string errorMessage("doubleClose"); + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::noMoreInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "TopNRowNumber") { + return; + } + op->close(); + VELOX_FAIL(errorMessage); + }))); + + const vector_size_t size = 10'000; + auto data = split( + makeRowVector( + {"d", "s", "p"}, + { + // Data. + makeFlatVector( + size, [](auto row) { return row; }, nullEvery(11)), + // Sorting key. + makeFlatVector( + size, + [](auto row) { return (size - row) * 10; }, + [](auto row) { return row == 123; }), + // Partitioning key. Make sure to spread rows from the same + // partition across multiple batches to trigger de-dup logic when + // reading back spilled data. 
+ makeFlatVector( + size, [](auto row) { return row % 5'000; }, nullEvery(7)), + }), + 10); + + core::PlanNodeId topNRowNumberId; + auto plan = PlanBuilder() + .values(data) + .topNRowNumber({"p"}, {"s"}, 1'000, true) + .capturePlanNodeId(topNRowNumberId) + .planNode(); + + const auto sql = + "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000"; + + VELOX_ASSERT_THROW(assertQuery(plan, sql), errorMessage); +} } // namespace } // namespace facebook::velox::exec