Skip to content

Commit

Permalink
Fix spill related issues in TopNRowNumber operator (facebookincubator…
Browse files Browse the repository at this point in the history
…#11310)

Summary:
Pull Request resolved: facebookincubator#11310

This change fixes two spilling-related issues in TopNRowNumber:
(1) The close method of TopNRowNumber is not re-entrant if the operator fails in the middle of
processing, because the close method does in-place destruction to free memory back to the HSA. This
can cause problems in the query abort code path, as the aborted operator's close method could
be called twice even though there is no concurrency problem there.
This PR fixes the issue by clearing
the used hash table, row container and HSA on the first close. This is verified with a unit test and the failed LBM
query. Note that this was found in an LBM stress test.
(2) Free the hash table after spill to release more memory and make spilling more efficient. This is verified with a unit test
that checks that the operator's memory usage goes to zero after spill.

Reviewed By: bikramSingh91, oerling

Differential Revision: D64654069

fbshipit-source-id: fb4bead42f4002071dcce8879dcf83f35ff6e6ea
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 22, 2024
1 parent 516cc9b commit 005b52b
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 20 deletions.
2 changes: 1 addition & 1 deletion velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ void HashBuild::reclaim(
memory::createAsyncMemoryReclaimTask<SpillResult>([buildOp]() {
try {
buildOp->spiller_->spill();
buildOp->table_->clear();
buildOp->table_->clear(true);
// Release the minimum reserved memory.
buildOp->pool()->release();
return std::make_unique<SpillResult>(nullptr);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class BaseHashTable {

/// Deletes any content of 'this'. If 'freeTable' is false, then hash table is
/// not freed which can be used for flushing a partial group by, for example.
virtual void clear(bool freeTable = false) = 0;
virtual void clear(bool freeTable) = 0;

/// Returns the capacity of the internal hash table which is the number of rows
/// it can store in a group by or hash join build.
Expand Down Expand Up @@ -529,7 +529,7 @@ class HashTable : public BaseHashTable {
int32_t maxRows,
char** rows) override;

void clear(bool freeTable = false) override;
void clear(bool freeTable) override;

int64_t allocatedBytes() const override {
// For each row: sizeof(char*) per table entry + memory
Expand Down
43 changes: 26 additions & 17 deletions velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() {
if (remainingRowsInPartition_ > 0) {
auto& partition = currentPartition();
auto start = partition.rows.size() - remainingRowsInPartition_;
auto numRows =
const auto numRows =
std::min<vector_size_t>(outputBatchSize_, remainingRowsInPartition_);
appendPartitionRows(partition, start, numRows, offset, rowNumbers);
offset += numRows;
Expand Down Expand Up @@ -461,7 +461,7 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() {
if (offset == 0) {
data_->clear();
if (table_ != nullptr) {
table_->clear();
table_->clear(true);
}
pool()->release();
return nullptr;
Expand Down Expand Up @@ -503,7 +503,7 @@ void TopNRowNumber::setupNextOutput(
int32_t rowNumber) {
nextRowNumber_ = rowNumber;

auto lookAhead = merge_->next();
auto* lookAhead = merge_->next();
if (lookAhead == nullptr) {
nextRowNumber_ = 0;
return;
Expand All @@ -521,7 +521,7 @@ void TopNRowNumber::setupNextOutput(
// Skip remaining rows for this partition.
lookAhead->pop();

while (auto next = merge_->next()) {
while (auto* next = merge_->next()) {
if (isNewPartition(output, output->size(), next)) {
nextRowNumber_ = 0;
return;
Expand Down Expand Up @@ -616,18 +616,27 @@ bool TopNRowNumber::isFinished() {
void TopNRowNumber::close() {
Operator::close();

if (table_) {
partitionIt_.reset();
partitions_.resize(1000);
while (auto numPartitions = table_->listAllRows(
&partitionIt_,
partitions_.size(),
RowContainer::kUnlimited,
partitions_.data())) {
for (auto i = 0; i < numPartitions; ++i) {
std::destroy_at(
reinterpret_cast<TopRows*>(partitions_[i] + partitionOffset_));
}
SCOPE_EXIT {
table_.reset();
singlePartition_.reset();
data_.reset();
allocator_.reset();
};

if (table_ == nullptr) {
return;
}

partitionIt_.reset();
partitions_.resize(1'000);
while (auto numPartitions = table_->listAllRows(
&partitionIt_,
partitions_.size(),
RowContainer::kUnlimited,
partitions_.data())) {
for (auto i = 0; i < numPartitions; ++i) {
std::destroy_at(
reinterpret_cast<TopRows*>(partitions_[i] + partitionOffset_));
}
}
}
Expand Down Expand Up @@ -729,7 +738,7 @@ void TopNRowNumber::spill() {
updateEstimatedOutputRowSize();

spiller_->spill();
table_->clear();
table_->clear(true);
data_->clear();
pool()->release();
}
Expand Down
117 changes: 117 additions & 0 deletions velox/exec/tests/TopNRowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,5 +380,122 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) {
}
}

// This test verifies that the TopNRowNumber operator reclaims all of its
// memory after spill. A test-value hook runs memory arbitration on the
// operator's pool the third time it fires for a TopNRowNumber operator and
// then asserts that the pool reports zero used and zero reserved bytes. The
// query result is still checked against DuckDB, and the operator's spill
// stats are expected to be non-zero.
DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) {
// Counts how many times the hook has fired for a TopNRowNumber operator.
std::atomic_int inputCount{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal::addInput",
std::function<void(exec::Operator*)>(([&](exec::Operator* op) {
// Ignore every operator other than TopNRowNumber.
if (op->testingOperatorCtx()->operatorType() != "TopNRowNumber") {
return;
}
// Inject spill in the middle of TopNRowNumber input processing: only the
// third invocation for this operator type triggers arbitration.
if (++inputCount != 3) {
return;
}
testingRunArbitration(op->pool());
// After arbitration the operator's pool must hold no memory at all.
ASSERT_EQ(op->pool()->usedBytes(), 0);
ASSERT_EQ(op->pool()->reservedBytes(), 0);
})));

const vector_size_t size = 10'000;
auto data = split(
makeRowVector(
{"d", "s", "p"},
{
// Data.
makeFlatVector<int64_t>(
size, [](auto row) { return row; }, nullEvery(11)),
// Sorting key.
makeFlatVector<int64_t>(
size,
[](auto row) { return (size - row) * 10; },
[](auto row) { return row == 123; }),
// Partitioning key. Make sure to spread rows from the same
// partition across multiple batches to trigger de-dup logic when
// reading back spilled data.
makeFlatVector<int64_t>(
size, [](auto row) { return row % 5'000; }, nullEvery(7)),
}),
10);

// Load the same data into DuckDB so the reference SQL below can be evaluated.
createDuckDbTable(data);

auto spillDirectory = exec::test::TempDirectoryPath::create();

// Plan under test; it mirrors the reference SQL below (partition by 'p',
// order by 's', keep rows whose row number is <= 1'000).
core::PlanNodeId topNRowNumberId;
auto plan = PlanBuilder()
.values(data)
.topNRowNumber({"p"}, {"s"}, 1'000, true)
.capturePlanNodeId(topNRowNumberId)
.planNode();

const auto sql =
"SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) "
" WHERE rn <= 1000";
// Run with spilling enabled for the query and for TopNRowNumber specifically.
auto task = AssertQueryBuilder(plan, duckDbQueryRunner_)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true")
.spillDirectory(spillDirectory->getPath())
.assertResults(sql);

auto taskStats = exec::toPlanStats(task->taskStats());
const auto& stats = taskStats.at(topNRowNumberId);

// The injected arbitration must have produced spilled data for this plan node.
ASSERT_GT(stats.spilledBytes, 0);
ASSERT_GT(stats.spilledRows, 0);
ASSERT_GT(stats.spilledFiles, 0);
ASSERT_GT(stats.spilledPartitions, 0);
}

// This test verifies that the TopNRowNumber operator can be closed twice,
// which might be triggered by memory pool abort. A test-value hook closes the
// operator explicitly and then fails the query with a known error message;
// the test passes if the query throws exactly that error.
DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) {
const std::string errorMessage("doubleClose");
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal::noMoreInput",
std::function<void(Operator*)>(([&](Operator* op) {
// Ignore every operator other than TopNRowNumber.
if (op->operatorType() != "TopNRowNumber") {
return;
}
// First close, issued directly from the hook. The injected failure that
// follows is presumably what leads to the second close of the same
// operator during query teardown — confirm against the driver code.
op->close();
VELOX_FAIL(errorMessage);
})));

const vector_size_t size = 10'000;
auto data = split(
makeRowVector(
{"d", "s", "p"},
{
// Data.
makeFlatVector<int64_t>(
size, [](auto row) { return row; }, nullEvery(11)),
// Sorting key.
makeFlatVector<int64_t>(
size,
[](auto row) { return (size - row) * 10; },
[](auto row) { return row == 123; }),
// Partitioning key. Make sure to spread rows from the same
// partition across multiple batches to trigger de-dup logic when
// reading back spilled data.
makeFlatVector<int64_t>(
size, [](auto row) { return row % 5'000; }, nullEvery(7)),
}),
10);

// NOTE(review): 'topNRowNumberId' is captured below but not otherwise used in
// this test.
core::PlanNodeId topNRowNumberId;
auto plan = PlanBuilder()
.values(data)
.topNRowNumber({"p"}, {"s"}, 1'000, true)
.capturePlanNodeId(topNRowNumberId)
.planNode();

// NOTE(review): no DuckDB table is created in this test; the query is expected
// to throw before any result comparison would take place — confirm.
const auto sql =
"SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) "
" WHERE rn <= 1000";

// The injected failure must surface as the query error.
VELOX_ASSERT_THROW(assertQuery(plan, sql), errorMessage);
}
} // namespace
} // namespace facebook::velox::exec

0 comments on commit 005b52b

Please sign in to comment.