diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 318879abb982a..3eac2f4528f49 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -259,7 +259,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { SuspendedSection suspender; velox::exec::MemoryReclaimer::Stats status; auto* mm = memoryManager_->getMemoryManager(); - uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling + uint64_t spilledOut = mm->arbitrator()->shrinkCapacity(remaining); // this conducts spilling LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes."; uint64_t total = shrunken + spilledOut; VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes."; diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 68c056ad0cab7..19d8fe02121d7 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -35,60 +35,89 @@ namespace gluten { using namespace facebook; +namespace { + +static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"}; +static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; +static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"}; +static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20}; + +template +T getConfig( + const std::unordered_map& configs, + const std::string_view& key, + const T& defaultValue) { + if (configs.count(std::string(key)) > 0) { + try { + return folly::to(configs.at(std::string(key))); + } catch (const std::exception& e) { + VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what()); + } + } + return defaultValue; +} +} // namespace /// We assume in a single Spark task. No thread-safety should be guaranteed. class ListenableArbitrator : public velox::memory::MemoryArbitrator { public: ListenableArbitrator(const Config& config, AllocationListener* listener) - : MemoryArbitrator(config), listener_(listener) {} - + : MemoryArbitrator(config), + listener_(listener), + memoryPoolInitialCapacity_( + getConfig(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)), + memoryPoolTransferCapacity_( + getConfig(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) { + } std::string kind() const override { return kind_; } - uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { - listener_->allocationChanged(targetBytes); - if (!growPool(pool, targetBytes, 0)) { - VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes)); - } - return targetBytes; + void addPool(const std::shared_ptr& pool) override { + VELOX_CHECK_EQ(pool->capacity(), 0); + + std::unique_lock guard{mutex_}; + VELOX_CHECK_EQ(candidates_.count(pool.get()), 0); + candidates_.emplace(pool.get(), pool->weak_from_this()); } - uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { - return shrinkCapacity0(pool, targetBytes); + void removePool(velox::memory::MemoryPool* pool) override { + VELOX_CHECK_EQ(pool->reservedBytes(), 0); + shrinkCapacity(pool, pool->capacity()); + + std::unique_lock guard{mutex_}; + const auto ret = candidates_.erase(pool); + VELOX_CHECK_EQ(ret, 1); } - bool growCapacity( - velox::memory::MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t targetBytes) override { + bool growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { velox::memory::ScopedMemoryArbitrationContext ctx(pool); - std::shared_ptr candidate; + velox::memory::MemoryPool* pool; { std::unique_lock guard{mutex_}; - VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool") - candidate = candidatePools.back(); + VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool") + candidate = candidates_.begin()->first; } - VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator"); + VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator"); growCapacity0(pool->root(), targetBytes); return true; } - uint64_t shrinkCapacity( - const std::vector>& pools, - uint64_t targetBytes, - bool allowSpill, - bool allowAbort) override { + uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override { velox::memory::ScopedMemoryArbitrationContext ctx((const velox::memory::MemoryPool*)nullptr); facebook::velox::exec::MemoryReclaimer::Stats status; - std::shared_ptr pool; + velox::memory::MemoryPool* pool; { std::unique_lock guard{mutex_}; - VELOX_CHECK_EQ(pools.size(), 1, "ListenableArbitrator should only be used within a single root pool") - pool = pools.at(0); + VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool") + pool = candidates_.begin()->first; } pool->reclaim(targetBytes, 0, status); // ignore the output - return shrinkCapacity0(pool.get(), 0); + return shrinkCapacity0(pool, 0); + } + + uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override { + return shrinkCapacity0(pool, targetBytes); } Stats stats() const override { @@ -130,8 +159,12 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } gluten::AllocationListener* listener_; - std::mutex mutex_; + const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused. + const uint64_t memoryPoolTransferCapacity_; + + mutable std::mutex mutex_; inline static std::string kind_ = "GLUTEN"; + std::unordered_map> candidates_; }; class ArbitratorFactoryRegister {