diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 02125897cc47..4544c01653d9 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -210,28 +210,25 @@ std::shared_ptr WholeStageResultIterator::next() { } namespace { -class ConditionalSuspendedSection { +class SuspendedSection { public: - ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) { - if (condition) { - section_ = new velox::exec::SuspendedSection(driver); - } + SuspendedSection() { + reclaimer_->enterArbitration(); } - virtual ~ConditionalSuspendedSection() { - if (section_) { - delete section_; - } + virtual ~SuspendedSection() { + reclaimer_->leaveArbitration(); } // singleton - ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete; - ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete; + SuspendedSection(const SuspendedSection&) = delete; + SuspendedSection(SuspendedSection&&) = delete; + SuspendedSection& operator=(const SuspendedSection&) = delete; + SuspendedSection& operator=(SuspendedSection&&) = delete; private: - velox::exec::SuspendedSection* section_ = nullptr; + // We only use suspension APIs in exec::MemoryReclaimer. + std::unique_ptr reclaimer_{velox::exec::MemoryReclaimer::create()}; }; } // namespace @@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { if (spillStrategy_ == "auto") { int64_t remaining = size - shrunken; LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes..."; - // if we are on one of the driver of the spilled task, suspend it - velox::exec::Driver* thisDriver = nullptr; - task_->testingVisitDrivers([&](velox::exec::Driver* driver) { - if (driver->isOnThread()) { - thisDriver = driver; - } - }); // suspend the driver when we are on it - ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr); + SuspendedSection suspender; velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling