From 52130a747248c2e1d879296fa568ea5a244af1ce Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 19 Jul 2024 11:36:01 +0800 Subject: [PATCH] fixup --- cpp/velox/compute/WholeStageResultIterator.cc | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 8439545ca3828..6a004e07d97a0 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -210,28 +210,24 @@ std::shared_ptr WholeStageResultIterator::next() { } namespace { -class ConditionalSuspendedSection { +class SuspendedSection { public: - ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) { - if (condition) { - section_ = new velox::exec::SuspendedSection(driver); - } + SuspendedSection(velox::memory::MemoryPool* pool) : pool_(pool) { + pool_->enterArbitration(); } - virtual ~ConditionalSuspendedSection() { - if (section_) { - delete section_; - } + virtual ~SuspendedSection() { + pool_->leaveArbitration(); } // singleton - ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete; - ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete; - ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete; + SuspendedSection(const SuspendedSection&) = delete; + SuspendedSection(SuspendedSection&&) = delete; + SuspendedSection& operator=(const SuspendedSection&) = delete; + SuspendedSection& operator=(SuspendedSection&&) = delete; private: - velox::exec::SuspendedSection* section_ = nullptr; + velox::memory::MemoryPool* pool_ = nullptr; }; } // namespace @@ -244,15 +240,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { if (spillStrategy_ == "auto") { int64_t remaining = size - shrunken; LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes..."; - // if we are on one of the driver of the spilled task, suspend it - velox::exec::Driver* thisDriver = nullptr; - task_->testingVisitDrivers([&](velox::exec::Driver* driver) { - if (driver->isOnThread()) { - thisDriver = driver; - } - }); // suspend the driver when we are on it - ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr); + SuspendedSection suspender(pool.get()); velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling