diff --git a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp index 3b199e64cb3e..bf268d1a78ef 100644 --- a/velox/common/base/tests/StatsReporterTest.cpp +++ b/velox/common/base/tests/StatsReporterTest.cpp @@ -242,15 +242,13 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator { return "test"; } - uint64_t growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/) - override { - return 0; + void addPool(const std::shared_ptr& /*unused*/) override { } - bool growCapacity( - memory::MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(memory::MemoryPool* /*unused*/) override {} + + bool growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/) + override { return false; } @@ -259,11 +257,8 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator { return 0; } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { return 0; } @@ -546,7 +541,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { .sumEvictScore = 10, .ssdStats = newSsdStats}); arbitrator.updateStats(memory::MemoryArbitrator::Stats( - 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10)); + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10)); std::this_thread::sleep_for(std::chrono::milliseconds(4'000)); // Stop right after sufficient wait to ensure the following reads from main diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 31071b6d5a2f..b73575d546b2 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -76,6 +76,8 @@ std::unique_ptr createArbitrator( // code will be removed. extraArbitratorConfigs["reserved-capacity"] = folly::to(options.arbitratorReservedCapacity); + extraArbitratorConfigs["memory-pool-initial-capacity"] = + folly::to(options.memoryPoolInitCapacity); extraArbitratorConfigs["memory-pool-reserved-capacity"] = folly::to(options.memoryPoolReservedCapacity); extraArbitratorConfigs["memory-pool-transfer-capacity"] = @@ -271,8 +273,7 @@ std::shared_ptr MemoryManager::addRootPool( options); pools_.emplace(poolName, pool); VELOX_CHECK_EQ(pool->capacity(), 0); - arbitrator_->growCapacity( - pool.get(), std::min(poolInitCapacity_, maxCapacity)); + arbitrator_->addPool(pool); RECORD_HISTOGRAM_METRIC_VALUE( kMetricMemoryPoolInitialCapacityBytes, pool->capacity()); return pool; @@ -292,15 +293,14 @@ std::shared_ptr MemoryManager::addLeafPool( bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) { VELOX_CHECK_NOT_NULL(pool); VELOX_CHECK_NE(pool->capacity(), kMaxMemory); - return arbitrator_->growCapacity(pool, getAlivePools(), incrementBytes); + return arbitrator_->growCapacity(pool, incrementBytes); } uint64_t MemoryManager::shrinkPools( uint64_t targetBytes, bool allowSpill, bool allowAbort) { - return arbitrator_->shrinkCapacity( - getAlivePools(), targetBytes, allowSpill, allowAbort); + return arbitrator_->shrinkCapacity(targetBytes, allowSpill, allowAbort); } void MemoryManager::dropPool(MemoryPool* pool) { @@ -312,7 +312,7 @@ void MemoryManager::dropPool(MemoryPool* pool) { } pools_.erase(it); VELOX_DCHECK_EQ(pool->reservedBytes(), 0); - arbitrator_->shrinkCapacity(pool, 0); + arbitrator_->removePool(pool); } MemoryPool& MemoryManager::deprecatedSharedLeafPool() { diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 106b79e82c3c..edac6dc1973f 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -91,19 +91,18 @@ class NoopArbitrator : public MemoryArbitrator { return "NOOP"; } - // Noop arbitrator has no memory capacity limit so no operation needed for - // memory pool capacity reserve. - uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override { - growPool(pool, pool->maxCapacity(), 0); - return pool->capacity(); + void addPool(const std::shared_ptr& pool) override { + VELOX_CHECK_EQ(pool->capacity(), 0); + growPool(pool.get(), pool->maxCapacity(), 0); + } + + void removePool(MemoryPool* pool) override { + VELOX_CHECK_EQ(pool->reservedBytes(), 0); } // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity grow. - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { return false; } @@ -117,7 +116,6 @@ class NoopArbitrator : public MemoryArbitrator { // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity shrink. uint64_t shrinkCapacity( - const std::vector>& /* unused */, uint64_t /* unused */, bool /* unused */, bool /* unused */) override { @@ -336,8 +334,7 @@ MemoryArbitrator::Stats::Stats( uint64_t _freeReservedCapacityBytes, uint64_t _reclaimTimeUs, uint64_t _numNonReclaimableAttempts, - uint64_t _numReserves, - uint64_t _numReleases) + uint64_t _numShrinks) : numRequests(_numRequests), numSucceeded(_numSucceeded), numAborted(_numAborted), @@ -351,21 +348,19 @@ MemoryArbitrator::Stats::Stats( freeReservedCapacityBytes(_freeReservedCapacityBytes), reclaimTimeUs(_reclaimTimeUs), numNonReclaimableAttempts(_numNonReclaimableAttempts), - numReserves(_numReserves), - numReleases(_numReleases) {} + numShrinks(_numShrinks) {} std::string MemoryArbitrator::Stats::toString() const { return fmt::format( "STATS[numRequests {} numAborted {} numFailures {} " - "numNonReclaimableAttempts {} numReserves {} numReleases {} " + "numNonReclaimableAttempts {} numShrinks {} " "queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} " "reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]", numRequests, numAborted, numFailures, numNonReclaimableAttempts, - numReserves, - numReleases, + numShrinks, succinctMicros(queueTimeUs), succinctMicros(arbitrationTimeUs), succinctMicros(reclaimTimeUs), @@ -393,8 +388,7 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-( result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs; result.numNonReclaimableAttempts = numNonReclaimableAttempts - other.numNonReclaimableAttempts; - result.numReserves = numReserves - other.numReserves; - result.numReleases = numReleases - other.numReleases; + result.numShrinks = numShrinks - other.numShrinks; return result; } @@ -413,8 +407,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { freeReservedCapacityBytes, reclaimTimeUs, numNonReclaimableAttempts, - numReserves, - numReleases) == + numShrinks) == std::tie( other.numRequests, other.numSucceeded, @@ -429,8 +422,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { other.freeReservedCapacityBytes, other.reclaimTimeUs, other.numNonReclaimableAttempts, - other.numReserves, - other.numReleases); + other.numShrinks); } bool MemoryArbitrator::Stats::operator!=(const Stats& other) const { @@ -459,8 +451,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const { UPDATE_COUNTER(numReclaimedBytes); UPDATE_COUNTER(reclaimTimeUs); UPDATE_COUNTER(numNonReclaimableAttempts); - UPDATE_COUNTER(numReserves); - UPDATE_COUNTER(numReleases); + UPDATE_COUNTER(numShrinks); #undef UPDATE_COUNTER VELOX_CHECK( !((gtCount > 0) && (ltCount > 0)), diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 01cd82393705..be4768ab0d95 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -57,11 +57,6 @@ class MemoryArbitrator { /// manager. int64_t capacity; - /// TODO: Remove this call back from Config. It can be directly implemented - /// in SharedArbitrator instead of passing in as an config option. Provided - /// - /// by the query system to validate the state after a memory pool enters - /// arbitration if not null. For instance, Prestissimo provides /// callback to check if a memory arbitration request is issued from a /// driver thread, then the driver should be put in suspended state to avoid /// the potential deadlock when reclaim memory from the task of the request @@ -107,29 +102,23 @@ class MemoryArbitrator { virtual ~MemoryArbitrator() = default; - /// Invoked by the memory manager to allocate up to 'targetBytes' of free - /// memory capacity without triggering memory arbitration. The function will - /// grow the memory pool's capacity based on the free available memory - /// capacity in the arbitrator, and returns the actual grown capacity in - /// bytes. - virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0; + /// Invoked by the memory manager to add a newly created memory pool. The + /// memory arbitrator allocates the initial capacity for 'pool' and + /// dynamically adjusts its capacity based query memory needs through memory + /// arbitration. + virtual void addPool(const std::shared_ptr& pool) = 0; + + /// Invoked by the memory manager to remove a destroyed memory pool. The + /// memory arbitrator frees up all its capacity and stops memory arbitration + /// operation on it. + virtual void removePool(MemoryPool* pool) = 0; /// Invoked by the memory manager to grow a memory pool's capacity. - /// 'pool' is the memory pool to request to grow. 'candidates' is a list - /// of query root pools to participate in the memory arbitration. The memory - /// arbitrator picks up a number of pools to either shrink its memory capacity - /// without actually freeing memory or reclaim its used memory to free up - /// enough memory for 'requestor' to grow. Different arbitrators use different - /// policies to select the candidate pools. The shared memory arbitrator used - /// by both Prestissimo and Prestissimo-on-Spark, selects the candidates with - /// more memory capacity. - /// - /// NOTE: the memory manager keeps 'candidates' valid during the arbitration - /// processing. - virtual bool growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t targetBytes) = 0; + /// 'pool' is the memory pool to request to grow. The memory arbitrator picks + /// up a number of pools to either shrink its memory capacity without actually + /// freeing memory or reclaim its used memory to free up enough memory for + /// 'requestor' to grow. + virtual bool growCapacity(MemoryPool* pool, uint64_t targetBytes) = 0; /// Invoked by the memory manager to shrink up to 'targetBytes' free capacity /// from a memory 'pool', and returns them back to the arbitrator. If @@ -137,17 +126,16 @@ class MemoryArbitrator { /// pool. The function returns the actual freed capacity from 'pool'. virtual uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) = 0; - /// Invoked by the memory manager to shrink memory capacity from a given list - /// of memory pools by reclaiming free and used memory. The freed memory - /// capacity is given back to the arbitrator. If 'targetBytes' is zero, then - /// try to reclaim all the memory from 'pools'. The function returns the - /// actual freed memory capacity in bytes. If 'allowSpill' is true, it - /// reclaims the used memory by spilling. If 'allowAbort' is true, it reclaims - /// the used memory by aborting the queries with the most memory usage. If - /// both are true, it first reclaims the used memory by spilling and then - /// abort queries to reach the reclaim target. + /// Invoked by the memory manager to shrink memory capacity from memory pools + /// by reclaiming free and used memory. The freed memory capacity is given + /// back to the arbitrator. If 'targetBytes' is zero, then try to reclaim all + /// the memory from 'pools'. The function returns the actual freed memory + /// capacity in bytes. If 'allowSpill' is true, it reclaims the used memory by + /// spilling. If 'allowAbort' is true, it reclaims the used memory by aborting + /// the queries with the most memory usage. If both are true, it first + /// reclaims the used memory by spilling and then abort queries to reach the + /// reclaim target. virtual uint64_t shrinkCapacity( - const std::vector>& pools, uint64_t targetBytes, bool allowSpill = true, bool allowAbort = false) = 0; @@ -183,10 +171,8 @@ class MemoryArbitrator { /// The total number of times of the reclaim attempts that end up failing /// due to reclaiming at non-reclaimable stage. uint64_t numNonReclaimableAttempts{0}; - /// The total number of memory reservations. - uint64_t numReserves{0}; - /// The total number of memory releases. - uint64_t numReleases{0}; + /// The total number of memory capacity shrinks. + uint64_t numShrinks{0}; Stats( uint64_t _numRequests, @@ -202,8 +188,7 @@ class MemoryArbitrator { uint64_t _freeReservedCapacityBytes, uint64_t _reclaimTimeUs, uint64_t _numNonReclaimableAttempts, - uint64_t _numReserves, - uint64_t _numReleases); + uint64_t _numShrinks); Stats() = default; diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index b8e5d4d43a39..e9d429be0d01 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -63,86 +63,6 @@ std::string memoryPoolAbortMessage( return out.str(); } -void sortCandidatesByReclaimableFreeCapacity( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [&](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.freeBytes > rhs.freeBytes; - }); - - TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableFreeCapacity", - &candidates); -} - -void sortCandidatesByReclaimableUsedCapacity( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; - }); - - TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", - &candidates); -} - -void sortCandidatesByUsage( - std::vector& candidates) { - std::sort( - candidates.begin(), - candidates.end(), - [](const SharedArbitrator::Candidate& lhs, - const SharedArbitrator::Candidate& rhs) { - return lhs.reservedBytes > rhs.reservedBytes; - }); -} - -// Finds the candidate with the largest capacity. For 'requestor', the -// capacity for comparison including its current capacity and the capacity to -// grow. -const SharedArbitrator::Candidate& findCandidateWithLargestCapacity( - MemoryPool* requestor, - uint64_t targetBytes, - const std::vector& candidates) { - VELOX_CHECK(!candidates.empty()); - int32_t candidateIdx{-1}; - int64_t maxCapacity{-1}; - for (int32_t i = 0; i < candidates.size(); ++i) { - const bool isCandidate = candidates[i].pool == requestor; - // For capacity comparison, the requestor's capacity should include both its - // current capacity and the capacity growth. - const int64_t capacity = - candidates[i].pool->capacity() + (isCandidate ? targetBytes : 0); - if (i == 0) { - candidateIdx = 0; - maxCapacity = capacity; - continue; - } - if (capacity < maxCapacity) { - continue; - } - if (capacity > maxCapacity) { - candidateIdx = i; - maxCapacity = capacity; - continue; - } - // With the same amount of capacity, we prefer to kill the requestor itself - // without affecting the other query. - if (isCandidate) { - candidateIdx = i; - } - } - VELOX_CHECK_NE(candidateIdx, -1); - return candidates[candidateIdx]; -} - template T getConfig( const std::unordered_map& configs, @@ -166,6 +86,12 @@ int64_t SharedArbitrator::ExtraConfig::getReservedCapacity( configs, kReservedCapacity, kDefaultReservedCapacity); } +uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity( + const std::unordered_map& configs) { + return getConfig( + configs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity); +} + uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity( const std::unordered_map& configs) { return getConfig( @@ -198,6 +124,8 @@ bool SharedArbitrator::ExtraConfig::getCheckUsageLeak( SharedArbitrator::SharedArbitrator(const Config& config) : MemoryArbitrator(config), reservedCapacity_(ExtraConfig::getReservedCapacity(config.extraConfigs)), + memoryPoolInitialCapacity_( + ExtraConfig::getMemoryPoolInitialCapacity(config.extraConfigs)), memoryPoolReservedCapacity_( ExtraConfig::getMemoryPoolReservedCapacity(config.extraConfigs)), memoryPoolTransferCapacity_( @@ -216,12 +144,13 @@ SharedArbitrator::SharedArbitrator(const Config& config) std::string SharedArbitrator::Candidate::toString() const { return fmt::format( "CANDIDATE[{}] RECLAIMABLE_BYTES[{}] FREE_BYTES[{}]]", - pool->root()->name(), + pool->name(), succinctBytes(reclaimableBytes), succinctBytes(freeBytes)); } SharedArbitrator::~SharedArbitrator() { + VELOX_CHECK(candidates_.empty()); if (freeNonReservedCapacity_ + freeReservedCapacity_ != capacity_) { const std::string errMsg = fmt::format( "Unexpected free capacity leak in arbitrator: freeNonReservedCapacity_[{}] + freeReservedCapacity_[{}] != capacity_[{}])\\n{}", @@ -237,19 +166,135 @@ SharedArbitrator::~SharedArbitrator() { } } -void SharedArbitrator::getCandidateStats( +void SharedArbitrator::addPool(const std::shared_ptr& pool) { + VELOX_CHECK_EQ(pool->capacity(), 0); + { + std::unique_lock guard{poolLock_}; + VELOX_CHECK_EQ(candidates_.count(pool.get()), 0); + candidates_.emplace(pool.get(), pool); + } + + std::lock_guard l(stateLock_); + const uint64_t maxBytesToReserve = + std::min(maxGrowCapacity(*pool), memoryPoolInitialCapacity_); + const uint64_t minBytesToReserve = minGrowCapacity(*pool); + const uint64_t reservedBytes = + decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); + try { + checkedGrow(pool.get(), reservedBytes, 0); + } catch (const VeloxRuntimeError&) { + incrementFreeCapacityLocked(reservedBytes); + } +} + +void SharedArbitrator::removePool(MemoryPool* pool) { + VELOX_CHECK_EQ(pool->reservedBytes(), 0); + shrinkCapacity(pool, pool->capacity()); + + std::unique_lock guard{poolLock_}; + const auto ret = candidates_.erase(pool); + VELOX_CHECK_EQ(ret, 1); +} + +void SharedArbitrator::getCandidates( ArbitrationOperation* op, bool freeCapacityOnly) { op->candidates.clear(); - op->candidates.reserve(op->candidatePools.size()); - for (const auto& pool : op->candidatePools) { - const bool selfCandidate = op->requestRoot == pool.get(); + + std::shared_lock guard{poolLock_}; + op->candidates.reserve(candidates_.size()); + for (const auto& candidate : candidates_) { + const bool selfCandidate = op->requestRoot == candidate.first; + std::shared_ptr pool = candidate.second.lock(); + if (pool == nullptr) { + VELOX_CHECK(!selfCandidate); + continue; + } op->candidates.push_back( - {freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate), + {pool, + freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate), reclaimableFreeCapacity(*pool, selfCandidate), - pool->reservedBytes(), - pool.get()}); + pool->reservedBytes()}); } + VELOX_CHECK(!op->candidates.empty()); +} + +void SharedArbitrator::sortCandidatesByReclaimableFreeCapacity( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [&](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.freeBytes > rhs.freeBytes; + }); + + TestValue::adjust( + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableFreeCapacity", + &candidates); +} + +void SharedArbitrator::sortCandidatesByReclaimableUsedCapacity( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + }); + + TestValue::adjust( + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", + &candidates); +} + +void SharedArbitrator::sortCandidatesByUsage( + std::vector& candidates) { + std::sort( + candidates.begin(), + candidates.end(), + [](const SharedArbitrator::Candidate& lhs, + const SharedArbitrator::Candidate& rhs) { + return lhs.reservedBytes > rhs.reservedBytes; + }); +} + +const SharedArbitrator::Candidate& +SharedArbitrator::findCandidateWithLargestCapacity( + MemoryPool* requestor, + uint64_t targetBytes, + const std::vector& candidates) { + VELOX_CHECK(!candidates.empty()); + int32_t candidateIdx{-1}; + uint64_t maxCapacity{0}; + for (int32_t i = 0; i < candidates.size(); ++i) { + const bool isCandidate = candidates[i].pool.get() == requestor; + // For capacity comparison, the requestor's capacity should include both its + // current capacity and the capacity growth. + const uint64_t capacity = + candidates[i].pool->capacity() + (isCandidate ? targetBytes : 0); + if (i == 0) { + candidateIdx = 0; + maxCapacity = capacity; + continue; + } + if (capacity < maxCapacity) { + continue; + } + if (capacity > maxCapacity) { + candidateIdx = i; + maxCapacity = capacity; + continue; + } + // With the same amount of capacity, we prefer to kill the requestor itself + // without affecting the other query. + if (isCandidate) { + candidateIdx = i; + } + } + VELOX_CHECK_NE(candidateIdx, -1); + return candidates[candidateIdx]; } void SharedArbitrator::updateArbitrationRequestStats() { @@ -298,30 +343,12 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const { pool.capacity()); } -uint64_t SharedArbitrator::growCapacity( - MemoryPool* pool, - uint64_t requestBytes) { - std::lock_guard l(mutex_); - ++numReserves_; - const int64_t maxBytesToReserve = - std::min(maxGrowCapacity(*pool), requestBytes); - const int64_t minBytesToReserve = minGrowCapacity(*pool); - uint64_t reservedBytes = - decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); - try { - checkedGrow(pool, reservedBytes, 0); - } catch (const VeloxRuntimeError&) { - reservedBytes = 0; - } - return reservedBytes; -} - uint64_t SharedArbitrator::decrementFreeCapacity( uint64_t maxBytesToReserve, uint64_t minBytesToReserve) { uint64_t reservedBytes{0}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); reservedBytes = decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); } @@ -346,28 +373,27 @@ uint64_t SharedArbitrator::decrementFreeCapacityLocked( uint64_t SharedArbitrator::shrinkCapacity( MemoryPool* pool, uint64_t requestBytes) { - std::lock_guard l(mutex_); - ++numReleases_; + std::lock_guard l(stateLock_); + ++numShrinks_; const uint64_t freedBytes = shrinkPool(pool, requestBytes); incrementFreeCapacityLocked(freedBytes); return freedBytes; } uint64_t SharedArbitrator::shrinkCapacity( - const std::vector>& pools, uint64_t requestBytes, bool allowSpill, bool allowAbort) { incrementGlobalArbitrationCount(); requestBytes = requestBytes == 0 ? capacity_ : requestBytes; - ArbitrationOperation op(requestBytes, pools); + ArbitrationOperation op(requestBytes); ScopedArbitration scopedArbitration(this, &op); uint64_t fastReclaimTargetBytes = std::max(memoryPoolTransferCapacity_, requestBytes); std::lock_guard exclusiveLock(arbitrationLock_); - getCandidateStats(&op); + getCandidates(&op); uint64_t freedBytes = reclaimFreeMemoryFromCandidates(&op, fastReclaimTargetBytes, false); auto freeGuard = folly::makeGuard([&]() { @@ -387,7 +413,7 @@ uint64_t SharedArbitrator::shrinkCapacity( } if (allowAbort) { // Candidate stats may change after spilling. - getCandidateStats(&op); + getCandidates(&op); } } if (allowAbort) { @@ -397,7 +423,7 @@ uint64_t SharedArbitrator::shrinkCapacity( } void SharedArbitrator::testingFreeCapacity(uint64_t capacity) { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); incrementFreeCapacityLocked(capacity); } @@ -405,11 +431,8 @@ uint64_t SharedArbitrator::testingNumRequests() const { return numRequests_; } -bool SharedArbitrator::growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t requestBytes) { - ArbitrationOperation op(pool, requestBytes, candidatePools); +bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) { + ArbitrationOperation op(pool, requestBytes); ScopedArbitration scopedArbitration(this, &op); bool needGlobalArbitration{false}; @@ -482,7 +505,7 @@ bool SharedArbitrator::runLocalArbitration( } VELOX_CHECK_LT(freedBytes, maxGrowTarget); - getCandidateStats(op, true); + getCandidates(op, /*freeCapacityOnly=*/true); freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, true); if (freedBytes >= op->requestBytes) { @@ -604,7 +627,7 @@ bool SharedArbitrator::ensureCapacity(ArbitrationOperation* op) { bool SharedArbitrator::handleOOM(ArbitrationOperation* op) { MemoryPool* victim = findCandidateWithLargestCapacity( op->requestRoot, op->requestBytes, op->candidates) - .pool; + .pool.get(); if (op->requestRoot == victim) { VELOX_MEM_LOG(ERROR) << "Requestor memory pool " << op->requestRoot->name() @@ -665,7 +688,7 @@ bool SharedArbitrator::arbitrateMemory(ArbitrationOperation* op) { VELOX_CHECK_LT(freedBytes, maxGrowTarget); // Get refreshed stats before the global memory arbitration run. - getCandidateStats(op); + getCandidates(op); freedBytes += reclaimFreeMemoryFromCandidates(op, maxGrowTarget - freedBytes, false); @@ -704,15 +727,15 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( // Sort candidate memory pools based on their reclaimable free capacity. sortCandidatesByReclaimableFreeCapacity(op->candidates); - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); uint64_t reclaimedBytes{0}; for (const auto& candidate : op->candidates) { VELOX_CHECK_LT(reclaimedBytes, reclaimTargetBytes); if (candidate.freeBytes == 0) { break; } - if (isLocalArbitration && (candidate.pool != op->requestRoot) && - isUnderArbitrationLocked(candidate.pool)) { + if (isLocalArbitration && (candidate.pool.get() != op->requestRoot) && + isUnderArbitrationLocked(candidate.pool.get())) { // If the reclamation is for local arbitration and the candidate pool is // also under arbitration processing, then we can't reclaim from the // candidate pool as it might cause concurrent changes to the candidate @@ -722,11 +745,11 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( const int64_t bytesToReclaim = std::min( reclaimTargetBytes - reclaimedBytes, reclaimableFreeCapacity( - *candidate.pool, candidate.pool == op->requestRoot)); + *candidate.pool, candidate.pool.get() == op->requestRoot)); if (bytesToReclaim <= 0) { continue; } - reclaimedBytes += shrinkPool(candidate.pool, bytesToReclaim); + reclaimedBytes += shrinkPool(candidate.pool.get(), bytesToReclaim); if (reclaimedBytes >= reclaimTargetBytes) { break; } @@ -746,7 +769,8 @@ void SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill( if (candidate.reclaimableBytes == 0) { break; } - freedBytes += reclaim(candidate.pool, op->requestBytes - freedBytes, false); + freedBytes += + reclaim(candidate.pool.get(), op->requestBytes - freedBytes, false); if ((freedBytes >= op->requestBytes) || (op->requestRoot != nullptr && op->requestRoot->aborted())) { break; @@ -772,9 +796,9 @@ void SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort( candidate.pool->toString(), candidate.pool->treeMemoryUsage())); } catch (VeloxRuntimeError&) { - abort(candidate.pool, std::current_exception()); + abort(candidate.pool.get(), std::current_exception()); } - freedBytes += shrinkPool(candidate.pool, 0); + freedBytes += shrinkPool(candidate.pool.get(), 0); if (freedBytes >= op->requestBytes) { break; } @@ -849,7 +873,7 @@ void SharedArbitrator::abort( } void SharedArbitrator::incrementFreeCapacity(uint64_t bytes) { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); incrementFreeCapacityLocked(bytes); } @@ -876,7 +900,7 @@ void SharedArbitrator::incrementFreeReservedCapacityLocked(uint64_t& bytes) { } MemoryArbitrator::Stats SharedArbitrator::stats() const { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); return statsLocked(); } @@ -894,13 +918,12 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const { stats.freeReservedCapacityBytes = freeReservedCapacity_; stats.reclaimTimeUs = reclaimTimeUs_; stats.numNonReclaimableAttempts = numNonReclaimableAttempts_; - stats.numReserves = numReserves_; - stats.numReleases = numReleases_; + stats.numShrinks = numShrinks_; return stats; } std::string SharedArbitrator::toString() const { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); return toStringLocked(); } @@ -991,7 +1014,7 @@ void SharedArbitrator::startArbitration(ArbitrationOperation* op) { updateArbitrationRequestStats(); ContinueFuture waitPromise{ContinueFuture::makeEmpty()}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); ++numPending_; if (op->requestPool != nullptr) { auto it = arbitrationQueues_.find(op->requestRoot); @@ -1024,7 +1047,7 @@ void SharedArbitrator::startArbitration(ArbitrationOperation* op) { void SharedArbitrator::finishArbitration(ArbitrationOperation* op) { ContinuePromise resumePromise{ContinuePromise::makeEmpty()}; { - std::lock_guard l(mutex_); + std::lock_guard l(stateLock_); VELOX_CHECK_GT(numPending_, 0); --numPending_; if (op->requestPool != nullptr) { @@ -1048,11 +1071,6 @@ void SharedArbitrator::finishArbitration(ArbitrationOperation* op) { } } -bool SharedArbitrator::isUnderArbitration(MemoryPool* pool) const { - std::lock_guard l(mutex_); - return isUnderArbitrationLocked(pool); -} - bool SharedArbitrator::isUnderArbitrationLocked(MemoryPool* pool) const { return arbitrationQueues_.count(pool) != 0; } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index dcd66eb677bd..f513a12825b0 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -19,6 +19,7 @@ #include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/base/Counters.h" +#include "velox/common/base/GTestMacros.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/future/VeloxPromise.h" #include "velox/common/memory/Memory.h" @@ -43,6 +44,14 @@ class SharedArbitrator : public memory::MemoryArbitrator { static int64_t getReservedCapacity( const std::unordered_map& configs); + /// The initial memory capacity to reserve for a newly created query memory + /// pool. + static constexpr std::string_view kMemoryPoolInitialCapacity{ + "memory-pool-initial-capacity"}; + static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; + static uint64_t getMemoryPoolInitialCapacity( + const std::unordered_map& configs); + /// The minimal amount of memory capacity reserved for each query to run. static constexpr std::string_view kMemoryPoolReservedCapacity{ "memory-pool-reserved-capacity"}; @@ -95,17 +104,15 @@ class SharedArbitrator : public memory::MemoryArbitrator { static void unregisterFactory(); - uint64_t growCapacity(MemoryPool* pool, uint64_t requestBytes) final; + void addPool(const std::shared_ptr& pool) final; - bool growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t requestBytes) final; + void removePool(MemoryPool* pool) final; + + bool growCapacity(MemoryPool* pool, uint64_t requestBytes) final; uint64_t shrinkCapacity(MemoryPool* pool, uint64_t requestBytes) final; uint64_t shrinkCapacity( - const std::vector>& pools, uint64_t requestBytes, bool allowSpill = true, bool force = false) override final; @@ -116,16 +123,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { std::string toString() const final; - /// The candidate memory pool stats used by arbitration. - struct Candidate { - int64_t reclaimableBytes{0}; - int64_t freeBytes{0}; - int64_t reservedBytes{0}; - MemoryPool* pool; - - std::string toString() const; - }; - /// Returns 'freeCapacity' back to the arbitrator for testing. void testingFreeCapacity(uint64_t freeCapacity); @@ -151,6 +148,16 @@ class SharedArbitrator : public memory::MemoryArbitrator { static inline const std::string kGlobalArbitrationLockWaitWallNanos{ "globalArbitrationLockWaitWallNanos"}; + /// The candidate memory pool stats used by arbitration. + struct Candidate { + std::shared_ptr pool; + int64_t reclaimableBytes{0}; + int64_t freeBytes{0}; + int64_t reservedBytes{0}; + + std::string toString() const; + }; + private: // The kind string of shared arbitrator. inline static const std::string kind_{"SHARED"}; @@ -159,12 +166,11 @@ class SharedArbitrator : public memory::MemoryArbitrator { struct ArbitrationOperation { MemoryPool* const requestPool; MemoryPool* const requestRoot; - const std::vector>& candidatePools; const uint64_t requestBytes; // The start time of this arbitration operation. const std::chrono::steady_clock::time_point startTime; - // The stats of candidate memory pools used for memory arbitration. + // The candidate memory pools. std::vector candidates; // The time that waits in local arbitration queue. @@ -176,18 +182,12 @@ class SharedArbitrator : public memory::MemoryArbitrator { // The time that waits to acquire the global arbitration lock. uint64_t globalArbitrationLockWaitTimeUs{0}; - ArbitrationOperation( - uint64_t requestBytes, - const std::vector>& candidatePools) - : ArbitrationOperation(nullptr, requestBytes, candidatePools) {} + explicit ArbitrationOperation(uint64_t requestBytes) + : ArbitrationOperation(nullptr, requestBytes) {} - ArbitrationOperation( - MemoryPool* _requestor, - uint64_t _requestBytes, - const std::vector>& _candidatePools) + ArbitrationOperation(MemoryPool* _requestor, uint64_t _requestBytes) : requestPool(_requestor), requestRoot(_requestor == nullptr ? nullptr : _requestor->root()), - candidatePools(_candidatePools), requestBytes(_requestBytes), startTime(std::chrono::steady_clock::now()) {} @@ -282,12 +282,29 @@ class SharedArbitrator : public memory::MemoryArbitrator { uint64_t& maxGrowTarget, uint64_t& minGrowTarget); - // Invoked to get or refresh the memory stats of the candidate memory pools - // for arbitration. If 'freeCapacityOnly' is true, then we only get free - // capacity stats for each candidate memory pool. - void getCandidateStats( - ArbitrationOperation* op, - bool freeCapacityOnly = false); + // Invoked to get or refresh the candidate memory pools for arbitration. If + // 'freeCapacityOnly' is true, then we only get free capacity stats for each + // candidate memory pool. + void getCandidates(ArbitrationOperation* op, bool freeCapacityOnly = false); + + // Sorts 'candidates' based on reclaimable free capacity in descending order. + static void sortCandidatesByReclaimableFreeCapacity( + std::vector& candidates); + + // Sorts 'candidates' based on reclaimable used capacity in descending order. + static void sortCandidatesByReclaimableUsedCapacity( + std::vector& candidates); + + // Sorts 'candidates' based on actual used memory in descending order. + static void sortCandidatesByUsage(std::vector& candidates); + + // Finds the candidate with the largest capacity. For 'requestor', the + // capacity for comparison including its current capacity and the capacity to + // grow. + static const SharedArbitrator::Candidate& findCandidateWithLargestCapacity( + MemoryPool* requestor, + uint64_t targetBytes, + const std::vector& candidates); // Invoked to reclaim free memory capacity from 'candidates' without // actually freeing used memory. @@ -400,21 +417,25 @@ class SharedArbitrator : public memory::MemoryArbitrator { int64_t minGrowCapacity(const MemoryPool& pool) const; // Returns true if 'pool' is under memory arbitration. - bool isUnderArbitration(MemoryPool* pool) const; bool isUnderArbitrationLocked(MemoryPool* pool) const; void updateArbitrationRequestStats(); + void updateArbitrationFailureStats(); const uint64_t reservedCapacity_; + const uint64_t memoryPoolInitialCapacity_; const uint64_t memoryPoolReservedCapacity_; const uint64_t memoryPoolTransferCapacity_; const uint64_t memoryReclaimWaitMs_; const bool globalArbitrationEnabled_; const bool checkUsageLeak_; + mutable folly::SharedMutex poolLock_; + std::unordered_map> candidates_; + // Lock used to protect the arbitrator state. - mutable std::mutex mutex_; + mutable std::mutex stateLock_; tsan_atomic freeReservedCapacity_{0}; tsan_atomic freeNonReservedCapacity_{0}; @@ -440,7 +461,6 @@ class SharedArbitrator : public memory::MemoryArbitrator { tsan_atomic reclaimedUsedBytes_{0}; tsan_atomic reclaimTimeUs_{0}; tsan_atomic numNonReclaimableAttempts_{0}; - tsan_atomic numReserves_{0}; - tsan_atomic numReleases_{0}; + tsan_atomic numShrinks_{0}; }; } // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/MemoryAllocatorTest.cpp b/velox/common/memory/tests/MemoryAllocatorTest.cpp index f83d384a80eb..3c2f48f0c4cc 100644 --- a/velox/common/memory/tests/MemoryAllocatorTest.cpp +++ b/velox/common/memory/tests/MemoryAllocatorTest.cpp @@ -26,7 +26,6 @@ #include #include #include -#include #include #include diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index a5b88e541895..bb7f0bd78138 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -63,14 +63,14 @@ TEST_F(MemoryArbitrationTest, stats) { ASSERT_EQ( stats.toString(), "STATS[numRequests 2 numAborted 3 numFailures 100 " - "numNonReclaimableAttempts 5 numReserves 0 numReleases 0 " + "numNonReclaimableAttempts 5 numShrinks 0 " "queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms " "shrunkMemory 95.37MB reclaimedMemory 9.77KB " "maxCapacity 0B freeCapacity 1.95KB freeReservedCapacity 1000B]"); ASSERT_EQ( fmt::format("{}", stats), "STATS[numRequests 2 numAborted 3 numFailures 100 " - "numNonReclaimableAttempts 5 numReserves 0 numReleases 0 " + "numNonReclaimableAttempts 5 numShrinks 0 " "queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms " "shrunkMemory 95.37MB reclaimedMemory 9.77KB " "maxCapacity 0B freeCapacity 1.95KB freeReservedCapacity 1000B]"); @@ -135,33 +135,34 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { auto rootPool = manager.addRootPool("root-1", 8L << 20, MemoryReclaimer::create()); ASSERT_EQ(rootPool->capacity(), 1 << 20); - ASSERT_EQ( - manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20), 1 << 20); - ASSERT_EQ( - manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20), 2 << 20); + ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20)); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_FALSE(manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20)); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 2 << 20)); + ASSERT_EQ(rootPool->capacity(), 4 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeCapacityBytes, 2 << 20); ASSERT_EQ(manager.arbitrator()->stats().freeReservedCapacityBytes, 2 << 20); auto leafPool = rootPool->addLeafChild("leaf-1.0"); - void* buffer; VELOX_ASSERT_THROW( - buffer = leafPool->allocate(7L << 20), + leafPool->allocate(7L << 20), "Exceeded memory pool capacity after attempt to grow capacity through " "arbitration. Requestor pool name 'leaf-1.0', request size 7.00MB, " "memory pool capacity 4.00MB, memory pool max capacity 8.00MB"); - ASSERT_NO_THROW(buffer = leafPool->allocate(4L << 20)); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 0); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 1 << 20); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 1), 0); ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 1), 0); - leafPool->free(buffer, 4L << 20); + ASSERT_EQ(rootPool->capacity(), 3 << 20); + static_cast(rootPool.get())->testingSetReservation(0); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(leafPool.get(), 1 << 20), 1 << 20); ASSERT_EQ( manager.arbitrator()->shrinkCapacity(rootPool.get(), 1 << 20), 1 << 20); - ASSERT_EQ(rootPool->capacity(), 2 << 20); - ASSERT_EQ(leafPool->capacity(), 2 << 20); - ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20); + ASSERT_EQ(rootPool->capacity(), 1 << 20); + ASSERT_EQ(leafPool->capacity(), 1 << 20); + ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 1 << 20); ASSERT_EQ(rootPool->capacity(), 0); ASSERT_EQ(leafPool->capacity(), 0); } @@ -305,19 +306,17 @@ TEST_F(MemoryArbitrationTest, reservedCapacityFreeByPoolShrink) { pools.push_back(manager.addRootPool("", kMaxMemory)); ASSERT_GE(pools.back()->capacity(), 0); - ASSERT_EQ(arbitrator->shrinkCapacity(pools, 1 << 20), 2 << 20); - ASSERT_EQ(arbitrator->growCapacity(pools[numPools - 1].get(), 1 << 20), 0); - ASSERT_EQ(arbitrator->growCapacity(pools.back().get(), 2 << 20), 1 << 20); + ASSERT_EQ(arbitrator->shrinkCapacity(1 << 20), 2 << 20); } TEST_F(MemoryArbitrationTest, arbitratorStats) { const MemoryArbitrator::Stats emptyStats; ASSERT_TRUE(emptyStats.empty()); const MemoryArbitrator::Stats anchorStats( - 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); ASSERT_FALSE(anchorStats.empty()); const MemoryArbitrator::Stats largeStats( - 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); ASSERT_FALSE(largeStats.empty()); ASSERT_TRUE(!(anchorStats == largeStats)); ASSERT_TRUE(anchorStats != largeStats); @@ -327,11 +326,10 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(!(anchorStats >= largeStats)); const auto delta = largeStats - anchorStats; ASSERT_EQ( - delta, - MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 8, 3, 3, 3, 3)); + delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 8, 3, 3, 3)); const MemoryArbitrator::Stats smallStats( - 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); ASSERT_TRUE(!(anchorStats == smallStats)); ASSERT_TRUE(anchorStats != smallStats); ASSERT_TRUE(!(anchorStats < smallStats)); @@ -340,7 +338,7 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(anchorStats >= smallStats); const MemoryArbitrator::Stats invalidStats( - 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 8, 2, 8, 2); + 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 2, 8, 2); ASSERT_TRUE(!(anchorStats == invalidStats)); ASSERT_TRUE(anchorStats != invalidStats); VELOX_ASSERT_THROW(anchorStats < invalidStats, ""); @@ -362,23 +360,16 @@ class FakeTestArbitrator : public MemoryArbitrator { return "USER"; } - uint64_t growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { - VELOX_NYI(); - return 0; - } + void addPool(const std::shared_ptr& /*unused*/) override {} - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(MemoryPool* /*unused*/) override {} + + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { VELOX_NYI(); } diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index e5c7c5eef9de..348ed8db3377 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -108,7 +108,7 @@ TEST_F(MemoryManagerTest, ctor) { "allocated pages 0 mapped pages 0]\n" "ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] " "STATS[numRequests 0 numAborted 0 numFailures 0 " - "numNonReclaimableAttempts 0 numReserves 0 numReleases 0 queueTime 0us " + "numNonReclaimableAttempts 0 numShrinks 0 queueTime 0us " "arbitrationTime 0us reclaimTime 0us shrunkMemory 0B " "reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]]"); } @@ -123,22 +123,16 @@ class FakeTestArbitrator : public MemoryArbitrator { .capacity = config.capacity, .extraConfigs = config.extraConfigs}) {} - uint64_t growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { - VELOX_NYI(); - } + void addPool(const std::shared_ptr& /*unused*/) override {} - bool growCapacity( - MemoryPool* /*unused*/, - const std::vector>& /*unused*/, - uint64_t /*unused*/) override { + void removePool(MemoryPool* /*unused*/) override {} + + bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); } - uint64_t shrinkCapacity( - const std::vector>& /*unused*/, - uint64_t /*unused*/, - bool /*unused*/, - bool /*unused*/) override { + uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/) + override { VELOX_NYI(); } diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 182074d42ee0..19da53e346fd 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -2446,6 +2446,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, orderedArbitration) { (*candidates)[i - 1].reclaimableBytes); } }))); + folly::Random::DefaultGenerator rng; rng.seed(512); const uint64_t memCapacity = 512 * MB; diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 06b4f143082f..9eaf70010021 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -526,7 +526,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(arbitrator_->stats().numReserves, numAddedPools_); ASSERT_GT(orderByQueryCtx->pool()->stats().numCapacityGrowths, 0); } } @@ -629,7 +628,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(newStats.numReserves, numAddedPools_); } } @@ -742,7 +740,6 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); - ASSERT_EQ(arbitrator_->stats().numReserves, numAddedPools_); } } @@ -1289,10 +1286,8 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { threads.emplace_back([&]() { { std::lock_guard l(mutex); - auto oldNum = arbitrator_->stats().numReserves; queries.emplace_back( newQueryCtx(memoryManager_.get(), executor_.get())); - ASSERT_EQ(arbitrator_->stats().numReserves, oldNum + 1); } }); } @@ -1300,11 +1295,9 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { for (auto& queryThread : threads) { queryThread.join(); } - ASSERT_EQ(arbitrator_->stats().numReserves, numRootPools); - ASSERT_EQ(arbitrator_->stats().numReleases, 0); + ASSERT_EQ(arbitrator_->stats().numShrinks, 0); } - ASSERT_EQ(arbitrator_->stats().numReserves, numRootPools); - ASSERT_EQ(arbitrator_->stats().numReleases, numRootPools); + ASSERT_EQ(arbitrator_->stats().numShrinks, numRootPools); } } } // namespace facebook::velox::memory diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index f8f00d5303a4..5331f57837a7 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -7287,7 +7287,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) { waitForAllTasksToBeDeleted(); ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); // Wait again here as this test uses on-demand created memory manager instead // of the global one. We need to make sure any used memory got cleaned up diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index a3b97f979d34..27612fc1ca2b 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -3742,8 +3742,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { ASSERT_GE(arbitrator->stats().numReclaimedBytes, testParam.bytesToReserve); waitForAllTasksToBeDeleted(3'000'000); queryCtx.reset(); - ASSERT_EQ(arbitrator->stats().numReserves, 1); - ASSERT_EQ(arbitrator->stats().numReleases, 1); + ASSERT_EQ(arbitrator->stats().numShrinks, 1); } } @@ -3823,7 +3822,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numFailures, 1); ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); waitForAllTasksToBeDeleted(); } @@ -3916,7 +3914,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); ASSERT_EQ(arbitrator->stats().numFailures, 0); ASSERT_GT(arbitrator->stats().numReclaimedBytes, 0); - ASSERT_EQ(arbitrator->stats().numReserves, 1); waitForAllTasksToBeDeleted(); } @@ -4014,7 +4011,6 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator->stats().numFailures, 1); ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); - ASSERT_EQ(arbitrator->stats().numReserves, 1); const auto updatedSpillStats = common::globalSpillStats(); ASSERT_EQ(updatedSpillStats, spillStats); waitForAllTasksToBeDeleted();