Skip to content

Commit

Permalink
Let memory arbitration directly track the liveness of query memory po…
Browse files Browse the repository at this point in the history
…ols (facebookincubator#10674)

Summary:
In current implementation, memory manager tracks the liveness of query memory pools. For each memory arbitration, it makes a copy of all the alive memory pools and pass them to the memory arbitrator which is not required most of the time. This PR changes to let memory arbitrator track the memory pools directly to avoid this cost. This also enables the global memory arbitration optimizations in followup. The Meta internal shadow run shows 4% cpu reduction.

Pull Request resolved: facebookincubator#10674

Reviewed By: tanjialiang

Differential Revision: D60868050

Pulled By: xiaoxmeng

fbshipit-source-id: 816f799a82e35a7bb0f58434c614895128d8e159
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Aug 7, 2024
1 parent efe44c1 commit f40fa8a
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 330 deletions.
21 changes: 8 additions & 13 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,13 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator {
return "test";
}

uint64_t growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/)
override {
return 0;
void addPool(const std::shared_ptr<memory::MemoryPool>& /*unused*/) override {
}

bool growCapacity(
memory::MemoryPool* /*unused*/,
const std::vector<std::shared_ptr<memory::MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
void removePool(memory::MemoryPool* /*unused*/) override {}

bool growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/)
override {
return false;
}

Expand All @@ -259,11 +257,8 @@ class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator {
return 0;
}

uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<memory::MemoryPool>>& /*unused*/,
uint64_t /*unused*/,
bool /*unused*/,
bool /*unused*/) override {
uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/)
override {
return 0;
}

Expand Down Expand Up @@ -546,7 +541,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
.sumEvictScore = 10,
.ssdStats = newSsdStats});
arbitrator.updateStats(memory::MemoryArbitrator::Stats(
10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10));
10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10));
std::this_thread::sleep_for(std::chrono::milliseconds(4'000));

// Stop right after sufficient wait to ensure the following reads from main
Expand Down
12 changes: 6 additions & 6 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
// code will be removed.
extraArbitratorConfigs["reserved-capacity"] =
folly::to<std::string>(options.arbitratorReservedCapacity);
extraArbitratorConfigs["memory-pool-initial-capacity"] =
folly::to<std::string>(options.memoryPoolInitCapacity);
extraArbitratorConfigs["memory-pool-reserved-capacity"] =
folly::to<std::string>(options.memoryPoolReservedCapacity);
extraArbitratorConfigs["memory-pool-transfer-capacity"] =
Expand Down Expand Up @@ -271,8 +273,7 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
options);
pools_.emplace(poolName, pool);
VELOX_CHECK_EQ(pool->capacity(), 0);
arbitrator_->growCapacity(
pool.get(), std::min<uint64_t>(poolInitCapacity_, maxCapacity));
arbitrator_->addPool(pool);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricMemoryPoolInitialCapacityBytes, pool->capacity());
return pool;
Expand All @@ -292,15 +293,14 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
VELOX_CHECK_NOT_NULL(pool);
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
return arbitrator_->growCapacity(pool, getAlivePools(), incrementBytes);
return arbitrator_->growCapacity(pool, incrementBytes);
}

uint64_t MemoryManager::shrinkPools(
uint64_t targetBytes,
bool allowSpill,
bool allowAbort) {
return arbitrator_->shrinkCapacity(
getAlivePools(), targetBytes, allowSpill, allowAbort);
return arbitrator_->shrinkCapacity(targetBytes, allowSpill, allowAbort);
}

void MemoryManager::dropPool(MemoryPool* pool) {
Expand All @@ -312,7 +312,7 @@ void MemoryManager::dropPool(MemoryPool* pool) {
}
pools_.erase(it);
VELOX_DCHECK_EQ(pool->reservedBytes(), 0);
arbitrator_->shrinkCapacity(pool, 0);
arbitrator_->removePool(pool);
}

