Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Jul 19, 2024
1 parent 8a2ac2a commit 52130a7
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,24 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
}

namespace {
class ConditionalSuspendedSection {
class SuspendedSection {
public:
ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
if (condition) {
section_ = new velox::exec::SuspendedSection(driver);
}
SuspendedSection(velox::memory::MemoryPool* pool) : pool_(pool) {
pool_->enterArbitration();
}

virtual ~ConditionalSuspendedSection() {
if (section_) {
delete section_;
}
virtual ~SuspendedSection() {
pool_->leaveArbitration();
}

// singleton
ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete;
SuspendedSection(const SuspendedSection&) = delete;
SuspendedSection(SuspendedSection&&) = delete;
SuspendedSection& operator=(const SuspendedSection&) = delete;
SuspendedSection& operator=(SuspendedSection&&) = delete;

private:
velox::exec::SuspendedSection* section_ = nullptr;
velox::memory::MemoryPool* pool_ = nullptr;
};
} // namespace

Expand All @@ -244,15 +240,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes...";
// if we are on one of the driver of the spilled task, suspend it
velox::exec::Driver* thisDriver = nullptr;
task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
if (driver->isOnThread()) {
thisDriver = driver;
}
});
// suspend the driver when we are on it
ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
SuspendedSection suspender(pool.get());
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
Expand Down

0 comments on commit 52130a7

Please sign in to comment.