diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index ae4a2e1ef6a3..e0faf199699e 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -215,7 +215,6 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidates( break; } } - numReclaimedBytes_ += freedBytes; return freedBytes; } @@ -223,8 +222,12 @@ uint64_t SharedArbitrator::reclaim( MemoryPool* pool, uint64_t targetBytes) noexcept { const uint64_t oldCapacity = pool->capacity(); + uint64_t freedBytes{0}; try { - pool->reclaim(targetBytes); + freedBytes = pool->shrink(targetBytes); + if (freedBytes < targetBytes) { + pool->reclaim(targetBytes - freedBytes); + } } catch (const std::exception& e) { VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " << pool->name() << ", aborting it!"; @@ -235,7 +238,10 @@ uint64_t SharedArbitrator::reclaim( } const uint64_t newCapacity = pool->capacity(); VELOX_CHECK_GE(oldCapacity, newCapacity); - return oldCapacity - newCapacity; + const uint64_t reclaimedbytes = oldCapacity - newCapacity; + numShrunkBytes_ += freedBytes; + numReclaimedBytes_ += reclaimedbytes - freedBytes; + return reclaimedbytes; } void SharedArbitrator::abort(MemoryPool* pool) { diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 1464aeab5410..077aa20f8e05 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -1323,6 +1323,55 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) { ASSERT_EQ(arbitrator_->stats().numAborted, 1); } +DEBUG_ONLY_TEST_F( + MockSharedArbitrationTest, + freeUnusedCapacityWhenReclaimMemoryPool) { + const int allocationSize = kMemoryCapacity / 4; + const int reclaimedMemoryCapacity = kMemoryCapacity; + std::shared_ptr reclaimedQuery = addQuery(kMemoryCapacity); + MockMemoryOperator* reclaimedQueryOp = addMemoryOp(reclaimedQuery); + // The buffer to free later. + void* bufferToFree = reclaimedQueryOp->allocate(allocationSize); + reclaimedQueryOp->allocate(kMemoryCapacity - allocationSize); + + std::shared_ptr arbitrationQuery = addQuery(kMemoryCapacity); + MockMemoryOperator* arbitrationQueryOp = addMemoryOp(arbitrationQuery); + + folly::EventCount reclaimWait; + auto reclaimWaitKey = reclaimWait.prepareWait(); + folly::EventCount reclaimBlock; + auto reclaimBlockKey = reclaimBlock.prepareWait(); + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableMemory", + std::function(([&](const MemoryPool* /*unsed*/) { + reclaimWait.notify(); + reclaimBlock.wait(reclaimBlockKey); + }))); + + const auto oldStats = arbitrator_->stats(); + + std::thread allocThread([&]() { + // Allocate to trigger arbitration. + arbitrationQueryOp->allocate(allocationSize); + }); + + reclaimWait.wait(reclaimWaitKey); + reclaimedQueryOp->free(bufferToFree); + reclaimBlock.notify(); + + allocThread.join(); + + const auto stats = arbitrator_->stats(); + ASSERT_EQ(stats.numFailures, 0); + ASSERT_EQ(stats.numAborted, 0); + ASSERT_EQ(stats.numRequests, oldStats.numRequests + 1); + // We count the freed capacity in reclaimed bytes. + ASSERT_EQ(stats.numShrunkBytes, oldStats.numShrunkBytes + allocationSize); + ASSERT_EQ(stats.numReclaimedBytes, 0); + ASSERT_EQ(reclaimedQueryOp->capacity(), kMemoryCapacity - allocationSize); + ASSERT_EQ(arbitrationQueryOp->capacity(), allocationSize); +} + DEBUG_ONLY_TEST_F( MockSharedArbitrationTest, raceBetweenInitialReservationAndArbitration) {