MemoryPool& MemoryManager::deprecatedSharedLeafPool() {
Expand Down
41 changes: 16 additions & 25 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,18 @@ class NoopArbitrator : public MemoryArbitrator {
return "NOOP";
}

// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity reserve.
uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override {
growPool(pool, pool->maxCapacity(), 0);
return pool->capacity();
void addPool(const std::shared_ptr<MemoryPool>& pool) override {
VELOX_CHECK_EQ(pool->capacity(), 0);
growPool(pool.get(), pool->maxCapacity(), 0);
}

void removePool(MemoryPool* pool) override {
VELOX_CHECK_EQ(pool->reservedBytes(), 0);
}

// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity grow.
bool growCapacity(
MemoryPool* /*unused*/,
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
bool growCapacity(MemoryPool* /*unused*/, uint64_t /*unused*/) override {
return false;
}

Expand All @@ -117,7 +116,6 @@ class NoopArbitrator : public MemoryArbitrator {
// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity shrink.
uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& /* unused */,
uint64_t /* unused */,
bool /* unused */,
bool /* unused */) override {
Expand Down Expand Up @@ -336,8 +334,7 @@ MemoryArbitrator::Stats::Stats(
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
uint64_t _numReleases)
uint64_t _numShrinks)
: numRequests(_numRequests),
numSucceeded(_numSucceeded),
numAborted(_numAborted),
Expand All @@ -351,21 +348,19 @@ MemoryArbitrator::Stats::Stats(
freeReservedCapacityBytes(_freeReservedCapacityBytes),
reclaimTimeUs(_reclaimTimeUs),
numNonReclaimableAttempts(_numNonReclaimableAttempts),
numReserves(_numReserves),
numReleases(_numReleases) {}
numShrinks(_numShrinks) {}

std::string MemoryArbitrator::Stats::toString() const {
return fmt::format(
"STATS[numRequests {} numAborted {} numFailures {} "
"numNonReclaimableAttempts {} numReserves {} numReleases {} "
"numNonReclaimableAttempts {} numShrinks {} "
"queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} "
"reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]",
numRequests,
numAborted,
numFailures,
numNonReclaimableAttempts,
numReserves,
numReleases,
numShrinks,
succinctMicros(queueTimeUs),
succinctMicros(arbitrationTimeUs),
succinctMicros(reclaimTimeUs),
Expand Down Expand Up @@ -393,8 +388,7 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs;
result.numNonReclaimableAttempts =
numNonReclaimableAttempts - other.numNonReclaimableAttempts;
result.numReserves = numReserves - other.numReserves;
result.numReleases = numReleases - other.numReleases;
result.numShrinks = numShrinks - other.numShrinks;
return result;
}

Expand All @@ -413,8 +407,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
freeReservedCapacityBytes,
reclaimTimeUs,
numNonReclaimableAttempts,
numReserves,
numReleases) ==
numShrinks) ==
std::tie(
other.numRequests,
other.numSucceeded,
Expand All @@ -429,8 +422,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
other.freeReservedCapacityBytes,
other.reclaimTimeUs,
other.numNonReclaimableAttempts,
other.numReserves,
other.numReleases);
other.numShrinks);
}

bool MemoryArbitrator::Stats::operator!=(const Stats& other) const {
Expand Down Expand Up @@ -459,8 +451,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const {
UPDATE_COUNTER(numReclaimedBytes);
UPDATE_COUNTER(reclaimTimeUs);
UPDATE_COUNTER(numNonReclaimableAttempts);
UPDATE_COUNTER(numReserves);
UPDATE_COUNTER(numReleases);
UPDATE_COUNTER(numShrinks);
#undef UPDATE_COUNTER
VELOX_CHECK(
!((gtCount > 0) && (ltCount > 0)),
Expand Down
69 changes: 27 additions & 42 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ class MemoryArbitrator {
/// manager.
int64_t capacity;

/// TODO: Remove this call back from Config. It can be directly implemented
/// in SharedArbitrator instead of passing in as an config option. Provided
///
/// by the query system to validate the state after a memory pool enters
/// arbitration if not null. For instance, Prestissimo provides
/// callback to check if a memory arbitration request is issued from a
/// driver thread, then the driver should be put in suspended state to avoid
/// the potential deadlock when reclaim memory from the task of the request
Expand Down Expand Up @@ -107,47 +102,40 @@ class MemoryArbitrator {

virtual ~MemoryArbitrator() = default;

/// Invoked by the memory manager to allocate up to 'targetBytes' of free
/// memory capacity without triggering memory arbitration. The function will
/// grow the memory pool's capacity based on the free available memory
/// capacity in the arbitrator, and returns the actual grown capacity in
/// bytes.
virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0;
/// Invoked by the memory manager to add a newly created memory pool. The
/// memory arbitrator allocates the initial capacity for 'pool' and
/// dynamically adjusts its capacity based query memory needs through memory
/// arbitration.
virtual void addPool(const std::shared_ptr<MemoryPool>& pool) = 0;

/// Invoked by the memory manager to remove a destroyed memory pool. The
/// memory arbitrator frees up all its capacity and stops memory arbitration
/// operation on it.
virtual void removePool(MemoryPool* pool) = 0;

/// Invoked by the memory manager to grow a memory pool's capacity.
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
/// of query root pools to participate in the memory arbitration. The memory
/// arbitrator picks up a number of pools to either shrink its memory capacity
/// without actually freeing memory or reclaim its used memory to free up
/// enough memory for 'requestor' to grow. Different arbitrators use different
/// policies to select the candidate pools. The shared memory arbitrator used
/// by both Prestissimo and Prestissimo-on-Spark, selects the candidates with
/// more memory capacity.
///
/// NOTE: the memory manager keeps 'candidates' valid during the arbitration
/// processing.
virtual bool growCapacity(
MemoryPool* pool,
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) = 0;
/// 'pool' is the memory pool to request to grow. The memory arbitrator picks
/// up a number of pools to either shrink its memory capacity without actually
/// freeing memory or reclaim its used memory to free up enough memory for
/// 'requestor' to grow.
virtual bool growCapacity(MemoryPool* pool, uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
/// 'targetBytes' is zero, we shrink all the free capacity from the memory
/// pool. The function returns the actual freed capacity from 'pool'.
virtual uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink memory capacity from a given list
/// of memory pools by reclaiming free and used memory. The freed memory
/// capacity is given back to the arbitrator. If 'targetBytes' is zero, then
/// try to reclaim all the memory from 'pools'. The function returns the
/// actual freed memory capacity in bytes. If 'allowSpill' is true, it
/// reclaims the used memory by spilling. If 'allowAbort' is true, it reclaims
/// the used memory by aborting the queries with the most memory usage. If
/// both are true, it first reclaims the used memory by spilling and then
/// abort queries to reach the reclaim target.
/// Invoked by the memory manager to shrink memory capacity from memory pools
/// by reclaiming free and used memory. The freed memory capacity is given
/// back to the arbitrator. If 'targetBytes' is zero, then try to reclaim all
/// the memory from 'pools'. The function returns the actual freed memory
/// capacity in bytes. If 'allowSpill' is true, it reclaims the used memory by
/// spilling. If 'allowAbort' is true, it reclaims the used memory by aborting
/// the queries with the most memory usage. If both are true, it first
/// reclaims the used memory by spilling and then abort queries to reach the
/// reclaim target.
virtual uint64_t shrinkCapacity(
const std::vector<std::shared_ptr<MemoryPool>>& pools,
uint64_t targetBytes,
bool allowSpill = true,
bool allowAbort = false) = 0;
Expand Down Expand Up @@ -183,10 +171,8 @@ class MemoryArbitrator {
/// The total number of times of the reclaim attempts that end up failing
/// due to reclaiming at non-reclaimable stage.
uint64_t numNonReclaimableAttempts{0};
/// The total number of memory reservations.
uint64_t numReserves{0};
/// The total number of memory releases.
uint64_t numReleases{0};
/// The total number of memory capacity shrinks.
uint64_t numShrinks{0};

Stats(
uint64_t _numRequests,
Expand All @@ -202,8 +188,7 @@ class MemoryArbitrator {
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
uint64_t _numReleases);
uint64_t _numShrinks);

Stats() = default;

Expand Down
Loading

0 comments on commit f40fa8a

Please sign in to comment.