diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 5235f7ecce4b..9c29edd815ca 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -34,13 +34,10 @@ const std::string getFilePath(const std::string& fileName) { // Used by unit test and benchmark. std::shared_ptr getResultIterator( - std::shared_ptr veloxPool, + VeloxMemoryManager* memoryManager, Runtime* runtime, const std::vector>& setScanInfos, std::shared_ptr& veloxPlan) { - auto ctxPool = veloxPool->addAggregateChild( - "query_benchmark_result_iterator", facebook::velox::memory::MemoryReclaimer::create()); - std::vector> inputIter; std::unordered_map sessionConf = {}; auto veloxPlanConverter = @@ -57,7 +54,7 @@ std::shared_ptr getResultIterator( veloxPlanConverter->splitInfos(), veloxPlan->leafPlanNodeIds(), scanInfos, scanIds, streamIds); auto wholestageIter = std::make_unique( - ctxPool, + memoryManager, veloxPlan, scanIds, setScanInfos, @@ -78,7 +75,6 @@ auto BM = [](::benchmark::State& state, auto memoryManager = getDefaultMemoryManager(); auto runtime = Runtime::create(kVeloxRuntimeKind); - auto veloxPool = memoryManager->getAggregateMemoryPool(); std::vector> scanInfos; scanInfos.reserve(datasetPaths.size()); @@ -96,7 +92,7 @@ auto BM = [](::benchmark::State& state, runtime->parsePlan(reinterpret_cast(plan.data()), plan.size()); std::shared_ptr veloxPlan; - auto resultIter = getResultIterator(veloxPool, runtime, scanInfos, veloxPlan); + auto resultIter = getResultIterator(memoryManager.get(), runtime, scanInfos, veloxPlan); auto outputSchema = toArrowSchema(veloxPlan->outputType(), defaultLeafVeloxMemoryPool().get()); while (resultIter->hasNext()) { auto array = resultIter->next()->exportArrowArray(); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index f54169dfb38d..9cdc61f60a50 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -78,8 +78,6 @@ std::shared_ptr VeloxRuntime::createResultIterator( printSessionConf(sessionConf); #endif - auto veloxPool = getAggregateVeloxPool(memoryManager); - VeloxPlanConverter veloxPlanConverter(inputs, getLeafVeloxPool(memoryManager).get(), sessionConf); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_); @@ -91,15 +89,16 @@ std::shared_ptr VeloxRuntime::createResultIterator( // Separate the scan ids and stream ids, and get the scan infos. getInfoAndIds(veloxPlanConverter.splitInfos(), veloxPlan_->leafPlanNodeIds(), scanInfos, scanIds, streamIds); + auto* vmm = toVeloxMemoryManager(memoryManager); if (scanInfos.size() == 0) { // Source node is not required. auto wholestageIter = std::make_unique( - veloxPool, veloxPlan_, streamIds, spillDir, sessionConf, taskInfo_); + vmm, veloxPlan_, streamIds, spillDir, sessionConf, taskInfo_); auto resultIter = std::make_shared(std::move(wholestageIter), this); return resultIter; } else { auto wholestageIter = std::make_unique( - veloxPool, veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_); + vmm, veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_); auto resultIter = std::make_shared(std::move(wholestageIter), this); return resultIter; } diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 2b3e904291eb..797a13d46e95 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -38,18 +38,18 @@ class VeloxRuntime final : public Runtime { explicit VeloxRuntime(const std::unordered_map& confMap); static std::shared_ptr getAggregateVeloxPool(MemoryManager* memoryManager) { - if (auto veloxMemoryManager = dynamic_cast(memoryManager)) { - return veloxMemoryManager->getAggregateMemoryPool(); - } else { - GLUTEN_CHECK(false, "Should use VeloxMemoryManager here."); - } + return toVeloxMemoryManager(memoryManager)->getAggregateMemoryPool(); } static std::shared_ptr getLeafVeloxPool(MemoryManager* memoryManager) { + return toVeloxMemoryManager(memoryManager)->getLeafMemoryPool(); + } + + static VeloxMemoryManager* toVeloxMemoryManager(MemoryManager* memoryManager) { if (auto veloxMemoryManager = dynamic_cast(memoryManager)) { - return veloxMemoryManager->getLeafMemoryPool(); + return veloxMemoryManager; } else { - GLUTEN_CHECK(false, "Should use VeloxMemoryManager here."); + GLUTEN_CHECK(false, "Velox memory manager should be used for Velox runtime."); } } diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b7c4af43fb4b..69f705e269dd 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -80,11 +80,11 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; } // namespace WholeStageResultIterator::WholeStageResultIterator( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::unordered_map& confMap, const SparkTaskInfo& taskInfo) - : veloxPlan_(planNode), confMap_(confMap), taskInfo_(taskInfo), pool_(pool) { + : veloxPlan_(planNode), confMap_(confMap), taskInfo_(taskInfo), memoryManager_(memoryManager) { #ifdef ENABLE_HDFS updateHdfsTokens(); #endif @@ -100,7 +100,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ facebook::velox::core::QueryConfig{getQueryContextConf()}, connectorConfigs, gluten::VeloxBackend::get()->getAsyncDataCache(), - pool_, + memoryManager_->getAggregateMemoryPool(), nullptr, ""); return ctx; @@ -153,15 +153,10 @@ class ConditionalSuspendedSection { } // namespace int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { - std::string poolName{pool_->root()->name() + "/" + pool_->name()}; + auto pool = memoryManager_->getAggregateMemoryPool(); + std::string poolName{pool->root()->name() + "/" + pool->name()}; std::string logPrefix{"Spill[" + poolName + "]: "}; - VLOG(2) << logPrefix << "Trying to reclaim " << size << " bytes of data..."; - VLOG(2) << logPrefix << "Pool has reserved " << pool_->currentBytes() << "/" << pool_->root()->reservedBytes() << "/" - << pool_->root()->capacity() << "/" << pool_->root()->maxCapacity() << " bytes."; - VLOG(2) << logPrefix << "Shrinking..."; - int64_t shrunken = pool_->shrinkManaged(pool_.get(), size); - VLOG(2) << logPrefix << shrunken << " bytes released from shrinking."; - + int64_t shrunken = memoryManager_->shrink(size); // todo return the actual spilled size? if (spillStrategy_ == "auto") { int64_t remaining = size - shrunken; @@ -176,7 +171,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { // suspend the driver when we are on it ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr); velox::exec::MemoryReclaimer::Stats status; - uint64_t spilledOut = pool_->reclaim(remaining, status); + auto* mm = memoryManager_->getMemoryManager(); + uint64_t spilledOut = mm->arbitrator()->shrinkMemory({pool}, remaining); // this conducts spilling LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes."; uint64_t total = shrunken + spilledOut; VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes."; @@ -400,7 +396,7 @@ std::shared_ptr WholeStageResultIterator::createConnectorConfig() } WholeStageResultIteratorFirstStage::WholeStageResultIteratorFirstStage( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::vector& scanNodeIds, const std::vector>& scanInfos, @@ -408,7 +404,7 @@ WholeStageResultIteratorFirstStage::WholeStageResultIteratorFirstStage( const std::string spillDir, const std::unordered_map& confMap, const SparkTaskInfo& taskInfo) - : WholeStageResultIterator(pool, planNode, confMap, taskInfo), + : WholeStageResultIterator(memoryManager, planNode, confMap, taskInfo), scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { @@ -495,13 +491,13 @@ void WholeStageResultIteratorFirstStage::constructPartitionColumns( } WholeStageResultIteratorMiddleStage::WholeStageResultIteratorMiddleStage( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::vector& streamIds, const std::string spillDir, const std::unordered_map& confMap, const SparkTaskInfo& taskInfo) - : WholeStageResultIterator(pool, planNode, confMap, taskInfo), streamIds_(streamIds) { + : WholeStageResultIterator(memoryManager, planNode, confMap, taskInfo), streamIds_(streamIds) { std::unordered_set emptySet; velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 7b32a6d48d6c..4e1f6ac3c594 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -34,7 +34,7 @@ namespace gluten { class WholeStageResultIterator : public ColumnarBatchIterator { public: WholeStageResultIterator( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::unordered_map& confMap, const SparkTaskInfo& taskInfo); @@ -96,7 +96,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator { const std::unordered_map& runtimeStats, const std::string& metricId); - std::shared_ptr pool_; + VeloxMemoryManager* memoryManager_; // spill std::string spillStrategy_; @@ -113,7 +113,7 @@ class WholeStageResultIterator : public ColumnarBatchIterator { class WholeStageResultIteratorFirstStage final : public WholeStageResultIterator { public: WholeStageResultIteratorFirstStage( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::vector& scanNodeIds, const std::vector>& scanInfos, @@ -137,7 +137,7 @@ class WholeStageResultIteratorFirstStage final : public WholeStageResultIterator class WholeStageResultIteratorMiddleStage final : public WholeStageResultIterator { public: WholeStageResultIteratorMiddleStage( - std::shared_ptr pool, + VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, const std::vector& streamIds, const std::string spillDir, diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index a8313310ac8d..1c78283f2830 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -18,6 +18,7 @@ #include "VeloxMemoryManager.h" #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/MemoryPool.h" +#include "velox/exec/MemoryReclaimer.h" #include "memory/ArrowMemoryPool.h" #include "utils/exception.h" @@ -62,26 +63,28 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } void reserveMemory(velox::memory::MemoryPool* pool, uint64_t) override { - growPool(pool, memoryPoolInitCapacity_); + std::lock_guard l(mutex_); + growPoolLocked(pool, memoryPoolInitCapacity_); } - uint64_t releaseMemory(velox::memory::MemoryPool* pool, uint64_t bytes) override { - uint64_t freeBytes = pool->shrink(bytes); - listener_->allocationChanged(-freeBytes); - if (bytes == 0 && pool->capacity() != 0) { - // So far only MemoryManager::dropPool() calls with 0 bytes. Let's assert the pool - // gives all capacity back to Spark - // - // We are likely in destructor, do not throw. INFO log is fine since we have leak checks from Spark's memory - // manager - LOG(INFO) << "Memory pool " << pool->name() << " not completely shrunken when Memory::dropPool() is called"; - } - return freeBytes; + void releaseMemory(velox::memory::MemoryPool* pool) override { + std::lock_guard l(mutex_); + releaseMemoryLocked(pool); } uint64_t shrinkMemory(const std::vector>& pools, uint64_t targetBytes) override { - GLUTEN_CHECK(false, "Not implemented"); + facebook::velox::exec::MemoryReclaimer::Stats status; + GLUTEN_CHECK(pools.size() == 1, "Should shrink a single pool at a time"); + std::lock_guard l(mutex_); // FIXME: Do we have recursive locking for this mutex? + auto pool = pools.at(0); + const uint64_t oldCapacity = pool->capacity(); + uint64_t spilledOut = pool->reclaim(targetBytes, status); // ignore the output + uint64_t shrunken = pool->shrink(0); + const uint64_t newCapacity = pool->capacity(); + uint64_t total = oldCapacity - newCapacity; + listener_->allocationChanged(-total); + return total; } bool growMemory( @@ -91,7 +94,10 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { GLUTEN_CHECK(candidatePools.size() == 1, "ListenableArbitrator should only be used within a single root pool"); auto candidate = candidatePools.back(); GLUTEN_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator"); - growPool(pool, targetBytes); + { + std::lock_guard l(mutex_); + growPoolLocked(pool, targetBytes); + } return true; } @@ -105,12 +111,18 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } private: - void growPool(velox::memory::MemoryPool* pool, uint64_t bytes) { + void growPoolLocked(velox::memory::MemoryPool* pool, uint64_t bytes) { listener_->allocationChanged(bytes); pool->grow(bytes); } + void releaseMemoryLocked(velox::memory::MemoryPool* pool) { + uint64_t freeBytes = pool->shrink(0); + listener_->allocationChanged(-freeBytes); + } + gluten::AllocationListener* listener_; + std::recursive_mutex mutex_; inline static std::string kind_ = "GLUTEN"; }; @@ -194,14 +206,17 @@ MemoryUsageStats collectMemoryUsageStatsInternal(const velox::memory::MemoryPool return stats; } -int64_t shrinkVeloxMemoryPool(velox::memory::MemoryPool* pool, int64_t size) { +int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::MemoryPool* pool, int64_t size) { std::string poolName{pool->root()->name() + "/" + pool->name()}; std::string logPrefix{"Shrink[" + poolName + "]: "}; VLOG(2) << logPrefix << "Trying to shrink " << size << " bytes of data..."; VLOG(2) << logPrefix << "Pool has reserved " << pool->currentBytes() << "/" << pool->root()->reservedBytes() << "/" << pool->root()->capacity() << "/" << pool->root()->maxCapacity() << " bytes."; VLOG(2) << logPrefix << "Shrinking..."; - int64_t shrunken = pool->shrinkManaged(pool, size); + const uint64_t oldCapacity = pool->capacity(); + mm->arbitrator()->releaseMemory(pool); + const uint64_t newCapacity = pool->capacity(); + int64_t shrunken = oldCapacity - newCapacity; VLOG(2) << logPrefix << shrunken << " bytes released from shrinking."; return shrunken; } @@ -212,7 +227,7 @@ const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const { } const int64_t VeloxMemoryManager::shrink(int64_t size) { - return shrinkVeloxMemoryPool(veloxAggregatePool_.get(), size); + return shrinkVeloxMemoryPool(veloxMemoryManager_.get(), veloxAggregatePool_.get(), size); } namespace { diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h index faf48c344fca..875fb1847274 100644 --- a/cpp/velox/memory/VeloxMemoryManager.h +++ b/cpp/velox/memory/VeloxMemoryManager.h @@ -45,6 +45,10 @@ class VeloxMemoryManager final : public MemoryManager { return veloxLeafPool_; } + facebook::velox::memory::MemoryManager* getMemoryManager() const { + return veloxMemoryManager_.get(); + } + arrow::MemoryPool* getArrowMemoryPool() override { return arrowPool_.get(); } diff --git a/gluten-data/src/main/java/io/glutenproject/memory/nmm/LoggingReservationListener.java b/gluten-data/src/main/java/io/glutenproject/memory/nmm/LoggingReservationListener.java new file mode 100644 index 000000000000..b4d6b586284f --- /dev/null +++ b/gluten-data/src/main/java/io/glutenproject/memory/nmm/LoggingReservationListener.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.memory.nmm; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// For debugging purpose only +public class LoggingReservationListener implements ReservationListener { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggingReservationListener.class); + + private final ReservationListener delegated; + + public LoggingReservationListener(ReservationListener delegated) { + this.delegated = delegated; + } + + @Override + public long reserve(long size) { + long before = getUsedBytes(); + long reserved = delegated.reserve(size); + long after = getUsedBytes(); + LOGGER.info( + String.format( + "Reservation[%s]: %d + %d(%d) = %d", this.toString(), before, reserved, size, after)); + return reserved; + } + + @Override + public long unreserve(long size) { + long before = getUsedBytes(); + long unreserved = delegated.unreserve(size); + long after = getUsedBytes(); + LOGGER.info( + String.format( + "Unreservation[%s]: %d - %d(%d) = %d", + this.toString(), before, unreserved, size, after)); + return unreserved; + } + + @Override + public long getUsedBytes() { + return delegated.getUsedBytes(); + } +} diff --git a/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java b/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java index 797f75a7e9dd..c798d292b3fc 100644 --- a/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java +++ b/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java @@ -91,7 +91,10 @@ public void release() throws Exception { if (listener.getUsedBytes() != 0) { LOGGER.warn( name - + " Reservation listener still reserved non-zero bytes, which may cause " + + " Reservation listener " + + listener.toString() + + " " + + "still reserved non-zero bytes, which may cause " + "memory leak, size: " + Utils.bytesToString(listener.getUsedBytes())); }