From 0b5e5af76ebb19c1e3e579ee3975b515d9ffe295 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 23 Jul 2024 17:13:53 +0800 Subject: [PATCH] fixup --- cpp/velox/compute/WholeStageResultIterator.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b15f2085599f..6d0a94c3b70e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -212,12 +212,12 @@ std::shared_ptr WholeStageResultIterator::next() { namespace { class SuspendedSection { public: - SuspendedSection(velox::memory::MemoryPool* pool) : pool_(pool) { - pool_->enterArbitration(); + SuspendedSection() { + reclaimer_->enterArbitration(); } virtual ~SuspendedSection() { - pool_->leaveArbitration(); + reclaimer_->leaveArbitration(); } // singleton @@ -227,7 +227,8 @@ class SuspendedSection { SuspendedSection& operator=(SuspendedSection&&) = delete; private: - velox::memory::MemoryPool* pool_ = nullptr; + // We only use suspension APIs in exec::MemoryReclaimer. + std::unique_ptr reclaimer_{velox::exec::MemoryReclaimer::create()}; }; } // namespace @@ -241,7 +242,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { int64_t remaining = size - shrunken; LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes..."; // suspend the driver when we are on it - SuspendedSection suspender(pool.get()); + SuspendedSection suspender; velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling