Skip to content

Commit

Permalink
[GLUTEN-6477][VL] Fix occasional dead lock during spilling (apache#6515)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored and weiting-chen committed Jul 25, 2024
1 parent ad97dea commit 5e22be8
Showing 1 changed file with 12 additions and 22 deletions.
34 changes: 12 additions & 22 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
}

namespace {
class ConditionalSuspendedSection {
class SuspendedSection {
public:
ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
if (condition) {
section_ = new velox::exec::SuspendedSection(driver);
}
SuspendedSection() {
reclaimer_->enterArbitration();
}

virtual ~ConditionalSuspendedSection() {
if (section_) {
delete section_;
}
virtual ~SuspendedSection() {
reclaimer_->leaveArbitration();
}

// singleton
ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete;
SuspendedSection(const SuspendedSection&) = delete;
SuspendedSection(SuspendedSection&&) = delete;
SuspendedSection& operator=(const SuspendedSection&) = delete;
SuspendedSection& operator=(SuspendedSection&&) = delete;

private:
velox::exec::SuspendedSection* section_ = nullptr;
// We only use suspension APIs in exec::MemoryReclaimer.
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer_{velox::exec::MemoryReclaimer::create()};
};
} // namespace

Expand All @@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes...";
// if we are on one of the driver of the spilled task, suspend it
velox::exec::Driver* thisDriver = nullptr;
task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
if (driver->isOnThread()) {
thisDriver = driver;
}
});
// suspend the driver when we are on it
ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
Expand Down

0 comments on commit 5e22be8

Please sign in to comment.