diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 1f6fa29836de..4db9c9d9c16f 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -221,7 +221,7 @@ class MemoryManager { const uint16_t alignment_; const bool checkUsageLeak_; const bool debugEnabled_; - // The destruction callback set for the allocated root memory pools which are + // The destruction callback set for the allocated root memory pools which are // tracked by 'pools_'. It is invoked on the root pool destruction and removes // the pool from 'pools_'. const MemoryPoolImpl::DestructionCallback poolDestructionCb_; diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index b1f808ccf033..3f2741a070e6 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -264,7 +264,9 @@ MemoryArbitrator::Stats::Stats( uint64_t _maxCapacityBytes, uint64_t _freeCapacityBytes, uint64_t _reclaimTimeUs, - uint64_t _numNonReclaimableAttempts) + uint64_t _numNonReclaimableAttempts, + uint64_t _numReserveRequest, + uint64_t _numReleaseRequest) : numRequests(_numRequests), numSucceeded(_numSucceeded), numAborted(_numAborted), @@ -276,16 +278,23 @@ MemoryArbitrator::Stats::Stats( maxCapacityBytes(_maxCapacityBytes), freeCapacityBytes(_freeCapacityBytes), reclaimTimeUs(_reclaimTimeUs), - numNonReclaimableAttempts(_numNonReclaimableAttempts) {} + numNonReclaimableAttempts(_numNonReclaimableAttempts), + numReserveRequest(_numReserveRequest), + numReleaseRequest(_numReleaseRequest) {} std::string MemoryArbitrator::Stats::toString() const { return fmt::format( - "STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} numNonReclaimableAttempts {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]", + "STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} " + "numNonReclaimableAttempts {} numReserveRequest {} numReleaseRequest {} " + "queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} " + "reclaimedMemory {} maxCapacity {} freeCapacity {}]", numRequests, numSucceeded, numAborted, numFailures, numNonReclaimableAttempts, + numReserveRequest, + numReleaseRequest, succinctMicros(queueTimeUs), succinctMicros(arbitrationTimeUs), succinctMicros(reclaimTimeUs), @@ -311,6 +320,8 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-( result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs; result.numNonReclaimableAttempts = numNonReclaimableAttempts - other.numNonReclaimableAttempts; + result.numReserveRequest = numReserveRequest - other.numReserveRequest; + result.numReleaseRequest = numReleaseRequest - other.numReleaseRequest; return result; } @@ -327,7 +338,9 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { maxCapacityBytes, freeCapacityBytes, reclaimTimeUs, - numNonReclaimableAttempts) == + numNonReclaimableAttempts, + numReserveRequest, + numReleaseRequest) == std::tie( other.numRequests, other.numSucceeded, @@ -340,7 +353,9 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { other.maxCapacityBytes, other.freeCapacityBytes, other.reclaimTimeUs, - other.numNonReclaimableAttempts); + other.numNonReclaimableAttempts, + other.numReserveRequest, + other.numReleaseRequest); } bool MemoryArbitrator::Stats::operator!=(const Stats& other) const { @@ -372,6 +387,8 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const { UPDATE_COUNTER(numReclaimedBytes); UPDATE_COUNTER(reclaimTimeUs); UPDATE_COUNTER(numNonReclaimableAttempts); + UPDATE_COUNTER(numReserveRequest); + UPDATE_COUNTER(numReleaseRequest); #undef UPDATE_COUNTER VELOX_CHECK( !((gtCount > 0) && (ltCount > 0)), diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index ffa0e10e5905..153025d7104d 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -173,6 +173,10 @@ class MemoryArbitrator { /// The total number of times of the reclaim attempts that end up failing /// due to reclaiming at non-reclaimable stage. uint64_t numNonReclaimableAttempts{0}; + /// The total number of invoking reserveMemory method. + uint64_t numReserveRequest{0}; + /// The total number of invoking releaseMemory method. + uint64_t numReleaseRequest{0}; Stats( uint64_t _numRequests, @@ -186,7 +190,9 @@ class MemoryArbitrator { uint64_t _maxCapacityBytes, uint64_t _freeCapacityBytes, uint64_t _reclaimTimeUs, - uint64_t _numNonReclaimableAttempts); + uint64_t _numNonReclaimableAttempts, + uint64_t _numReserveRequest, + uint64_t _numReleaseRequest); Stats() = default; diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 23cee612ad25..ce803ee14876 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -47,7 +47,11 @@ TEST_F(MemoryArbitrationTest, stats) { stats.numNonReclaimableAttempts = 5; ASSERT_EQ( stats.toString(), - "STATS[numRequests 2 numSucceeded 0 numAborted 3 numFailures 100 numNonReclaimableAttempts 5 queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms shrunkMemory 95.37MB reclaimedMemory 9.77KB maxCapacity 0B freeCapacity 0B]"); + "STATS[numRequests 2 numSucceeded 0 numAborted 3 numFailures 100 " + "numNonReclaimableAttempts 5 numReserveRequest 0 numReleaseRequest 0 " + "queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms " + "shrunkMemory 95.37MB reclaimedMemory 9.77KB " + "maxCapacity 0B freeCapacity 0B]"); } TEST_F(MemoryArbitrationTest, create) { @@ -121,9 +125,11 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { TEST_F(MemoryArbitrationTest, arbitratorStats) { const MemoryArbitrator::Stats emptyStats; ASSERT_TRUE(emptyStats.empty()); - const MemoryArbitrator::Stats anchorStats(5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); + const MemoryArbitrator::Stats anchorStats( + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); ASSERT_FALSE(anchorStats.empty()); - const MemoryArbitrator::Stats largeStats(8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); + const MemoryArbitrator::Stats largeStats( + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); ASSERT_FALSE(largeStats.empty()); ASSERT_TRUE(!(anchorStats == largeStats)); ASSERT_TRUE(anchorStats != largeStats); @@ -132,9 +138,11 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(anchorStats <= largeStats); ASSERT_TRUE(!(anchorStats >= largeStats)); const auto delta = largeStats - anchorStats; - ASSERT_EQ(delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 3, 3)); + ASSERT_EQ( + delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 3, 3, 3, 3)); - const MemoryArbitrator::Stats smallStats(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + const MemoryArbitrator::Stats smallStats( + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); ASSERT_TRUE(!(anchorStats == smallStats)); ASSERT_TRUE(anchorStats != smallStats); ASSERT_TRUE(!(anchorStats < smallStats)); @@ -143,7 +151,7 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(anchorStats >= smallStats); const MemoryArbitrator::Stats invalidStats( - 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 2); + 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 2, 8, 2); ASSERT_TRUE(!(anchorStats == invalidStats)); ASSERT_TRUE(anchorStats != invalidStats); ASSERT_THROW(anchorStats < invalidStats, VeloxException); diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 2c304ef07806..2e0d872b2a54 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -97,7 +97,15 @@ TEST_F(MemoryManagerTest, Ctor) { ASSERT_EQ(arbitrator->stats().maxCapacityBytes, kCapacity); ASSERT_EQ( manager.toString(), - "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 0\nList of root pools:\n\t__default_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] STATS[numRequests 0 numSucceeded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 queueTime 0us arbitrationTime 0us reclaimTime 0us shrunkMemory 0B reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB]]]"); + "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of " + "pools 0\nList of root pools:\n\t__default_root__\n" + "Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 " + "allocated pages 0 mapped pages 0]\n" + "ARBITRATOR[SHARED CAPACITY[4.00GB] STATS[numRequests 0 numSucceeded 0 " + "numAborted 0 numFailures 0 numNonReclaimableAttempts 0 " + "numReserveRequest 0 numReleaseRequest 0 queueTime 0us " + "arbitrationTime 0us reclaimTime 0us shrunkMemory 0B " + "reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB]]]"); } { // Test construction failure due to inconsistent allocator capacity setting. diff --git a/velox/exec/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp index d6a166608349..96c7c9c85c9f 100644 --- a/velox/exec/SharedArbitrator.cpp +++ b/velox/exec/SharedArbitrator.cpp @@ -152,6 +152,7 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) { const int64_t bytesToReserve = std::min(maxGrowBytes(*pool), memoryPoolInitCapacity_); std::lock_guard l(mutex_); + ++numReserveRequest_; if (running_) { // NOTE: if there is a running memory arbitration, then we shall skip // reserving the free memory for the newly created memory pool but let it @@ -164,6 +165,7 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) { void SharedArbitrator::releaseMemory(MemoryPool* pool) { std::lock_guard l(mutex_); + ++numReleaseRequest_; const uint64_t freedBytes = pool->shrink(0); incrementFreeCapacityLocked(freedBytes); } @@ -499,6 +501,8 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const { stats.freeCapacityBytes = freeCapacity_; stats.reclaimTimeUs = reclaimTimeUs_; stats.numNonReclaimableAttempts = numNonReclaimableAttempts_; + stats.numReserveRequest = numReserveRequest_; + stats.numReleaseRequest = numReleaseRequest_; return stats; } diff --git a/velox/exec/SharedArbitrator.h b/velox/exec/SharedArbitrator.h index 04d7b112991a..b6e8bbb1968a 100644 --- a/velox/exec/SharedArbitrator.h +++ b/velox/exec/SharedArbitrator.h @@ -203,5 +203,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { tsan_atomic numReclaimedBytes_{0}; tsan_atomic reclaimTimeUs_{0}; tsan_atomic numNonReclaimableAttempts_{0}; + tsan_atomic numReserveRequest_{0}; + tsan_atomic numReleaseRequest_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index bb89c8c34f72..05ee55b3109f 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -314,6 +314,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { VectorFuzzer fuzzer(fuzzerOpts_, pool()); vector_ = newVector(); executor_ = std::make_unique(32); + numAddPool_ = 0; } void TearDown() override { @@ -338,6 +339,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { memoryManager_ = std::make_unique(options); ASSERT_EQ(memoryManager_->arbitrator()->kind(), "SHARED"); arbitrator_ = static_cast(memoryManager_->arbitrator()); + numAddPool_ = 0; } RowVectorPtr newVector() { @@ -376,6 +378,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { configs, cache::AsyncDataCache::getInstance(), std::move(pool)); + ++numAddPool_; return queryCtx; } @@ -387,6 +390,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { VectorFuzzer::Options fuzzerOpts_; RowVectorPtr vector_; std::unique_ptr executor_; + uint64_t numAddPool_; }; DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromOrderBy) { @@ -679,6 +683,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } } @@ -1172,6 +1177,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); + ASSERT_EQ(newStats.numReserveRequest, numAddPool_); } } @@ -1463,6 +1469,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { const auto newStats = arbitrator_->stats(); ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes); ASSERT_GT(newStats.reclaimTimeUs, oldStats.reclaimTimeUs); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } } @@ -1795,6 +1802,7 @@ DEBUG_ONLY_TEST_F( memThread.join(); waitForAllTasksToBeDeleted(); ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 2); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } DEBUG_ONLY_TEST_F( @@ -2789,73 +2797,77 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromTableWriter) { createDuckDbTable(vectors); for (bool writerSpillEnabled : {false, true}) { - SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); + { + SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - setupMemory(kMemoryCapacity, 0); + setupMemory(kMemoryCapacity, 0); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), 0); + std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + ASSERT_EQ(queryCtx->pool()->capacity(), 0); - std::atomic numInputs{0}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "TableWrite") { - return; - } - // We reclaim memory from table writer connector memory pool which - // connects to the memory pools inside the hive connector. - ASSERT_FALSE(op->canReclaim()); - if (++numInputs != numBatches) { - return; - } + std::atomic numInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "TableWrite") { + return; + } + // We reclaim memory from table writer connector memory pool which + // connects to the memory pools inside the hive connector. + ASSERT_FALSE(op->canReclaim()); + if (++numInputs != numBatches) { + return; + } - const auto fakeAllocationSize = - arbitrator_->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); - if (writerSpillEnabled) { - auto* buffer = op->pool()->allocate(fakeAllocationSize); - op->pool()->free(buffer, fakeAllocationSize); - } else { - VELOX_ASSERT_THROW( - op->pool()->allocate(fakeAllocationSize), - "Exceeded memory pool"); - } - }))); + const auto fakeAllocationSize = + arbitrator_->stats().maxCapacityBytes - + op->pool()->parent()->reservedBytes(); + if (writerSpillEnabled) { + auto* buffer = op->pool()->allocate(fakeAllocationSize); + op->pool()->free(buffer, fakeAllocationSize); + } else { + VELOX_ASSERT_THROW( + op->pool()->allocate(fakeAllocationSize), + "Exceeded memory pool"); + } + }))); - auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto outputDirectory = TempDirectoryPath::create(); - auto writerPlan = - PlanBuilder() - .values(vectors) - .tableWrite(outputDirectory->path) - .project({TableWriteTraits::rowCountColumnName()}) - .singleAggregation( - {}, - {fmt::format( - "sum({})", TableWriteTraits::rowCountColumnName())}) - .planNode(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto outputDirectory = TempDirectoryPath::create(); + auto writerPlan = + PlanBuilder() + .values(vectors) + .tableWrite(outputDirectory->path) + .project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format( + "sum({})", TableWriteTraits::rowCountColumnName())}) + .planNode(); - AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(queryCtx) - .maxDrivers(1) - .spillDirectory(spillDirectory->path) - .config( - core::QueryConfig::kSpillEnabled, - writerSpillEnabled ? "true" : "false") - .config( - core::QueryConfig::kWriterSpillEnabled, - writerSpillEnabled ? "true" : "false") - // Set 0 file writer flush threshold to always trigger flush in test. - .config( - core::QueryConfig::kWriterFlushThresholdBytes, - folly::to(0)) - .plan(std::move(writerPlan)) - .assertResults(fmt::format("SELECT {}", numRows)); - - ASSERT_EQ(arbitrator_->stats().numFailures, writerSpillEnabled ? 0 : 1); - ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 0); - waitForAllTasksToBeDeleted(3'000'000); + AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(queryCtx) + .maxDrivers(1) + .spillDirectory(spillDirectory->path) + .config( + core::QueryConfig::kSpillEnabled, + writerSpillEnabled ? "true" : "false") + .config( + core::QueryConfig::kWriterSpillEnabled, + writerSpillEnabled ? "true" : "false") + // Set 0 file writer flush threshold to always trigger flush in test. + .config( + core::QueryConfig::kWriterFlushThresholdBytes, + folly::to(0)) + .plan(std::move(writerPlan)) + .assertResults(fmt::format("SELECT {}", numRows)); + + ASSERT_EQ(arbitrator_->stats().numFailures, writerSpillEnabled ? 0 : 1); + ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 0); + waitForAllTasksToBeDeleted(3'000'000); + } + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); + ASSERT_EQ(arbitrator_->stats().numReleaseRequest, numAddPool_); } } @@ -2879,83 +2891,87 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromSortTableWriter) { createDuckDbTable(vectors); for (bool writerSpillEnabled : {false, true}) { - SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); + { + SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - setupMemory(kMemoryCapacity, 0); + setupMemory(kMemoryCapacity, 0); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), 0); + std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + ASSERT_EQ(queryCtx->pool()->capacity(), 0); - const auto spillStats = globalSpillStats(); + const auto spillStats = globalSpillStats(); - std::atomic numInputs{0}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "TableWrite") { - return; - } - // We reclaim memory from table writer connector memory pool which - // connects to the memory pools inside the hive connector. - ASSERT_FALSE(op->canReclaim()); - if (++numInputs != numBatches) { - return; - } + std::atomic numInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "TableWrite") { + return; + } + // We reclaim memory from table writer connector memory pool which + // connects to the memory pools inside the hive connector. + ASSERT_FALSE(op->canReclaim()); + if (++numInputs != numBatches) { + return; + } - const auto fakeAllocationSize = - arbitrator_->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); - if (writerSpillEnabled) { - auto* buffer = op->pool()->allocate(fakeAllocationSize); - op->pool()->free(buffer, fakeAllocationSize); - } else { - VELOX_ASSERT_THROW( - op->pool()->allocate(fakeAllocationSize), - "Exceeded memory pool"); - } - }))); + const auto fakeAllocationSize = + arbitrator_->stats().maxCapacityBytes - + op->pool()->parent()->reservedBytes(); + if (writerSpillEnabled) { + auto* buffer = op->pool()->allocate(fakeAllocationSize); + op->pool()->free(buffer, fakeAllocationSize); + } else { + VELOX_ASSERT_THROW( + op->pool()->allocate(fakeAllocationSize), + "Exceeded memory pool"); + } + }))); - auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto outputDirectory = TempDirectoryPath::create(); - auto writerPlan = - PlanBuilder() - .values(vectors) - .tableWrite(outputDirectory->path, {"c0"}, 4, {"c1"}, {"c2"}) - .project({TableWriteTraits::rowCountColumnName()}) - .singleAggregation( - {}, - {fmt::format( - "sum({})", TableWriteTraits::rowCountColumnName())}) - .planNode(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto outputDirectory = TempDirectoryPath::create(); + auto writerPlan = + PlanBuilder() + .values(vectors) + .tableWrite(outputDirectory->path, {"c0"}, 4, {"c1"}, {"c2"}) + .project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format( + "sum({})", TableWriteTraits::rowCountColumnName())}) + .planNode(); - AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(queryCtx) - .maxDrivers(1) - .spillDirectory(spillDirectory->path) - .config( - core::QueryConfig::kSpillEnabled, - writerSpillEnabled ? "true" : "false") - .config( - core::QueryConfig::kWriterSpillEnabled, - writerSpillEnabled ? "true" : "false") - // Set 0 file writer flush threshold to always trigger flush in test. - .config( - core::QueryConfig::kWriterFlushThresholdBytes, - folly::to(0)) - .plan(std::move(writerPlan)) - .assertResults(fmt::format("SELECT {}", numRows)); - - ASSERT_EQ(arbitrator_->stats().numFailures, writerSpillEnabled ? 0 : 1); - ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 0); - waitForAllTasksToBeDeleted(3'000'000); - const auto updatedSpillStats = globalSpillStats(); - if (writerSpillEnabled) { - ASSERT_GT(updatedSpillStats.spilledBytes, spillStats.spilledBytes); - ASSERT_GT( - updatedSpillStats.spilledPartitions, spillStats.spilledPartitions); - } else { - ASSERT_EQ(updatedSpillStats, spillStats); + AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(queryCtx) + .maxDrivers(1) + .spillDirectory(spillDirectory->path) + .config( + core::QueryConfig::kSpillEnabled, + writerSpillEnabled ? "true" : "false") + .config( + core::QueryConfig::kWriterSpillEnabled, + writerSpillEnabled ? "true" : "false") + // Set 0 file writer flush threshold to always trigger flush in test. + .config( + core::QueryConfig::kWriterFlushThresholdBytes, + folly::to(0)) + .plan(std::move(writerPlan)) + .assertResults(fmt::format("SELECT {}", numRows)); + + ASSERT_EQ(arbitrator_->stats().numFailures, writerSpillEnabled ? 0 : 1); + ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 0); + waitForAllTasksToBeDeleted(3'000'000); + const auto updatedSpillStats = globalSpillStats(); + if (writerSpillEnabled) { + ASSERT_GT(updatedSpillStats.spilledBytes, spillStats.spilledBytes); + ASSERT_GT( + updatedSpillStats.spilledPartitions, spillStats.spilledPartitions); + } else { + ASSERT_EQ(updatedSpillStats, spillStats); + } } + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); + ASSERT_EQ(arbitrator_->stats().numReleaseRequest, numAddPool_); } } @@ -2977,71 +2993,75 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, writerFlushThreshold) { const std::vector writerFlushThresholds{0, 1UL << 30}; for (uint64_t writerFlushThreshold : writerFlushThresholds) { - SCOPED_TRACE(fmt::format( - "writerFlushThreshold: {}", succinctBytes(writerFlushThreshold))); + { + SCOPED_TRACE(fmt::format( + "writerFlushThreshold: {}", succinctBytes(writerFlushThreshold))); - setupMemory(kMemoryCapacity, 0); + setupMemory(kMemoryCapacity, 0); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), 0); + std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + ASSERT_EQ(queryCtx->pool()->capacity(), 0); - std::atomic numInputs{0}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "TableWrite") { - return; - } - if (++numInputs != numBatches) { - return; - } + std::atomic numInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "TableWrite") { + return; + } + if (++numInputs != numBatches) { + return; + } - const auto fakeAllocationSize = - arbitrator_->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); - if (writerFlushThreshold == 0) { - auto* buffer = op->pool()->allocate(fakeAllocationSize); - op->pool()->free(buffer, fakeAllocationSize); - } else { - // The injected memory allocation fail if we set very high memory - // flush threshold. - VELOX_ASSERT_THROW( - op->pool()->allocate(fakeAllocationSize), - "Exceeded memory pool"); - } - }))); + const auto fakeAllocationSize = + arbitrator_->stats().maxCapacityBytes - + op->pool()->parent()->reservedBytes(); + if (writerFlushThreshold == 0) { + auto* buffer = op->pool()->allocate(fakeAllocationSize); + op->pool()->free(buffer, fakeAllocationSize); + } else { + // The injected memory allocation fail if we set very high memory + // flush threshold. + VELOX_ASSERT_THROW( + op->pool()->allocate(fakeAllocationSize), + "Exceeded memory pool"); + } + }))); - auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto outputDirectory = TempDirectoryPath::create(); - auto writerPlan = - PlanBuilder() - .values(vectors) - .tableWrite(outputDirectory->path) - .project({TableWriteTraits::rowCountColumnName()}) - .singleAggregation( - {}, - {fmt::format( - "sum({})", TableWriteTraits::rowCountColumnName())}) - .planNode(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto outputDirectory = TempDirectoryPath::create(); + auto writerPlan = + PlanBuilder() + .values(vectors) + .tableWrite(outputDirectory->path) + .project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format( + "sum({})", TableWriteTraits::rowCountColumnName())}) + .planNode(); - AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(queryCtx) - .maxDrivers(1) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kWriterSpillEnabled, "true") - .config( - core::QueryConfig::kWriterFlushThresholdBytes, - folly::to(writerFlushThreshold)) - .plan(std::move(writerPlan)) - .assertResults(fmt::format("SELECT {}", numRows)); - - ASSERT_EQ( - arbitrator_->stats().numFailures, writerFlushThreshold == 0 ? 0 : 1); - ASSERT_EQ( - arbitrator_->stats().numNonReclaimableAttempts, - writerFlushThreshold == 0 ? 0 : 1); - waitForAllTasksToBeDeleted(3'000'000); + AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(queryCtx) + .maxDrivers(1) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWriterSpillEnabled, "true") + .config( + core::QueryConfig::kWriterFlushThresholdBytes, + folly::to(writerFlushThreshold)) + .plan(std::move(writerPlan)) + .assertResults(fmt::format("SELECT {}", numRows)); + + ASSERT_EQ( + arbitrator_->stats().numFailures, writerFlushThreshold == 0 ? 0 : 1); + ASSERT_EQ( + arbitrator_->stats().numNonReclaimableAttempts, + writerFlushThreshold == 0 ? 0 : 1); + waitForAllTasksToBeDeleted(3'000'000); + } + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); + ASSERT_EQ(arbitrator_->stats().numReleaseRequest, numAddPool_); } } @@ -3120,6 +3140,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromNonReclaimableTableWriter) { ASSERT_EQ(arbitrator_->stats().numFailures, 1); ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } DEBUG_ONLY_TEST_F( @@ -3212,6 +3233,7 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 0); ASSERT_EQ(arbitrator_->stats().numFailures, 0); ASSERT_GT(arbitrator_->stats().numReclaimedBytes, 0); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } DEBUG_ONLY_TEST_F( @@ -3301,6 +3323,7 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(arbitrator_->stats().numFailures, 1); ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); const auto updatedSpillStats = globalSpillStats(); ASSERT_EQ(updatedSpillStats, spillStats); } @@ -3457,6 +3480,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenRaclaimAndJoinFinish) { waitForAllTasksToBeDeleted(); ASSERT_EQ(arbitrator_->stats().numFailures, 0); ASSERT_EQ(arbitrator_->stats().numReclaimedBytes, 0); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { @@ -3600,6 +3624,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, joinBuildSpillError) { waitForAllTasksToBeDeleted(); ASSERT_EQ(arbitrator_->stats().numFailures, 1); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numAddPool_); } TEST_F(SharedArbitrationTest, concurrentArbitration) { @@ -3729,4 +3754,37 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { } controlThread.join(); } + +TEST_F(SharedArbitrationTest, reserveReleaseCounter) { + for (int i = 0; i < 37; ++i) { + folly::Random::DefaultGenerator rng(i); + auto numRootPools = folly::Random::rand32(rng) % 11 + 3; + std::vector threads; + threads.reserve(numRootPools); + std::mutex mutex; + setupMemory(kMemoryCapacity, 0); + { + std::vector> queries; + queries.reserve(numRootPools); + for (int j = 0; j < numRootPools; ++j) { + threads.emplace_back([&]() { + { + std::lock_guard l(mutex); + auto oldNum = arbitrator_->stats().numReserveRequest; + queries.emplace_back(newQueryCtx()); + ASSERT_EQ(arbitrator_->stats().numReserveRequest, oldNum + 1); + } + }); + } + + for (auto& queryThread : threads) { + queryThread.join(); + } + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numRootPools); + ASSERT_EQ(arbitrator_->stats().numReleaseRequest, 0); + } + ASSERT_EQ(arbitrator_->stats().numReserveRequest, numRootPools); + ASSERT_EQ(arbitrator_->stats().numReleaseRequest, numRootPools); + } +} } // namespace facebook::velox::exec::test