From f40fa8acfabbafd0fdd18da981b60b76538a7b5d Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Wed, 7 Aug 2024 01:24:56 -0700 Subject: [PATCH] Let memory arbitration directly track the liveness of query memory pools (#10674) Summary: In current implementation, memory manager tracks the liveness of query memory pools. For each memory arbitration, it makes a copy of all the alive memory pools and pass them to the memory arbitrator which is not required most of the time. This PR changes to let memory arbitrator track the memory pools directly to avoid this cost. This also enables the global memory arbitration optimizations in followup. The Meta internal shadow run shows 4% cpu reduction. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10674 Reviewed By: tanjialiang Differential Revision: D60868050 Pulled By: xiaoxmeng fbshipit-source-id: 816f799a82e35a7bb0f58434c614895128d8e159 --- velox/common/base/tests/StatsReporterTest.cpp | 21 +- velox/common/memory/Memory.cpp | 12 +- velox/common/memory/MemoryArbitrator.cpp | 41 +-- velox/common/memory/MemoryArbitrator.h | 69 ++-- velox/common/memory/SharedArbitrator.cpp | 302 ++++++++++-------- velox/common/memory/SharedArbitrator.h | 94 +++--- .../memory/tests/MemoryAllocatorTest.cpp | 1 - .../memory/tests/MemoryArbitratorTest.cpp | 63 ++-- .../common/memory/tests/MemoryManagerTest.cpp | 20 +- .../memory/tests/MockSharedArbitratorTest.cpp | 1 + .../memory/tests/SharedArbitratorTest.cpp | 11 +- velox/exec/tests/HashJoinTest.cpp | 1 - velox/exec/tests/TableWriteTest.cpp | 6 +- 13 files changed, 312 insertions(+), 330 deletions(-) diff --git a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp index 3b199e64cb3e..bf268d1a78ef 100644 --- a/velox/common/base/tests/StatsReporterTest.cpp +++ b/velox/common/base/tests/StatsReporterTest.cpp @@ -242,15 +242,13 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator { return "test"; } - uint64_t growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/) - override { - return 0; + void addPool(const std::shared_ptr& /*unused*/) override { } - bool growCapacity( - memory::MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(memory::MemoryPool* /*unused*/) override {} + + bool growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/) + override { return false; } @@ -259,11 +257,8 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator { return 0; } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { return 0; } @@ -546,7 +541,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { .sumEvictScore = 10, .ssdStats = newSsdStats}); arbitrator.updateStats(memory::MemoryArbitrator::Stats( - 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10)); + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10)); std::this_thread::sleep_for(std::chrono::milliseconds(4'000)); // Stop right after sufficient wait to ensure the following reads from main diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 31071b6d5a2f..b73575d546b2 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -76,6 +76,8 @@ std::unique_ptr createArbitrator( // code will be removed. extraArbitratorConfigs["reserved-capacity"] = folly::to(options.arbitratorReservedCapacity); + extraArbitratorConfigs["memory-pool-initial-capacity"] = + folly::to(options.memoryPoolInitCapacity); extraArbitratorConfigs["memory-pool-reserved-capacity"] = folly::to(options.memoryPoolReservedCapacity); extraArbitratorConfigs["memory-pool-transfer-capacity"] = @@ -271,8 +273,7 @@ std::shared_ptr MemoryManager::addRootPool( options); pools_.emplace(poolName, pool); VELOX_CHECK_EQ(pool->capacity(), 0); - arbitrator_->growCapacity( - pool.get(), std::min(poolInitCapacity_, maxCapacity)); + arbitrator_->addPool(pool); RECORD_HISTOGRAM_METRIC_VALUE( kMetricMemoryPoolInitialCapacityBytes, pool->capacity()); return pool; @@ -292,15 +293,14 @@ std::shared_ptr MemoryManager::addLeafPool( bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) { VELOX_CHECK_NOT_NULL(pool); VELOX_CHECK_NE(pool->capacity(), kMaxMemory); - return arbitrator_->growCapacity(pool, getAlivePools(), incrementBytes); + return arbitrator_->growCapacity(pool, incrementBytes); } uint64_t MemoryManager::shrinkPools( uint64_t targetBytes, bool allowSpill, bool allowAbort) { - return arbitrator_->shrinkCapacity( - getAlivePools(), targetBytes, allowSpill, allowAbort); + return arbitrator_->shrinkCapacity(targetBytes, allowSpill, allowAbort); } void MemoryManager::dropPool(MemoryPool* pool) { @@ -312,7 +312,7 @@ void MemoryManager::dropPool(MemoryPool* pool) { } pools_.erase(it); VELOX_DCHECK_EQ(pool->reservedBytes(), 0); - arbitrator_->shrinkCapacity(pool, 0); + arbitrator_->removePool(pool); } MemoryPool& MemoryManager::deprecatedSharedLeafPool() { diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 106b79e82c3c..edac6dc1973f 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -91,19 +91,18 @@ class NoopArbitrator : public MemoryArbitrator { return "NOOP"; } - // Noop arbitrator has no memory capacity limit so no operation needed for - // memory pool capacity reserve. - uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override { - growPool(pool, pool->maxCapacity(), 0); - return pool->capacity(); + void addPool(const std::shared_ptr& pool) override { + VELOX_CHECK_EQ(pool->capacity(), 0); + growPool(pool.get(), pool->maxCapacity(), 0); + } + + void removePool(MemoryPool* pool) override { + VELOX_CHECK_EQ(pool->reservedBytes(), 0); } // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity grow. - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { return false; } @@ -117,7 +116,6 @@ class NoopArbitrator : public MemoryArbitrator { // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity shrink. uint64_t shrinkCapacity( - const std::vector>& /* unused */, uint64_t /* unused */, bool /* unused */, bool /* unused */) override { @@ -336,8 +334,7 @@ MemoryArbitrator::Stats::Stats( uint64_t _freeReservedCapacityBytes, uint64_t _reclaimTimeUs, uint64_t _numNonReclaimableAttempts, - uint64_t _numReserves, - uint64_t _numReleases) + uint64_t _numShrinks) : numRequests(_numRequests), numSucceeded(_numSucceeded), numAborted(_numAborted), @@ -351,21 +348,19 @@ MemoryArbitrator::Stats::Stats( freeReservedCapacityBytes(_freeReservedCapacityBytes), reclaimTimeUs(_reclaimTimeUs), numNonReclaimableAttempts(_numNonReclaimableAttempts), - numReserves(_numReserves), - numReleases(_numReleases) {} + numShrinks(_numShrinks) {} std::string MemoryArbitrator::Stats::toString() const { return fmt::format( "STATS[numRequests {} numAborted {} numFailures {} " - "numNonReclaimableAttempts {} numReserves {} numReleases {} " + "numNonReclaimableAttempts {} numShrinks {} " "queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} " "reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]", numRequests, numAborted, numFailures, numNonReclaimableAttempts, - numReserves, - numReleases, + numShrinks, succinctMicros(queueTimeUs), succinctMicros(arbitrationTimeUs), succinctMicros(reclaimTimeUs), @@ -393,8 +388,7 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-( result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs; result.numNonReclaimableAttempts = numNonReclaimableAttempts - other.numNonReclaimableAttempts; - result.numReserves = numReserves - other.numReserves; - result.numReleases = numReleases - other.numReleases; + result.numShrinks = numShrinks - other.numShrinks; return result; } @@ -413,8 +407,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { freeReservedCapacityBytes, reclaimTimeUs, numNonReclaimableAttempts, - numReserves, - numReleases) == + numShrinks) == std::tie( other.numRequests, other.numSucceeded, @@ -429,8 +422,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { other.freeReservedCapacityBytes, other.reclaimTimeUs, other.numNonReclaimableAttempts, - other.numReserves, - other.numReleases); + other.numShrinks); } bool MemoryArbitrator::Stats::operator!=(const Stats& other) const { @@ -459,8 +451,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const { UPDATE_COUNTER(numReclaimedBytes); UPDATE_COUNTER(reclaimTimeUs); UPDATE_COUNTER(numNonReclaimableAttempts); - UPDATE_COUNTER(numReserves); - UPDATE_COUNTER(numReleases); + UPDATE_COUNTER(numShrinks); #undef UPDATE_COUNTER VELOX_CHECK( !((gtCount > 0) && (ltCount > 0)), diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 01cd82393705..be4768ab0d95 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -57,11 +57,6 @@ class MemoryArbitrator { /// manager. int64_t capacity; - /// TODO: Remove this call back from Config. It can be directly implemented - /// in SharedArbitrator instead of passing in as an config option. Provided - /// - /// by the query system to validate the state after a memory pool enters - /// arbitration if not null. For instance, Prestissimo provides /// callback to check if a memory arbitration request is issued from a /// driver thread, then the driver should be put in suspended state to avoid /// the potential deadlock when reclaim memory from the task of the request @@ -107,29 +102,23 @@ class MemoryArbitrator { virtual ~MemoryArbitrator() = default; - /// Invoked by the memory manager to allocate up to 'targetBytes' of free - /// memory capacity without triggering memory arbitration. The function will - /// grow the memory pool's capacity based on the free available memory - /// capacity in the arbitrator, and returns the actual grown capacity in - /// bytes. - virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0; + /// Invoked by the memory manager to add a newly created memory pool. The + /// memory arbitrator allocates the initial capacity for 'pool' and + /// dynamically adjusts its capacity based query memory needs through memory + /// arbitration. + virtual void addPool(const std::shared_ptr& pool) = 0; + + /// Invoked by the memory manager to remove a destroyed memory pool. The + /// memory arbitrator frees up all its capacity and stops memory arbitration + /// operation on it. + virtual void removePool(MemoryPool* pool) = 0; /// Invoked by the memory manager to grow a memory pool's capacity. - /// 'pool' is the memory pool to request to grow. 'candidates' is a list - /// of query root pools to participate in the memory arbitration. The memory - /// arbitrator picks up a number of pools to either shrink its memory capacity - /// without actually freeing memory or reclaim its used memory to free up - /// enough memory for 'requestor' to grow. Different arbitrators use different - /// policies to select the candidate pools. The shared memory arbitrator used - /// by both Prestissimo and Prestissimo-on-Spark, selects the candidates with - /// more memory capacity. - /// - /// NOTE: the memory manager keeps 'candidates' valid during the arbitration - /// processing. - virtual bool growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t targetBytes) = 0; + /// 'pool' is the memory pool to request to grow. The memory arbitrator picks + /// up a number of pools to either shrink its memory capacity without actually + /// freeing memory or reclaim its used memory to free up enough memory for + /// 'requestor' to grow. + virtual bool growCapacity(MemoryPool* pool, uint64_t targetBytes) = 0; /// Invoked by the memory manager to shrink up to 'targetBytes' free capacity /// from a memory 'pool', and returns them back to the arbitrator. If @@ -137,17 +126,16 @@ class MemoryArbitrator { /// pool. The function returns the actual freed capacity from 'pool'. virtual uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) = 0; - /// Invoked by the memory manager to shrink memory capacity from a given list - /// of memory pools by reclaiming free and used memory. The freed memory - /// capacity is given back to the arbitrator. If 'targetBytes' is zero, then - /// try to reclaim all the memory from 'pools'. The function returns the - /// actual freed memory capacity in bytes. If 'allowSpill' is true, it - /// reclaims the used memory by spilling. If 'allowAbort' is true, it reclaims - /// the used memory by aborting the queries with the most memory usage. If - /// both are true, it first reclaims the used memory by spilling and then - /// abort queries to reach the reclaim target. + /// Invoked by the memory manager to shrink memory capacity from memory pools + /// by reclaiming free and used memory. The freed memory capacity is given + /// back to the arbitrator. If 'targetBytes' is zero, then try to reclaim all + /// the memory from 'pools'. The function returns the actual freed memory + /// capacity in bytes. If 'allowSpill' is true, it reclaims the used memory by + /// spilling. If 'allowAbort' is true, it reclaims the used memory by aborting + /// the queries with the most memory usage. If both are true, it first + /// reclaims the used memory by spilling and then abort queries to reach the + /// reclaim target. virtual uint64_t shrinkCapacity( - const std::vector>& pools, uint64_t targetBytes, bool allowSpill = true, bool allowAbort = false) = 0; @@ -183,10 +171,8 @@ class MemoryArbitrator { /// The total number of times of the reclaim attempts that end up failing /// due to reclaiming at non-reclaimable stage. uint64_t numNonReclaimableAttempts{0}; - /// The total number of memory reservations. - uint64_t numReserves{0}; - /// The total number of memory releases. - uint64_t numReleases{0}; + /// The total number of memory capacity shrinks. + uint64_t numShrinks{0}; Stats( uint64_t _numRequests, @@ -202,8 +188,7 @@ class MemoryArbitrator { uint64_t _freeReservedCapacityBytes, uint64_t _reclaimTimeUs, uint64_t _numNonReclaimableAttempts, - uint64_t _numReserves, - uint64_t _numReleases); + uint64_t _numShrinks); Stats() = default; diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index b8e5d4d43a39..e9d429be0d01 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -63,86 +63,6 @@ std::string memoryPoolAbortMessage( return out.str(); } -void sortCandidatesByReclaimableFreeCapacity( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [&](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.freeBytes > rhs.freeBytes; - }); - - TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableFreeCapacity", - &candidates); -} - -void sortCandidatesByReclaimableUsedCapacity( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; - }); - - TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", - &candidates); -} - -void sortCandidatesByUsage( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.reservedBytes > rhs.reservedBytes; - }); -} - -// Finds the candidate with the largest capacity. For 'requestor', the -// capacity for comparison including its current capacity and the capacity to -// grow. -const SharedArbitrator::Candidate& findCandidateWithLargestCapacity( - MemoryPool* requestor, - uint64_t targetBytes, - const std::vector& candidates) { - VELOX_CHECK(!candidates.empty()); - int32_t candidateIdx{-1}; - int64_t maxCapacity{-1}; - for (int32_t i = 0; i < candidates.size(); ++i) { - const bool isCandidate = candidates[i].pool == requestor; - // For capacity comparison, the requestor's capacity should include both its - // current capacity and the capacity growth. - const int64_t capacity = - candidates[i].pool->capacity() + (isCandidate ? targetBytes : 0); - if (i == 0) { - candidateIdx = 0; - maxCapacity = capacity; - continue; - } - if (capacity < maxCapacity) { - continue; - } - if (capacity > maxCapacity) { - candidateIdx = i; - maxCapacity = capacity; - continue; - } - // With the same amount of capacity, we prefer to kill the requestor itself - // without affecting the other query. - if (isCandidate) { - candidateIdx = i; - } - } - VELOX_CHECK_NE(candidateIdx, -1); - return candidates[candidateIdx]; -} - template T getConfig( const std::unordered_map& configs, @@ -166,6 +86,12 @@ int64_t SharedArbitrator::ExtraConfig::getReservedCapacity( configs, kReservedCapacity, kDefaultReservedCapacity); } +uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity( + const std::unordered_map& configs) { + return getConfig( + configs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity); +} + uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity( const std::unordered_map& configs) { return getConfig( @@ -198,6 +124,8 @@ bool SharedArbitrator::ExtraConfig::getCheckUsageLeak( SharedArbitrator::SharedArbitrator(const Config& config) : MemoryArbitrator(config), reservedCapacity_(ExtraConfig::getReservedCapacity(config.extraConfigs)), + memoryPoolInitialCapacity_( + ExtraConfig::getMemoryPoolInitialCapacity(config.extraConfigs)), memoryPoolReservedCapacity_( ExtraConfig::getMemoryPoolReservedCapacity(config.extraConfigs)), memoryPoolTransferCapacity_( @@ -216,12 +144,13 @@ SharedArbitrator::SharedArbitrator(const Config& config) std::string SharedArbitrator::Candidate::toString() const { return fmt::format( "CANDIDATE[{}] RECLAIMABLE_BYTES[{}] FREE_BYTES[{}]]", - pool->root()->name(), + pool->name(), succinctBytes(reclaimableBytes), succinctBytes(freeBytes)); } SharedArbitrator::~SharedArbitrator() { + VELOX_CHECK(candidates_.empty()); if (freeNonReservedCapacity_ + freeReservedCapacity_ != capacity_) { const std::string errMsg = fmt::format( "Unexpected free capacity leak in arbitrator: freeNonReservedCapacity_[{}] + freeReservedCapacity_[{}] != capacity_[{}])\\n{}", @@ -237,19 +166,135 @@ SharedArbitrator::~SharedArbitrator() { } } -void SharedArbitrator::getCandidateStats( +void SharedArbitrator::addPool(const std::shared_ptr& pool) { + VELOX_CHECK_EQ(pool->capacity(), 0); + { + std::unique_lock guard{poolLock_}; + VELOX_CHECK_EQ(candidates_.count(pool.get()), 0); + candidates_.emplace(pool.get(), pool); + } + + std::lock_guard l(stateLock_); + const uint64_t maxBytesToReserve = + std::min(maxGrowCapacity(*pool), memoryPoolInitialCapacity_); + const uint64_t minBytesToReserve = minGrowCapacity(*pool); + const uint64_t reservedBytes = + decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); + try { + checkedGrow(pool.get(), reservedBytes, 0); + } catch (const VeloxRuntimeError&) { + incrementFreeCapacityLocked(reservedBytes); + } +} + +void SharedArbitrator::removePool(MemoryPool* pool) { + VELOX_CHECK_EQ(pool->reservedBytes(), 0); + shrinkCapacity(pool, pool->capacity()); + + std::unique_lock guard{poolLock_}; + const auto ret = candidates_.erase(pool); + VELOX_CHECK_EQ(ret, 1); +} + +void SharedArbitrator::getCandidates( ArbitrationOperation* op, bool freeCapacityOnly) { op->candidates.clear(); - op->candidates.reserve(op->candidatePools.size()); - for (const auto& pool : op->candidatePools) { - const bool selfCandidate = op->requestRoot == pool.get(); + + std::shared_lock guard{poolLock_}; + op->candidates.reserve(candidates_.size()); + for (const auto& candidate : candidates_) { + const bool selfCandidate = op->requestRoot == candidate.first; + std::shared_ptr pool = candidate.second.lock(); + if (pool == nullptr) { + VELOX_CHECK(!selfCandidate); + continue; + } op->candidates.push_back( - {freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate), + {pool, + freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate), reclaimableFreeCapacity(*pool, selfCandidate), - pool->reservedBytes(), - pool.get()}); + pool->reservedBytes()}); } + VELOX_CHECK(!op->candidates.empty()); +} + +void SharedArbitrator::sortCandidatesByReclaimableFreeCapacity( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [&](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.freeBytes > rhs.freeBytes; + }); + + TestValue::adjust( + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableFreeCapacity", + &candidates); +} + +void SharedArbitrator::sortCandidatesByReclaimableUsedCapacity( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + }); + + TestValue::adjust( + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", + &candidates); +} + +void SharedArbitrator::sortCandidatesByUsage( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.reservedBytes > rhs.reservedBytes; + }); +} + +const SharedArbitrator::Candidate& +SharedArbitrator::findCandidateWithLargestCapacity( + MemoryPool* requestor, + uint64_t targetBytes, + const std::vector& candidates) { + VELOX_CHECK(!candidates.empty()); + int32_t candidateIdx{-1}; + uint64_t maxCapacity{0}; + for (int32_t i = 0; i < candidates.size(); ++i) { + const bool isCandidate = candidates[i].pool.get() == requestor; + // For capacity comparison, the requestor's capacity should include both its + // current capacity and the capacity growth. + const uint64_t capacity = + candidates[i].pool->capacity() + (isCandidate ? targetBytes : 0); + if (i == 0) { + candidateIdx = 0; + maxCapacity = capacity; + continue; + } + if (capacity < maxCapacity) { + continue; + } + if (capacity > maxCapacity) { + candidateIdx = i; + maxCapacity = capacity; + continue; + } + // With the same amount of capacity, we prefer to kill the requestor itself + // without affecting the other query. + if (isCandidate) { + candidateIdx = i; + } + } + VELOX_CHECK_NE(candidateIdx, -1); + return candidates[candidateIdx]; } void SharedArbitrator::updateArbitrationRequestStats() { @@ -298,30 +343,12 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const { pool.capacity()); } -uint64_t SharedArbitrator::growCapacity( - MemoryPool* pool, - uint64_t requestBytes) { - std::lock_guard l(mutex_); - ++numReserves_; - const int64_t maxBytesToReserve = - std::min(maxGrowCapacity(*pool), requestBytes); - const int64_t minBytesToReserve = minGrowCapacity(*pool); - uint64_t reservedBytes = - decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); - try { - checkedGrow(pool, reservedBytes, 0); - } catch (const VeloxRuntimeError&) { - reservedBytes = 0; - } - return reservedBytes; -} - uint64_t SharedArbitrator::decrementFreeCapacity( uint64_t maxBytesToReserve, uint64_t minBytesToReserve) { uint64_t reservedBytes{0}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); reservedBytes = decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); } @@ -346,28 +373,27 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked( uint64_t SharedArbitrator::shrinkCapacity( MemoryPool* pool, uint64_t requestBytes) { - std::lock_guard l(mutex_); - ++numReleases_; + std::lock_guard l(stateLock_); + ++numShrinks_; const uint64_t freedBytes = shrinkPool(pool, requestBytes); incrementFreeCapacityLocked(freedBytes); return freedBytes; } uint64_t SharedArbitrator::shrinkCapacity( - const std::vector>& pools, uint64_t requestBytes, bool allowSpill, bool allowAbort) { incrementGlobalArbitrationCount(); requestBytes = requestBytes == 0 ? capacity_ : requestBytes; - ArbitrationOperation op(requestBytes, pools); + ArbitrationOperation op(requestBytes); ScopedArbitration scopedArbitration(this, &op); uint64_t fastReclaimTargetBytes = std::max(memoryPoolTransferCapacity_, requestBytes); std::lock_guard exclusiveLock(arbitrationLock_); - getCandidateStats(&op); + getCandidates(&op); uint64_t freedBytes = reclaimFreeMemoryFromCandidates(&op, fastReclaimTargetBytes, false); auto freeGuard = folly::makeGuard([&]() { @@ -387,7 +413,7 @@ uint64_t SharedArbitrator::shrinkCapacity( } if (allowAbort) { // Candidate stats may change after spilling. - getCandidateStats(&op); + getCandidates(&op); } } if (allowAbort) { @@ -397,7 +423,7 @@ uint64_t SharedArbitrator::shrinkCapacity( } void SharedArbitrator::testingFreeCapacity(uint64_t capacity) { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); incrementFreeCapacityLocked(capacity); } @@ -405,11 +431,8 @@ uint64_t SharedArbitrator::testingNumRequests() const { return numRequests_; } -bool SharedArbitrator::growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t requestBytes) { - ArbitrationOperation op(pool, requestBytes, candidatePools); +bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) { + ArbitrationOperation op(pool, requestBytes); ScopedArbitration scopedArbitration(this, &op); bool needGlobalArbitration{false}; @@ -482,7 +505,7 @@ bool SharedArbitrator::runLocalArbitration( } VELOX_CHECK_LT(freedBytes, maxGrowTarget); - getCandidateStats(op, true); + getCandidates(op, /*freeCapacityOnly=*/true); freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, true); if (freedBytes >= op->requestBytes) { @@ -604,7 +627,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) { bool SharedArbitrator::handleOOM(ArbitrationOperation* op) { MemoryPool* victim = findCandidateWithLargestCapacity( op->requestRoot, op->requestBytes, op->candidates) - .pool; + .pool.get(); if (op->requestRoot == victim) { VELOX_MEM_LOG(ERROR) << "Requestor memory pool " << op->requestRoot->name() @@ -665,7 +688,7 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) { VELOX_CHECK_LT(freedBytes, maxGrowTarget); // Get refreshed stats before the global memory arbitration run. - getCandidateStats(op); + getCandidates(op); freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, false); @@ -704,15 +727,15 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( // Sort candidate memory pools based on their reclaimable free capacity. sortCandidatesByReclaimableFreeCapacity(op->candidates); - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); uint64_t reclaimedBytes{0}; for (const auto& candidate : op->candidates) { VELOX_CHECK_LT(reclaimedBytes, reclaimTargetBytes); if (candidate.freeBytes == 0) { break; } - if (isLocalArbitration && (candidate.pool != op->requestRoot) && - isUnderArbitrationLocked(candidate.pool)) { + if (isLocalArbitration && (candidate.pool.get() != op->requestRoot) && + isUnderArbitrationLocked(candidate.pool.get())) { // If the reclamation is for local arbitration and the candidate pool is // also under arbitration processing, then we can't reclaim from the // candidate pool as it might cause concurrent changes to the candidate @@ -722,11 +745,11 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( const int64_t bytesToReclaim = std::min( reclaimTargetBytes - reclaimedBytes, reclaimableFreeCapacity( - *candidate.pool, candidate.pool == op->requestRoot)); + *candidate.pool, candidate.pool.get() == op->requestRoot)); if (bytesToReclaim <= 0) { continue; } - reclaimedBytes += shrinkPool(candidate.pool, bytesToReclaim); + reclaimedBytes += shrinkPool(candidate.pool.get(), bytesToReclaim); if (reclaimedBytes >= reclaimTargetBytes) { break; } @@ -746,7 +769,8 @@ void SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill( if (candidate.reclaimableBytes == 0) { break; } - freedBytes += reclaim(candidate.pool, op->requestBytes - freedBytes, false); + freedBytes += + reclaim(candidate.pool.get(), op->requestBytes - freedBytes, false); if ((freedBytes >= op->requestBytes) || (op->requestRoot != nullptr && op->requestRoot->aborted())) { break; @@ -772,9 +796,9 @@ void SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort( candidate.pool->toString(), candidate.pool->treeMemoryUsage())); } catch (VeloxRuntimeError&) { - abort(candidate.pool, std::current_exception()); + abort(candidate.pool.get(), std::current_exception()); } - freedBytes += shrinkPool(candidate.pool, 0); + freedBytes += shrinkPool(candidate.pool.get(), 0); if (freedBytes >= op->requestBytes) { break; } @@ -849,7 +873,7 @@ void SharedArbitrator::abort( } void SharedArbitrator::incrementFreeCapacity(uint64_t bytes) { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); incrementFreeCapacityLocked(bytes); } @@ -876,7 +900,7 @@ void SharedArbitrator::incrementFreeReservedCapacityLocked(uint64_t& bytes) { } MemoryArbitrator::Stats SharedArbitrator::stats() const { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); return statsLocked(); } @@ -894,13 +918,12 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const { stats.freeReservedCapacityBytes = freeReservedCapacity_; stats.reclaimTimeUs = reclaimTimeUs_; stats.numNonReclaimableAttempts = numNonReclaimableAttempts_; - stats.numReserves = numReserves_; - stats.numReleases = numReleases_; + stats.numShrinks = numShrinks_; return stats; } std::string SharedArbitrator::toString() const { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); return toStringLocked(); } @@ -991,7 +1014,7 @@ void SharedArbitrator::startArbitration(ArbitrationOperation* op) { updateArbitrationRequestStats(); ContinueFuture waitPromise{ContinueFuture::makeEmpty()}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); ++numPending_; if (op->requestPool != nullptr) { auto it = arbitrationQueues_.find(op->requestRoot); @@ -1024,7 +1047,7 @@ void SharedArbitrator::startArbitration(ArbitrationOperation* op) { void SharedArbitrator::finishArbitration(ArbitrationOperation* op) { ContinuePromise resumePromise{ContinuePromise::makeEmpty()}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); VELOX_CHECK_GT(numPending_, 0); --numPending_; if (op->requestPool != nullptr) { @@ -1048,11 +1071,6 @@ void SharedArbitrator::finishArbitration(ArbitrationOperation* op) { } } -bool SharedArbitrator::isUnderArbitration(MemoryPool* pool) const { - std::lock_guard l(mutex_); - return isUnderArbitrationLocked(pool); -} - bool SharedArbitrator::isUnderArbitrationLocked(MemoryPool* pool) const { return arbitrationQueues_.count(pool) != 0; } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index dcd66eb677bd..f513a12825b0 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -19,6 +19,7 @@ #include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/base/Counters.h" +#include "velox/common/base/GTestMacros.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/future/VeloxPromise.h" #include "velox/common/memory/Memory.h" @@ -43,6 +44,14 @@ class SharedArbitrator : public memory::MemoryArbitrator { static int64_t getReservedCapacity( const std::unordered_map& configs); + /// The initial memory capacity to reserve for a newly created query memory + /// pool. + static constexpr std::string_view kMemoryPoolInitialCapacity{ + "memory-pool-initial-capacity"}; + static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; + static uint64_t getMemoryPoolInitialCapacity( + const std::unordered_map& configs); + /// The minimal amount of memory capacity reserved for each query to run. static constexpr std::string_view kMemoryPoolReservedCapacity{ "memory-pool-reserved-capacity"}; @@ -95,17 +104,15 @@ class SharedArbitrator : public memory::MemoryArbitrator { static void unregisterFactory(); - uint64_t growCapacity(MemoryPool* pool, uint64_t requestBytes) final; + void addPool(const std::shared_ptr& pool) final; - bool growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t requestBytes) final; + void removePool(MemoryPool* pool) final; + + bool growCapacity(MemoryPool* pool, uint64_t requestBytes) final; uint64_t shrinkCapacity(MemoryPool* pool, uint64_t requestBytes) final; uint64_t shrinkCapacity( - const std::vector>& pools, uint64_t requestBytes, bool allowSpill = true, bool force = false) override final; @@ -116,16 +123,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { std::string toString() const final; - /// The candidate memory pool stats used by arbitration. - struct Candidate { - int64_t reclaimableBytes{0}; - int64_t freeBytes{0}; - int64_t reservedBytes{0}; - MemoryPool* pool; - - std::string toString() const; - }; - /// Returns 'freeCapacity' back to the arbitrator for testing. void testingFreeCapacity(uint64_t freeCapacity); @@ -151,6 +148,16 @@ class SharedArbitrator : public memory::MemoryArbitrator { static inline const std::string kGlobalArbitrationLockWaitWallNanos{ "globalArbitrationLockWaitWallNanos"}; + /// The candidate memory pool stats used by arbitration. + struct Candidate { + std::shared_ptr pool; + int64_t reclaimableBytes{0}; + int64_t freeBytes{0}; + int64_t reservedBytes{0}; + + std::string toString() const; + }; + private: // The kind string of shared arbitrator. inline static const std::string kind_{"SHARED"}; @@ -159,12 +166,11 @@ class SharedArbitrator : public memory::MemoryArbitrator { struct ArbitrationOperation { MemoryPool* const requestPool; MemoryPool* const requestRoot; - const std::vector>& candidatePools; const uint64_t requestBytes; // The start time of this arbitration operation. const std::chrono::steady_clock::time_point startTime; - // The stats of candidate memory pools used for memory arbitration. + // The candidate memory pools. std::vector candidates; // The time that waits in local arbitration queue. @@ -176,18 +182,12 @@ class SharedArbitrator : public memory::MemoryArbitrator { // The time that waits to acquire the global arbitration lock. uint64_t globalArbitrationLockWaitTimeUs{0}; - ArbitrationOperation( - uint64_t requestBytes, - const std::vector>& candidatePools) - : ArbitrationOperation(nullptr, requestBytes, candidatePools) {} + explicit ArbitrationOperation(uint64_t requestBytes) + : ArbitrationOperation(nullptr, requestBytes) {} - ArbitrationOperation( - MemoryPool* _requestor, - uint64_t _requestBytes, - const std::vector>& _candidatePools) + ArbitrationOperation(MemoryPool* _requestor, uint64_t _requestBytes) : requestPool(_requestor), requestRoot(_requestor == nullptr ? nullptr : _requestor->root()), - candidatePools(_candidatePools), requestBytes(_requestBytes), startTime(std::chrono::steady_clock::now()) {} @@ -282,12 +282,29 @@ class SharedArbitrator : public memory::MemoryArbitrator { uint64_t& maxGrowTarget, uint64_t& minGrowTarget); - // Invoked to get or refresh the memory stats of the candidate memory pools - // for arbitration. If 'freeCapacityOnly' is true, then we only get free - // capacity stats for each candidate memory pool. - void getCandidateStats( - ArbitrationOperation* op, - bool freeCapacityOnly = false); + // Invoked to get or refresh the candidate memory pools for arbitration. If + // 'freeCapacityOnly' is true, then we only get free capacity stats for each + // candidate memory pool. + void getCandidates(ArbitrationOperation* op, bool freeCapacityOnly = false); + + // Sorts 'candidates' based on reclaimable free capacity in descending order. + static void sortCandidatesByReclaimableFreeCapacity( + std::vector& candidates); + + // Sorts 'candidates' based on reclaimable used capacity in descending order. + static void sortCandidatesByReclaimableUsedCapacity( + std::vector& candidates); + + // Sorts 'candidates' based on actual used memory in descending order. + static void sortCandidatesByUsage(std::vector& candidates); + + // Finds the candidate with the largest capacity. For 'requestor', the + // capacity for comparison including its current capacity and the capacity to + // grow. + static const SharedArbitrator::Candidate& findCandidateWithLargestCapacity( + MemoryPool* requestor, + uint64_t targetBytes, + const std::vector& candidates); // Invoked to reclaim free memory capacity from 'candidates' without // actually freeing used memory. @@ -400,21 +417,25 @@ class SharedArbitrator : public memory::MemoryArbitrator { int64_t minGrowCapacity(const MemoryPool& pool) const; // Returns true if 'pool' is under memory arbitration. - bool isUnderArbitration(MemoryPool* pool) const; bool isUnderArbitrationLocked(MemoryPool* pool) const; void updateArbitrationRequestStats(); + void updateArbitrationFailureStats(); const uint64_t reservedCapacity_; + const uint64_t memoryPoolInitialCapacity_; const uint64_t memoryPoolReservedCapacity_; const uint64_t memoryPoolTransferCapacity_; const uint64_t memoryReclaimWaitMs_; const bool globalArbitrationEnabled_; const bool checkUsageLeak_; + mutable folly::SharedMutex poolLock_; + std::unordered_map> candidates_; + // Lock used to protect the arbitrator state. - mutable std::mutex mutex_; + mutable std::mutex stateLock_; tsan_atomic freeReservedCapacity_{0}; tsan_atomic freeNonReservedCapacity_{0}; @@ -440,7 +461,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { tsan_atomic reclaimedUsedBytes_{0}; tsan_atomic reclaimTimeUs_{0}; tsan_atomic numNonReclaimableAttempts_{0}; - tsan_atomic numReserves_{0}; - tsan_atomic numReleases_{0}; + tsan_atomic numShrinks_{0}; }; } // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/MemoryAllocatorTest.cpp b/velox/common/memory/tests/MemoryAllocatorTest.cpp index f83d384a80eb..3c2f48f0c4cc 100644 --- a/velox/common/memory/tests/MemoryAllocatorTest.cpp +++ b/velox/common/memory/tests/MemoryAllocatorTest.cpp @@ -26,7 +26,6 @@ #include #include #include -#include #include #include diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index a5b88e541895..bb7f0bd78138 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -63,14 +63,14 @@ TEST_F(MemoryArbitrationTest, stats) { ASSERT_EQ( stats.toString(), "STATS[numRequests 2 numAborted 3 numFailures 100 " - "numNonReclaimableAttempts 5 numReserves 0 numReleases 0 " + "numNonReclaimableAttempts 5 numShrinks 0 " "queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms " "shrunkMemory 95.37MB reclaimedMemory 9.77KB " "maxCapacity 0B freeCapacity 1.95KB freeReservedCapacity 1000B]"); ASSERT_EQ( fmt::format("{}", stats), "STATS[numRequests 2 numAborted 3 numFailures 100 " - "numNonReclaimableAttempts 5 numReserves 0 numReleases 0 " + "numNonReclaimableAttempts 5 numShrinks 0 " "queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms " "shrunkMemory 95.37MB reclaimedMemory 9.77KB " "maxCapacity 0B freeCapacity 1.95KB freeReservedCapacity 1000B]"); @@ -135,33 +135,34 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { auto rootPool = manager.addRootPool("root-1", 8L << 20, MemoryReclaimer::create()); ASSERT_EQ(rootPool->capacity(), 1 << 20); - ASSERT_EQ( - manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20), 1 << 20); - ASSERT_EQ( - manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20), 2 << 20); + ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20)); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_FALSE(manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20)); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 2 << 20)); + ASSERT_EQ(rootPool->capacity(), 4 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeCapacityBytes, 2 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeReservedCapacityBytes, 2 << 20); auto leafPool = rootPool->addLeafChild("leaf-1.0"); - void* buffer; VELOX_ASSERT_THROW( - buffer = leafPool->allocate(7L << 20), + leafPool->allocate(7L << 20), "Exceeded memory pool capacity after attempt to grow capacity through " "arbitration. Requestor pool name 'leaf-1.0', request size 7.00MB, " "memory pool capacity 4.00MB, memory pool max capacity 8.00MB"); - ASSERT_NO_THROW(buffer = leafPool->allocate(4L << 20)); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 0); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 1 << 20); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 1), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 1), 0); - leafPool->free(buffer, 4L << 20); + ASSERT_EQ(rootPool->capacity(), 3 << 20); + static_cast(rootPool.get())->testingSetReservation(0); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(leafPool.get(), 1 << 20), 1 << 20); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(rootPool.get(), 1 << 20), 1 << 20); - ASSERT_EQ(rootPool->capacity(), 2 << 20); - ASSERT_EQ(leafPool->capacity(), 2 << 20); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_EQ(leafPool->capacity(), 1 << 20); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 1 << 20); ASSERT_EQ(rootPool->capacity(), 0); ASSERT_EQ(leafPool->capacity(), 0); } @@ -305,19 +306,17 @@ TEST_F(MemoryArbitrationTest, reservedCapacityFreeByPoolShrink) { pools.push_back(manager.addRootPool("", kMaxMemory)); ASSERT_GE(pools.back()->capacity(), 0); - ASSERT_EQ(arbitrator->shrinkCapacity(pools, 1 << 20), 2 << 20); - ASSERT_EQ(arbitrator->growCapacity(pools[numPools - 1].get(), 1 << 20), 0); - ASSERT_EQ(arbitrator->growCapacity(pools.back().get(), 2 << 20), 1 << 20); + ASSERT_EQ(arbitrator->shrinkCapacity(1 << 20), 2 << 20); } TEST_F(MemoryArbitrationTest, arbitratorStats) { const MemoryArbitrator::Stats emptyStats; ASSERT_TRUE(emptyStats.empty()); const MemoryArbitrator::Stats anchorStats( - 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); ASSERT_FALSE(anchorStats.empty()); const MemoryArbitrator::Stats largeStats( - 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); ASSERT_FALSE(largeStats.empty()); ASSERT_TRUE(!(anchorStats == largeStats)); ASSERT_TRUE(anchorStats != largeStats); @@ -327,11 +326,10 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(!(anchorStats >= largeStats)); const auto delta = largeStats - anchorStats; ASSERT_EQ( - delta, - MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 8, 3, 3, 3, 3)); + delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 8, 3, 3, 3)); const MemoryArbitrator::Stats smallStats( - 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); ASSERT_TRUE(!(anchorStats == smallStats)); ASSERT_TRUE(anchorStats != smallStats); ASSERT_TRUE(!(anchorStats < smallStats)); @@ -340,7 +338,7 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(anchorStats >= smallStats); const MemoryArbitrator::Stats invalidStats( - 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 8, 2, 8, 2); + 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 2, 8, 2); ASSERT_TRUE(!(anchorStats == invalidStats)); ASSERT_TRUE(anchorStats != invalidStats); VELOX_ASSERT_THROW(anchorStats < invalidStats, ""); @@ -362,23 +360,16 @@ class FakeTestArbitrator : public MemoryArbitrator { return "USER"; } - uint64_t growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { - VELOX_NYI(); - return 0; - } + void addPool(const std::shared_ptr& /*unused*/) override {} - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(MemoryPool* /*unused*/) override {} + + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { VELOX_NYI(); } diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index e5c7c5eef9de..348ed8db3377 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -108,7 +108,7 @@ TEST_F(MemoryManagerTest, ctor) { "allocated pages 0 mapped pages 0]\n" "ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] " "STATS[numRequests 0 numAborted 0 numFailures 0 " - "numNonReclaimableAttempts 0 numReserves 0 numReleases 0 queueTime 0us " + "numNonReclaimableAttempts 0 numShrinks 0 queueTime 0us " "arbitrationTime 0us reclaimTime 0us shrunkMemory 0B " "reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]]"); } @@ -123,22 +123,16 @@ class FakeTestArbitrator : public MemoryArbitrator { .capacity = config.capacity, .extraConfigs = config.extraConfigs}) {} - uint64_t growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { - VELOX_NYI(); - } + void addPool(const std::shared_ptr& /*unused*/) override {} - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(MemoryPool* /*unused*/) override {} + + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { VELOX_NYI(); } diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 182074d42ee0..19da53e346fd 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -2446,6 +2446,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, orderedArbitration) { (*candidates)[i - 1].reclaimableBytes); } }))); + folly::Random::DefaultGenerator rng; rng.seed(512); const uint64_t memCapacity = 512 * MB; diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 06b4f143082f..9eaf70010021 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -526,7 +526,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(arbitrator_->stats().numReserves, numAddedPools_); ASSERT_GT(orderByQueryCtx->pool()->stats().numCapacityGrowths, 0); } } @@ -629,7 +628,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(newStats.numReserves, numAddedPools_); } } @@ -742,7 +740,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(arbitrator_->stats().numReserves, numAddedPools_); } } @@ -1289,10 +1286,8 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { threads.emplace_back([&]() { { std::lock_guard l(mutex); - auto oldNum = arbitrator_->stats().numReserves; queries.emplace_back( newQueryCtx(memoryManager_.get(), executor_.get())); - ASSERT_EQ(arbitrator_->stats().numReserves, oldNum + 1); } }); } @@ -1300,11 +1295,9 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { for (auto& queryThread : threads) { queryThread.join(); } - ASSERT_EQ(arbitrator_->stats().numReserves, numRootPools); - ASSERT_EQ(arbitrator_->stats().numReleases, 0); + ASSERT_EQ(arbitrator_->stats().numShrinks, 0); } - ASSERT_EQ(arbitrator_->stats().numReserves, numRootPools); - ASSERT_EQ(arbitrator_->stats().numReleases, numRootPools); + ASSERT_EQ(arbitrator_->stats().numShrinks, numRootPools); } } } // namespace facebook::velox::memory diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index f8f00d5303a4..5331f57837a7 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7287,7 +7287,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) { waitForAllTasksToBeDeleted(); ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); // Wait again here as this test uses on-demand created memory manager instead // of the global one. We need to make sure any used memory got cleaned up diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index a3b97f979d34..27612fc1ca2b 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -3742,8 +3742,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { ASSERT_GE(arbitrator->stats().numReclaimedBytes, testParam.bytesToReserve); waitForAllTasksToBeDeleted(3'000'000); queryCtx.reset(); - ASSERT_EQ(arbitrator->stats().numReserves, 1); - ASSERT_EQ(arbitrator->stats().numReleases, 1); + ASSERT_EQ(arbitrator->stats().numShrinks, 1); } } @@ -3823,7 +3822,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numFailures, 1); ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); waitForAllTasksToBeDeleted(); } @@ -3916,7 +3914,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); ASSERT_EQ(arbitrator->stats().numFailures, 0); ASSERT_GT(arbitrator->stats().numReclaimedBytes, 0); - ASSERT_EQ(arbitrator->stats().numReserves, 1); waitForAllTasksToBeDeleted(); } @@ -4014,7 +4011,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numFailures, 1); ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); const auto updatedSpillStats = common::globalSpillStats(); ASSERT_EQ(updatedSpillStats, spillStats); waitForAllTasksToBeDeleted();