From 1d4b1859cc06349db0724923bd7e4848f16e9921 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 8 Aug 2024 13:22:27 +0000 Subject: [PATCH] fix compile --- cpp/velox/compute/WholeStageResultIterator.cc | 2 +- cpp/velox/memory/VeloxMemoryManager.cc | 120 ++++++++++-------- 2 files changed, 66 insertions(+), 56 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 4544c01653d97..c87e6e27f0289 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -245,7 +245,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { SuspendedSection suspender; velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); - uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling + uint64_t spilledOut = mm->arbitrator()->shrinkCapacity(pool.get(), remaining); // this conducts spilling LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes."; uint64_t total = shrunken + spilledOut; VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes."; diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 442090004a417..89bc3ebbaf2b5 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -38,58 +38,89 @@ using namespace facebook; /// We assume in a single Spark task. No thread-safety should be guaranteed. class ListenableArbitrator : public velox::memory::MemoryArbitrator { public: + struct ExtraConfig { + /// The memory capacity reserved to ensure each running query has minimal + /// capacity of 'memoryPoolReservedCapacity' to run. + static constexpr std::string_view kReservedCapacity{"reserved-capacity"}; + static constexpr int64_t kDefaultReservedCapacity{0}; + static int64_t getReservedCapacity(const std::unordered_map& configs); + + /// The initial memory capacity to reserve for a newly created query memory + /// pool. + static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"}; + static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; + static uint64_t getMemoryPoolInitialCapacity(const std::unordered_map& configs); + + /// The minimal amount of memory capacity reserved for each query to run. + static constexpr std::string_view kMemoryPoolReservedCapacity{"memory-pool-reserved-capacity"}; + static constexpr uint64_t kDefaultMemoryPoolReservedCapacity{0}; + static uint64_t getMemoryPoolReservedCapacity(const std::unordered_map& configs); + + /// The minimal memory capacity to transfer out of or into a memory pool + /// during the memory arbitration. + static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"}; + static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20}; + static uint64_t getMemoryPoolTransferCapacity(const std::unordered_map& configs); + + /// Specifies the max time to wait for memory reclaim by arbitration. The + /// memory reclaim might fail if the max time has exceeded. This prevents + /// the memory arbitration from getting stuck when the memory reclaim waits + /// for a hanging query task to pause. If it is zero, then there is no + /// timeout. + static constexpr std::string_view kMemoryReclaimWaitMs{"memory-reclaim-wait-ms"}; + static constexpr uint64_t kDefaultMemoryReclaimWaitMs{0}; + static uint64_t getMemoryReclaimWaitMs(const std::unordered_map& configs); + + /// If true, it allows memory arbitrator to reclaim used memory cross query + /// memory pools. + static constexpr std::string_view kGlobalArbitrationEnabled{"global-arbitration-enabled"}; + static constexpr bool kDefaultGlobalArbitrationEnabled{false}; + static bool getGlobalArbitrationEnabled(const std::unordered_map& configs); + + /// If true, do sanity check on the arbitrator state on destruction. + /// + /// TODO: deprecate this flag after all the existing memory leak use cases + /// have been fixed. + static constexpr std::string_view kCheckUsageLeak{"check-usage-leak"}; + static constexpr bool kDefaultCheckUsageLeak{true}; + static bool getCheckUsageLeak(const std::unordered_map& configs); + }; + ListenableArbitrator(const Config& config, AllocationListener* listener) - : MemoryArbitrator(config), listener_(listener) {} + : MemoryArbitrator(config), + listener_(listener), + memoryPoolTransferCapacity_(ExtraConfig::getMemoryPoolTransferCapacity(config.extraConfigs)) {} std::string kind() const override { return kind_; } - uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { + void addPool(const std::shared_ptr& pool) override { std::lock_guard l(mutex_); + VELOX_CHECK_EQ(pool->capacity(), 0); + auto targetBytes = pool->maxCapacity(); listener_->allocationChanged(targetBytes); - if (!growPool(pool, targetBytes, 0)) { + if (!growPool(pool.get(), targetBytes, 0)) { VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes)); } - return targetBytes; } - uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { + void removePool(velox::memory::MemoryPool* pool) override { std::lock_guard l(mutex_); - return shrinkCapacityLocked(pool, targetBytes); + VELOX_CHECK_EQ(pool->reservedBytes(), 0); } - bool growCapacity( - velox::memory::MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t targetBytes) override { - velox::memory::ScopedMemoryArbitrationContext ctx(pool); - VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool") - auto candidate = candidatePools.back(); - VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator"); + bool growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { + return false; + } + uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { std::lock_guard l(mutex_); - growCapacityLocked(pool->root(), targetBytes); - return true; + return shrinkCapacityLocked(pool, targetBytes); } - uint64_t shrinkCapacity( - const std::vector>& pools, - uint64_t targetBytes, - bool allowSpill, - bool allowAbort) override { - velox::memory::ScopedMemoryArbitrationContext ctx((const velox::memory::MemoryPool*)nullptr); - facebook::velox::exec::MemoryReclaimer::Stats status; - VELOX_CHECK_EQ(pools.size(), 1, "Gluten only has one root pool"); - std::lock_guard l(mutex_); // FIXME: Do we have recursive locking for this mutex? - auto pool = pools.at(0); - const uint64_t oldCapacity = pool->capacity(); - pool->reclaim(targetBytes, 0, status); // ignore the output - shrinkPool(pool.get(), 0); - const uint64_t newCapacity = pool->capacity(); - uint64_t total = oldCapacity - newCapacity; - listener_->allocationChanged(-total); - return total; + uint64_t shrinkCapacity(uint64_t /* unused */, bool /* unused */, bool /* unused */) override { + return 0; } Stats stats() const override { @@ -102,28 +133,6 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } private: - void growCapacityLocked(velox::memory::MemoryPool* pool, uint64_t bytes) { - // Since - // https://github.com/facebookincubator/velox/pull/9557/files#diff-436e44b7374032f8f5d7eb45869602add6f955162daa2798d01cc82f8725724dL812-L820, - // We should pass bytes as parameter "reservationBytes" when calling ::grow. - auto freeByes = pool->freeBytes(); - if (freeByes > bytes) { - if (growPool(pool, 0, bytes)) { - return; - } - } - auto reclaimedFreeBytes = shrinkPool(pool, 0); - auto neededBytes = velox::bits::roundUp(bytes - reclaimedFreeBytes, memoryPoolTransferCapacity_); - listener_->allocationChanged(neededBytes); - auto ret = growPool(pool, reclaimedFreeBytes + neededBytes, bytes); - VELOX_CHECK( - ret, - "{} failed to grow {} bytes, current state {}", - pool->name(), - velox::succinctBytes(bytes), - pool->toString()) - } - uint64_t shrinkCapacityLocked(velox::memory::MemoryPool* pool, uint64_t bytes) { uint64_t freeBytes = shrinkPool(pool, bytes); listener_->allocationChanged(-freeBytes); @@ -132,6 +141,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { gluten::AllocationListener* listener_; std::recursive_mutex mutex_; + const uint64_t memoryPoolTransferCapacity_; inline static std::string kind_ = "GLUTEN"; };