diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index 47f00178bd67..baf3d5148deb 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -46,6 +46,10 @@ void registerVeloxCounters() { // Track memory reclaim bytes. REPORT_ADD_STAT_EXPORT_TYPE( kCounterMemoryReclaimedBytes, facebook::velox::StatType::SUM); + + // Track the number of times that the memory reclaim wait timeouts. + REPORT_ADD_STAT_EXPORT_TYPE( + kCounterMemoryReclaimWaitTimeoutCount, facebook::velox::StatType::SUM); } } // namespace facebook::velox diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index 83c8eac46416..16d63fffd463 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -20,7 +20,7 @@ namespace facebook::velox { -// Velox Counter Registration +/// Velox Counter Registration void registerVeloxCounters(); constexpr folly::StringPiece kCounterHiveFileHandleGenerateLatencyMs{ @@ -39,4 +39,7 @@ constexpr folly::StringPiece kCounterMemoryReclaimedBytes{ constexpr folly::StringPiece kCounterMemoryReclaimWaitTimeMs{ "velox.memory_reclaim_wait_ms"}; + +constexpr folly::StringPiece kCounterMemoryReclaimWaitTimeoutCount{ + "velox.memory_reclaim_wait_timeout_count"}; } // namespace facebook::velox diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index a284872ba145..bd6a3d2b13a8 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -35,6 +35,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .capacity = std::min(options.queryMemoryCapacity, options.capacity), .memoryPoolInitCapacity = options.memoryPoolInitCapacity, .memoryPoolTransferCapacity = options.memoryPoolTransferCapacity, + .memoryReclaimWaitMs = options.memoryReclaimWaitMs, .arbitrationStateCheckCb = options.arbitrationStateCheckCb})), alignment_(std::max(MemoryAllocator::kMinAlignment, options.alignment)), checkUsageLeak_(options.checkUsageLeak), diff --git 
a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index b6d7b6355d3a..b6335011a0c8 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -114,6 +114,11 @@ struct MemoryManagerOptions { /// during the memory arbitration. uint64_t memoryPoolTransferCapacity{32 << 20}; + /// Specifies the max time to wait for memory reclaim by arbitration. The + /// memory reclaim might fail if the max wait time has exceeded. If it is + /// zero, then there is no timeout. + uint64_t memoryReclaimWaitMs{0}; + /// Provided by the query system to validate the state after a memory pool /// enters arbitration if not null. For instance, Prestissimo provides /// callback to check if a memory arbitration request is issued from a driver diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 4ab6df0bb755..bd7be6d947fb 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -196,8 +196,11 @@ bool MemoryReclaimer::reclaimableBytes( return reclaimable; } -uint64_t -MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) { +uint64_t MemoryReclaimer::reclaim( + MemoryPool* pool, + uint64_t targetBytes, + uint64_t maxWaitMs, + Stats& stats) { if (pool->kind() == MemoryPool::Kind::kLeaf) { return 0; } @@ -230,7 +233,7 @@ MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) { uint64_t reclaimedBytes{0}; for (const auto& candidate : candidates) { - const auto bytes = candidate.pool->reclaim(targetBytes, stats); + const auto bytes = candidate.pool->reclaim(targetBytes, maxWaitMs, stats); reclaimedBytes += bytes; if (targetBytes != 0) { if (bytes >= targetBytes) { diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index d3efb693259b..c35f7d251504 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -63,6 +63,13 @@ class MemoryArbitrator { 
/// during the memory arbitration. uint64_t memoryPoolTransferCapacity{32 << 20}; + /// Specifies the max time to wait for memory reclaim by arbitration. The + /// memory reclaim might fail if the max time has exceeded. This prevents + /// the memory arbitration from getting stuck when the memory reclaim waits + /// for a hanging query task to pause. If it is zero, then there is no + /// timeout. + uint64_t memoryReclaimWaitMs{0}; + /// Provided by the query system to validate the state after a memory pool /// enters arbitration if not null. For instance, Prestissimo provides /// callback to check if a memory arbitration request is issued from a @@ -223,11 +230,13 @@ class MemoryArbitrator { : capacity_(config.capacity), memoryPoolInitCapacity_(config.memoryPoolInitCapacity), memoryPoolTransferCapacity_(config.memoryPoolTransferCapacity), + memoryReclaimWaitMs_(config.memoryReclaimWaitMs), arbitrationStateCheckCb_(config.arbitrationStateCheckCb) {} const uint64_t capacity_; const uint64_t memoryPoolInitCapacity_; const uint64_t memoryPoolTransferCapacity_; + const uint64_t memoryReclaimWaitMs_; const MemoryArbitrationStateCheckCB arbitrationStateCheckCb_; }; @@ -312,10 +321,18 @@ class MemoryReclaimer { /// Invoked by the memory arbitrator to reclaim from memory 'pool' with /// specified 'targetBytes'. It is expected to reclaim at least that amount of /// memory bytes but there is no guarantees. If 'targetBytes' is zero, then it - /// reclaims all the reclaimable memory from the memory 'pool'. The function - /// returns the actual reclaimed memory bytes. - virtual uint64_t - reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats); + /// reclaims all the reclaimable memory from the memory 'pool'. 'maxWaitMs' + /// specifies the max time to wait for reclaim if not zero. The memory + /// reclaim might fail if exceeds the timeout. The function returns the actual + /// reclaimed memory bytes. 
+ /// + /// NOTE: 'maxWaitMs' is optional and the actual memory reclaim implementation + /// can choose to respect this timeout or not on its own. + virtual uint64_t reclaim( + MemoryPool* pool, + uint64_t targetBytes, + uint64_t maxWaitMs, + Stats& stats); /// Invoked by the memory arbitrator to abort memory 'pool' and the associated /// query execution when encounters non-recoverable memory reclaim error or diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index b35b2bf25115..473b8f8e6e5a 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -935,11 +935,12 @@ bool MemoryPoolImpl::reclaimableBytes(uint64_t& reclaimableBytes) const { uint64_t MemoryPoolImpl::reclaim( uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { if (reclaimer() == nullptr) { return 0; } - return reclaimer()->reclaim(this, targetBytes, stats); + return reclaimer()->reclaim(this, targetBytes, maxWaitMs, stats); } void MemoryPoolImpl::enterArbitration() { diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index 6da5b13a212c..04ce09b3edec 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -410,10 +410,13 @@ class MemoryPool : public std::enable_shared_from_this { /// with specified reclaim target bytes. If 'targetBytes' is zero, then it /// tries to reclaim all the reclaimable memory from the memory pool. It is /// noop if the reclaimer is not set, otherwise invoke the reclaimer's - /// corresponding method. The function returns the actually freed capacity - /// from the root of this memory pool. + /// corresponding method. If not zero, 'maxWaitMs' specifies the max time in + /// milliseconds to wait for reclaim. The memory reclaim might fail if exceeds + /// the timeout. The function returns the actually freed capacity from the + /// root of this memory pool. 
virtual uint64_t reclaim( uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) = 0; /// Invoked by the memory arbitrator to abort a root memory pool. The function @@ -636,8 +639,10 @@ class MemoryPoolImpl : public MemoryPool { bool reclaimableBytes(uint64_t& reclaimableBytes) const override; - uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) - override; + uint64_t reclaim( + uint64_t targetBytes, + uint64_t maxWaitMs, + memory::MemoryReclaimer::Stats& stats) override; uint64_t shrink(uint64_t targetBytes = 0) override; diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 1b72625600ca..f5501eef2f01 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -323,11 +323,11 @@ TEST_F(MemoryReclaimerTest, common) { uint64_t reclaimableBytes; ASSERT_FALSE(pool->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(pool->reclaim(0, stats_), 0); + ASSERT_EQ(pool->reclaim(0, 0, stats_), 0); ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); - ASSERT_EQ(pool->reclaim(100, stats_), 0); + ASSERT_EQ(pool->reclaim(100, 0, stats_), 0); ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); - ASSERT_EQ(pool->reclaim(kMaxMemory, stats_), 0); + ASSERT_EQ(pool->reclaim(kMaxMemory, 0, stats_), 0); ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); } ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); @@ -356,6 +356,7 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { uint64_t reclaim( MemoryPool* /*unused*/, uint64_t targetBytes, + uint64_t /*unused*/, Stats& stats) noexcept override { std::lock_guard l(mu_); uint64_t reclaimedBytes{0}; @@ -445,7 +446,7 @@ TEST_F(MemoryReclaimerTest, mockReclaim) { const int numReclaims = 5; const int numBytesToReclaim = allocBytes * 3; for (int iter = 0; iter < numReclaims; ++iter) { - const auto reclaimedBytes = root->reclaim(numBytesToReclaim, 
stats_); + const auto reclaimedBytes = root->reclaim(numBytesToReclaim, 0, stats_); ASSERT_EQ(reclaimedBytes, numBytesToReclaim); ASSERT_EQ(reclaimedBytes, stats_.reclaimedBytes); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); @@ -454,12 +455,12 @@ TEST_F(MemoryReclaimerTest, mockReclaim) { } ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(totalUsedBytes, reclaimableBytes); - ASSERT_EQ(root->reclaim(allocBytes + 1, stats_), 2 * allocBytes); - ASSERT_EQ(root->reclaim(allocBytes - 1, stats_), allocBytes); + ASSERT_EQ(root->reclaim(allocBytes + 1, 0, stats_), 2 * allocBytes); + ASSERT_EQ(root->reclaim(allocBytes - 1, 0, stats_), allocBytes); ASSERT_EQ(3 * allocBytes, stats_.reclaimedBytes); const uint64_t expectedReclaimedBytes = totalUsedBytes; - ASSERT_EQ(root->reclaim(0, stats_), expectedReclaimedBytes); + ASSERT_EQ(root->reclaim(0, 0, stats_), expectedReclaimedBytes); ASSERT_EQ(3 * allocBytes + expectedReclaimedBytes, stats_.reclaimedBytes); ASSERT_EQ(totalUsedBytes, 0); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); @@ -497,7 +498,7 @@ TEST_F(MemoryReclaimerTest, mockReclaimMoreThanAvailable) { ASSERT_EQ(reclaimableBytes, totalUsedBytes); const uint64_t expectedReclaimedBytes = totalUsedBytes; ASSERT_EQ( - root->reclaim(totalUsedBytes + 100, stats_), expectedReclaimedBytes); + root->reclaim(totalUsedBytes + 100, 0, stats_), expectedReclaimedBytes); ASSERT_EQ(expectedReclaimedBytes, stats_.reclaimedBytes); ASSERT_EQ(totalUsedBytes, 0); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); @@ -558,7 +559,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. 
// So expected reclaimable allocation units are {10, 11, 8, *14*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, 0, stats_), 2 * allocUnitBytes); ASSERT_EQ(2 * allocUnitBytes, stats_.reclaimedBytes); totalAllocUnits -= 2; @@ -568,7 +569,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {10, 11, 8, *12*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, 0, stats_), 2 * allocUnitBytes); ASSERT_EQ(4 * allocUnitBytes, stats_.reclaimedBytes); totalAllocUnits -= 2; @@ -578,7 +579,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {10, 11, 8, *4*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 8 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 8 * allocUnitBytes, 0, stats_), 8 * allocUnitBytes); ASSERT_EQ(12 * allocUnitBytes, stats_.reclaimedBytes); totalAllocUnits -= 8; @@ -588,7 +589,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation gunits are {10, *9*, 8, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, 0, stats_), 2 * allocUnitBytes); totalAllocUnits -= 2; verify({10, 9, 8, 4, 5}); @@ -597,7 +598,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {*7*, 9, 8, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 3 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 3 * allocUnitBytes, 0, stats_), 3 * allocUnitBytes); totalAllocUnits -= 3; verify({7, 9, 8, 4, 5}); @@ -606,7 +607,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and two from 2nd child. 
// So expected reclaimable allocation units are {7, *0*, *6*, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 11 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 11 * allocUnitBytes, 0, stats_), 11 * allocUnitBytes); totalAllocUnits -= 11; verify({7, 0, 6, 4, 5}); @@ -615,7 +616,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and three from 2nd child. // So expected reclaimable allocation units are {*0*, 0, *3*, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, 0, stats_), 10 * allocUnitBytes); totalAllocUnits -= 10; verify({0, 0, 3, 4, 5}); @@ -624,7 +625,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and 4 from 4th child and 1 from 2nd. // So expected reclaimable allocation units are {0, 0, 2, *0*, *0*} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, stats_), + root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, 0, stats_), 10 * allocUnitBytes); totalAllocUnits -= 10; verify({0, 0, 2, 0, 0}); @@ -633,7 +634,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // cleared. 
ASSERT_EQ( root->reclaimer()->reclaim( - root.get(), totalAllocUnits * allocUnitBytes, stats_), + root.get(), totalAllocUnits * allocUnitBytes, 0, stats_), totalAllocUnits * allocUnitBytes); totalAllocUnits = 0; verify({0, 0, 0, 0, 0}); @@ -722,7 +723,7 @@ TEST_F(MemoryReclaimerTest, concurrentRandomMockReclaims) { bytesToReclaim = 0; } } - const auto reclaimedBytes = root->reclaim(bytesToReclaim, stats_); + const auto reclaimedBytes = root->reclaim(bytesToReclaim, 0, stats_); totalReclaimedBytes += reclaimedBytes; if (reclaimedBytes < bytesToReclaim) { ASSERT_GT(bytesToReclaim, oldUsedBytes); @@ -761,7 +762,7 @@ TEST_F(MemoryReclaimerTest, concurrentRandomMockReclaims) { ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, totalUsedBytes); - root->reclaim(0, stats_); + root->reclaim(0, 0, stats_); ASSERT_EQ(totalReclaimedBytes + reclaimableBytes, stats_.reclaimedBytes); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index ae06ab1d081c..16b6fb6eccb8 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -2817,9 +2817,9 @@ TEST_P(MemoryPoolTest, reclaimAPIsWithDefaultReclaimer) { uint64_t reclaimableBytes{100}; ASSERT_FALSE(pool->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(pool->reclaim(0, stats_), 0); - ASSERT_EQ(pool->reclaim(100, stats_), 0); - ASSERT_EQ(pool->reclaim(kMaxMemory, stats_), 0); + ASSERT_EQ(pool->reclaim(0, 0, stats_), 0); + ASSERT_EQ(pool->reclaim(100, 0, stats_), 0); + ASSERT_EQ(pool->reclaim(kMaxMemory, 0, stats_), 0); } for (const auto& allocation : allocations) { allocation.pool->free(allocation.buffer, allocation.size); diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index b1e2ba05ce12..4ecd6791ab23 100644 --- 
a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -161,8 +161,11 @@ class MockMemoryOperator { return op_->reclaimableBytes(pool, reclaimableBytes); } - uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) - override { + uint64_t reclaim( + MemoryPool* pool, + uint64_t targetBytes, + uint64_t /*unused*/, + Stats& stats) override { ++numReclaims_; if (!reclaimable_) { return 0; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 69f9ea79485b..47c78dfb00a2 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -868,6 +868,7 @@ bool HiveDataSink::WriterReclaimer::reclaimableBytes( uint64_t HiveDataSink::WriterReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK_EQ(pool->name(), writerInfo_->writerPool->name()); if (!dataSink_->canReclaim()) { @@ -887,7 +888,7 @@ uint64_t HiveDataSink::WriterReclaimer::reclaim( const uint64_t memoryUsageBeforeReclaim = pool->currentBytes(); const std::string memoryUsageTreeBeforeReclaim = pool->treeMemoryUsage(); const auto reclaimedBytes = - exec::MemoryReclaimer::reclaim(pool, targetBytes, stats); + exec::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats); const uint64_t memoryUsageAfterReclaim = pool->currentBytes(); if (memoryUsageAfterReclaim > memoryUsageBeforeReclaim) { VELOX_FAIL( diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 0c5dea8266ef..d07d6615414d 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -454,6 +454,7 @@ class HiveDataSink : public DataSink { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; private: diff --git 
a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 474ab1be5d98..669a5fe759fe 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -671,7 +671,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { if (testData.expectedWriterReclaimed) { ASSERT_TRUE(root_->reclaimableBytes(reclaimableBytes)); ASSERT_GT(reclaimableBytes, 0); - ASSERT_GT(root_->reclaim(256L << 20, stats), 0); + ASSERT_GT(root_->reclaim(256L << 20, 0, stats), 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); ASSERT_GT(stats.reclaimedBytes, 0); // We expect dwrf writer set numNonReclaimableAttempts counter. @@ -679,7 +679,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { } else { ASSERT_FALSE(root_->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(root_->reclaim(256L << 20, stats), 0); + ASSERT_EQ(root_->reclaim(256L << 20, 0, stats), 0); ASSERT_EQ(stats.reclaimExecTimeUs, 0); ASSERT_EQ(stats.reclaimedBytes, 0); if (testData.expectedWriterReclaimEnabled) { @@ -830,7 +830,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { ASSERT_FALSE(root_->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); } - ASSERT_EQ(root_->reclaim(1L << 30, stats), 0); + ASSERT_EQ(root_->reclaim(1L << 30, 0, stats), 0); ASSERT_EQ(stats.reclaimExecTimeUs, 0); ASSERT_EQ(stats.reclaimedBytes, 0); if (testData.expectedWriterReclaimEnabled) { diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 144b8e443563..1409ad810c8f 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -146,6 +146,7 @@ bool SortingWriter::MemoryReclaimer::reclaimableBytes( uint64_t SortingWriter::MemoryReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t /*unused*/, memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK_EQ(pool->name(), writer_->sortPool_->name()); diff --git 
a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index 1ffb2f96925b..8dcab763b092 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -57,6 +57,7 @@ class SortingWriter : public Writer { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; private: diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 998ea278f30d..ba0d0973cf77 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -1630,7 +1630,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) { writer->flush(); memory::MemoryReclaimer::Stats stats; const auto oldCapacity = writerPool->capacity(); - writerPool->reclaim(1L << 30, stats); + writerPool->reclaim(1L << 30, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); if (enableReclaim) { ASSERT_LT(writerPool->capacity(), oldCapacity); @@ -1654,16 +1654,16 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) { } if (!enableReclaim) { ASSERT_FALSE(reservationCalled); - ASSERT_EQ(writerPool->reclaim(1L << 30, stats), 0); + ASSERT_EQ(writerPool->reclaim(1L << 30, 0, stats), 0); ASSERT_EQ(stats, memory::MemoryReclaimer::Stats{}); } else { ASSERT_TRUE(reservationCalled); writer->testingNonReclaimableSection() = true; - ASSERT_EQ(writerPool->reclaim(1L << 30, stats), 0); + ASSERT_EQ(writerPool->reclaim(1L << 30, 0, stats), 0); ASSERT_EQ(stats.numNonReclaimableAttempts, 1); writer->testingNonReclaimableSection() = false; stats.numNonReclaimableAttempts = 0; - ASSERT_GT(writerPool->reclaim(1L << 30, stats), 0); + ASSERT_GT(writerPool->reclaim(1L << 30, 0, stats), 0); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimedBytes, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); @@ -1847,7 +1847,7 @@ TEST_F(E2EWriterTest, memoryReclaimAfterClose) { memory::MemoryReclaimer::Stats stats; const auto oldCapacity = 
writerPool->capacity(); - writerPool->reclaim(1L << 30, stats); + writerPool->reclaim(1L << 30, 0, stats); if (testData.expectedNonReclaimableAttempt) { ASSERT_EQ(stats.numNonReclaimableAttempts, 1); } else { @@ -1905,7 +1905,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) { writerPool->reclaimableBytes(reclaimableBytes), reclaimable); memory::MemoryReclaimer::Stats stats; - writerPool->reclaim(1L << 30, stats); + writerPool->reclaim(1L << 30, 0, stats); if (reclaimable) { ASSERT_GE(reclaimableBytes, 0); // We can't reclaim during writer init. @@ -1996,14 +1996,14 @@ TEST_F(E2EWriterTest, memoryReclaimThreshold) { ASSERT_TRUE(writerPool->reclaimer()->reclaimableBytes( *writerPool, reclaimableBytes)); ASSERT_GT(reclaimableBytes, 0); - ASSERT_GT(writerPool->reclaim(1L << 30, stats), 0); + ASSERT_GT(writerPool->reclaim(1L << 30, 0, stats), 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); ASSERT_GT(stats.reclaimedBytes, 0); } else { ASSERT_FALSE(writerPool->reclaimer()->reclaimableBytes( *writerPool, reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(writerPool->reclaim(1L << 30, stats), 0); + ASSERT_EQ(writerPool->reclaim(1L << 30, 0, stats), 0); ASSERT_GT(stats.numNonReclaimableAttempts, 0); ASSERT_EQ(stats.reclaimExecTimeUs, 0); ASSERT_EQ(stats.reclaimedBytes, 0); diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp index a3e168bd405d..df2605223427 100644 --- a/velox/dwio/dwrf/test/WriterFlushTest.cpp +++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp @@ -197,6 +197,7 @@ class MockMemoryPool : public velox::memory::MemoryPool { } uint64_t reclaim( + uint64_t /*unused*/, uint64_t /*unused*/, velox::memory::MemoryReclaimer::Stats& /*unused*/) override { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 547fbc1f3f52..6ebd2688e37f 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ 
-719,6 +719,7 @@ bool Writer::MemoryReclaimer::reclaimableBytes( uint64_t Writer::MemoryReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t /*unused*/, memory::MemoryReclaimer::Stats& stats) { if (!writer_->canReclaim()) { return 0; diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index e0f984d0fe90..4e5529d7d025 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -158,6 +158,7 @@ class Writer : public dwio::common::Writer { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; private: diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 72af27e468ef..17ed077c2ef4 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -444,46 +444,44 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) { auto* rows = table_->rows(); const auto numRows = rows->numRows(); - if (numRows == 0) { - // Skip the memory reservation for the first input as we are lack of memory - // usage stats for estimation. It is safe to skip as the query should have - // sufficient memory initially. - return true; - } auto [freeRows, outOfLineFreeBytes] = rows->freeSpace(); const auto outOfLineBytes = rows->stringAllocator().retainedSize() - outOfLineFreeBytes; const auto outOfLineBytesPerRow = - std::max(1, outOfLineBytes / numRows); - const int64_t flatBytes = input->estimateFlatSize(); + std::max(1, numRows == 0 ? 0 : outOfLineBytes / numRows); + const auto currentUsage = pool()->parent()->currentBytes(); - // Test-only spill path. - if (testingTriggerSpill()) { - numSpillRows_ = std::max(1, numRows / 10); - numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; - return false; - } + if (numRows != 0) { + // Test-only spill path. 
+ if (testingTriggerSpill()) { + numSpillRows_ = std::max(1, numRows / 10); + numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; + return false; + } - // We check usage from the parent pool to take peers' allocations into - // account. - const auto currentUsage = pool()->parent()->currentBytes(); - if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) { - const int64_t bytesToSpill = - currentUsage * spillConfig()->spillableReservationGrowthPct / 100; - numSpillRows_ = std::max( - 1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow)); - numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; - return false; + // We check usage from the parent pool to take peers' allocations into + // account. + if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) { + const int64_t bytesToSpill = + currentUsage * spillConfig()->spillableReservationGrowthPct / 100; + numSpillRows_ = std::max( + 1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow)); + numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; + return false; + } } const auto minReservationBytes = currentUsage * spillConfig_->minSpillableReservationPct / 100; const auto availableReservationBytes = pool()->availableReservation(); const auto tableIncrementBytes = table_->hashTableSizeIncrease(input->size()); - const auto incrementBytes = - rows->sizeIncrement(input->size(), outOfLineBytes ? flatBytes * 2 : 0) + - tableIncrementBytes; + const int64_t flatBytes = input->estimateFlatSize(); + const auto rowContainerIncrementBytes = numRows == 0 + ? flatBytes * 2 + : rows->sizeIncrement( + input->size(), outOfLineBytes > 0 ? flatBytes * 2 : 0); + const auto incrementBytes = rowContainerIncrementBytes + tableIncrementBytes; // First to check if we have sufficient minimal memory reservation. 
if (availableReservationBytes >= minReservationBytes) { @@ -517,6 +515,13 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) { } } + if (numRows == 0) { + // Nothing we can spill from this hash build operator. + return true; + } + + // TODO: deprecate the spilling after memory reservation fails as we rely on + // memory arbitration to trigger spilling automatically. numSpillRows_ = std::max( 1, targetIncrementBytes / (rows->fixedRowSize() + outOfLineBytesPerRow)); numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 8a9ae4d7cc41..464f7dbff100 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -173,20 +173,20 @@ bool isLeftNullAwareJoinWithFilter( uint64_t HashJoinMemoryReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { uint64_t reclaimedBytes{0}; - pool->visitChildren( - [&targetBytes, &reclaimedBytes, &stats](memory::MemoryPool* child) { - VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf); - // The hash probe operator do not support memory reclaim. - if (!isHashBuildMemoryPool(*child)) { - return true; - } - // We only need to reclaim from any one of the hash build operators - // which will reclaim from all the peer hash build operators. - reclaimedBytes = child->reclaim(targetBytes, stats); - return false; - }); + pool->visitChildren([&](memory::MemoryPool* child) { + VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf); + // The hash probe operator do not support memory reclaim. + if (!isHashBuildMemoryPool(*child)) { + return true; + } + // We only need to reclaim from any one of the hash build operators + // which will reclaim from all the peer hash build operators. 
+ reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats); + return false; + }); return reclaimedBytes; } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index c76a399ebe07..899f8fa5c63f 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -146,6 +146,7 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) final; private: diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 482c84d5b6e0..846d1e91dc10 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -582,6 +582,7 @@ bool Operator::MemoryReclaimer::reclaimableBytes( uint64_t Operator::MemoryReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t /*unused*/, memory::MemoryReclaimer::Stats& stats) { std::shared_ptr driver = ensureDriver(); if (FOLLY_UNLIKELY(driver == nullptr)) { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 3b96009e6a24..2b640fe7c586 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -621,6 +621,7 @@ class Operator : public BaseRuntimeStatWriter { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */) diff --git a/velox/exec/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp index 8b1e89f5230a..4fe9116547e4 100644 --- a/velox/exec/SharedArbitrator.cpp +++ b/velox/exec/SharedArbitrator.cpp @@ -411,7 +411,8 @@ uint64_t SharedArbitrator::reclaim( try { freedBytes = pool->shrink(targetBytes); if (freedBytes < targetBytes) { - pool->reclaim(targetBytes - freedBytes, reclaimerStats); + pool->reclaim( + targetBytes - freedBytes, memoryReclaimWaitMs_, reclaimerStats); } } catch (const std::exception& e) { VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " 
diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index dd9836420801..a40611c37112 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -305,6 +305,7 @@ bool TableWriter::ConnectorReclaimer::reclaimableBytes( uint64_t TableWriter::ConnectorReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { if (!canReclaim_) { return 0; @@ -338,7 +339,7 @@ uint64_t TableWriter::ConnectorReclaimer::reclaim( return 0; } RuntimeStatWriterScopeGuard opStatsGuard(op_); - return memory::MemoryReclaimer::reclaim(pool, targetBytes, stats); + return memory::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats); } // static diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index a76d939a469e..256387c0dfc2 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -165,6 +165,7 @@ class TableWriter : public Operator { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */) diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index bc3bf941a84c..3b3ddc851478 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -824,7 +824,7 @@ void Task::resume(std::shared_ptr self) { // Setting pause requested must be atomic with the resuming so that // suspended sections do not go back on thread during resume. 
self->pauseRequested_ = false; - if (!self->exception_) { + if (self->exception_ == nullptr) { for (auto& driver : self->drivers_) { if (driver) { if (driver->state().isSuspended) { @@ -2545,22 +2545,15 @@ std::unique_ptr Task::MemoryReclaimer::create( uint64_t Task::MemoryReclaimer::reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) { auto task = ensureTask(); if (FOLLY_UNLIKELY(task == nullptr)) { return 0; } VELOX_CHECK_EQ(task->pool()->name(), pool->name()); - uint64_t reclaimWaitTimeUs{0}; - { - MicrosecondTimer timer{&reclaimWaitTimeUs}; - task->requestPause().wait(); - } - stats.reclaimWaitTimeUs += reclaimWaitTimeUs; - REPORT_ADD_HISTOGRAM_VALUE( - kCounterMemoryReclaimWaitTimeMs, reclaimWaitTimeUs / 1'000); - auto guard = folly::makeGuard([&]() { + auto resumeGuard = folly::makeGuard([&]() { try { Task::resume(task); } catch (const VeloxRuntimeError& exception) { @@ -2568,11 +2561,35 @@ uint64_t Task::MemoryReclaimer::reclaim( << " after memory reclamation: " << exception.message(); } }); + uint64_t reclaimWaitTimeUs{0}; + bool paused{true}; + { + MicrosecondTimer timer{&reclaimWaitTimeUs}; + if (maxWaitMs == 0) { + task->requestPause().wait(); + } else { + paused = task->requestPause().wait(std::chrono::milliseconds(maxWaitMs)); + } + } + VELOX_CHECK(paused || maxWaitMs != 0); + if (!paused) { + REPORT_ADD_STAT_VALUE(kCounterMemoryReclaimWaitTimeoutCount, 1); + VELOX_FAIL( + "Memory reclaim failed to wait for task {} to pause after {} with max timeout {}", + task->taskId(), + succinctMicros(reclaimWaitTimeUs), + succinctMillis(maxWaitMs)); + } + + stats.reclaimWaitTimeUs += reclaimWaitTimeUs; + REPORT_ADD_HISTOGRAM_VALUE( + kCounterMemoryReclaimWaitTimeMs, reclaimWaitTimeUs / 1'000); + // Don't reclaim from a cancelled task as it will terminate soon. 
if (task->isCancelled()) { return 0; } - return memory::MemoryReclaimer::reclaim(pool, targetBytes, stats); + return memory::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats); } void Task::MemoryReclaimer::abort( @@ -2584,8 +2601,9 @@ void Task::MemoryReclaimer::abort( } VELOX_CHECK_EQ(task->pool()->name(), pool->name()); task->setError(error); + const static int maxTaskAbortWaitUs = 60'000'000; // 60s // Wait for task completion with a 60s timeout instead of waiting forever. - task->taskCompletionFuture(0).wait(); + task->taskCompletionFuture(maxTaskAbortWaitUs).wait(); memory::MemoryReclaimer::abort(pool, error); } diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 110729ed2ec8..b2636d40dcc8 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -684,6 +684,7 @@ class Task : public std::enable_shared_from_this { uint64_t reclaim( memory::MemoryPool* pool, uint64_t targetBytes, + uint64_t maxWaitMs, memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& error) diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index d91c5c6dea77..e67cdbd8000d 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -381,7 +381,7 @@ class AggregationTest : public OperatorTestBase { uint64_t targetBytes, memory::MemoryReclaimer::Stats& reclaimerStats) { const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, reclaimerStats); + op->pool()->reclaim(targetBytes, 0, reclaimerStats); dynamic_cast(op->pool()) ->testingSetCapacity(oldCapacity); } @@ -3046,7 +3046,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyInput) { { MemoryReclaimer::Stats stats; SuspendedSection suspendedSection(driver); - task->pool()->reclaim(kMaxBytes, stats); + task->pool()->reclaim(kMaxBytes, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); ASSERT_GT(stats.reclaimedBytes, 0); @@ -3116,7 
@@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) { { MemoryReclaimer::Stats stats; SuspendedSection suspendedSection(driver); - task->pool()->reclaim(kMaxBytes, stats); + task->pool()->reclaim(kMaxBytes, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); ASSERT_GT(stats.reclaimedBytes, 0); diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 79cc8ea9781d..10d1ae9834aa 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -847,7 +847,7 @@ class HashJoinTest : public HiveConnectorTestBase { uint64_t targetBytes, memory::MemoryReclaimer::Stats& reclaimerStats) { const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, reclaimerStats); + op->pool()->reclaim(targetBytes, 0, reclaimerStats); dynamic_cast(op->pool()) ->testingSetCapacity(oldCapacity); } diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index f6699f3c3868..6d6aeae304a9 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -222,7 +222,7 @@ class OrderByTest : public OperatorTestBase { uint64_t targetBytes, memory::MemoryReclaimer::Stats& reclaimerStats) { const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, reclaimerStats); + op->pool()->reclaim(targetBytes, 0, reclaimerStats); dynamic_cast(op->pool()) ->testingSetCapacity(oldCapacity); } diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 8a4082e70ba3..2c8585f0690f 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -326,7 +326,8 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { void setupMemory( int64_t memoryCapacity = 0, uint64_t memoryPoolInitCapacity = kMemoryPoolInitCapacity, - uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity) { + uint64_t 
memoryPoolTransferCapacity = kMemoryPoolTransferCapacity, + uint64_t maxReclaimWaitMs = 0) { memoryCapacity = (memoryCapacity != 0) ? memoryCapacity : kMemoryCapacity; allocator_ = std::make_shared(memoryCapacity); MemoryManagerOptions options; @@ -336,6 +337,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { options.capacity = options.capacity; options.memoryPoolInitCapacity = memoryPoolInitCapacity; options.memoryPoolTransferCapacity = memoryPoolTransferCapacity; + options.memoryReclaimWaitMs = maxReclaimWaitMs; options.checkUsageLeak = true; options.arbitrationStateCheckCb = memoryArbitrationStateCheck; memoryManager_ = std::make_unique(options); @@ -384,213 +386,234 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { return queryCtx; } - core::PlanNodePtr hashJoinPlan(const std::vector& vectors) { + // Contains the query result. + struct QueryTestResult { + std::shared_ptr task; + RowVectorPtr data; + core::PlanNodeId planNodeId; + }; + + core::PlanNodePtr hashJoinPlan( + const std::vector& vectors, + core::PlanNodeId& joinNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) - .values(vectors) + .values(vectors, true) .project({"c0", "c1", "c2"}) .hashJoin( {"c0"}, {"u1"}, PlanBuilder(planNodeIdGenerator) - .values(vectors) + .values(vectors, true) .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"}) .planNode(), "", {"c0", "c1", "c2"}, core::JoinType::kInner) + .capturePlanNodeId(joinNodeId) .planNode(); } - std::pair> runHashJoinTask( + QueryTestResult runHashJoinTask( const std::vector& vectors, const std::shared_ptr& queryCtx, uint32_t numDrivers, bool enableSpilling, const RowVectorPtr& expectedResult = nullptr) { - const auto plan = hashJoinPlan(vectors); - RowVectorPtr result; + QueryTestResult result; + const auto plan = hashJoinPlan(vectors, result.planNodeId); std::shared_ptr task; if (enableSpilling) { const auto spillDirectory = 
exec::test::TempDirectoryPath::create(); - result = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kJoinSpillEnabled, "true") - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kJoinSpillEnabled, "true") + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } - core::PlanNodePtr aggregationPlan(const std::vector& vectors) { + core::PlanNodePtr aggregationPlan( + const std::vector& vectors, + core::PlanNodeId& aggregateNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) .values(vectors) .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) + .capturePlanNodeId(aggregateNodeId) .planNode(); } - std::pair> runAggregateTask( + QueryTestResult runAggregateTask( const std::vector& vectors, const std::shared_ptr& queryCtx, bool enableSpilling, uint32_t numDrivers, const RowVectorPtr& expectedResult = nullptr) { - const auto plan = aggregationPlan(vectors); - RowVectorPtr result; - std::shared_ptr task; + QueryTestResult result; + const auto plan = aggregationPlan(vectors, result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - result = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - 
.config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = + AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } - core::PlanNodePtr orderByPlan(const std::vector& vectors) { + core::PlanNodePtr orderByPlan( + const std::vector& vectors, + core::PlanNodeId& orderNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) .values(vectors) .project({"c0", "c1", "c2"}) .orderBy({"c2 ASC NULLS LAST"}, false) + .capturePlanNodeId(orderNodeId) .planNode(); } - std::pair> runOrderByTask( + QueryTestResult runOrderByTask( const std::vector& vectors, const std::shared_ptr& queryCtx, uint32_t numDrivers, bool enableSpilling, const RowVectorPtr& expectedResult = nullptr) { - const auto plan = orderByPlan(vectors); - RowVectorPtr result; - std::shared_ptr task; + QueryTestResult result; + const auto plan = orderByPlan(vectors, result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - result = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kOrderBySpillEnabled, "true") - .queryCtx(queryCtx) - 
.maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kOrderBySpillEnabled, "true") + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } - core::PlanNodePtr rowNumberPlan(const std::vector& vectors) { + core::PlanNodePtr rowNumberPlan( + const std::vector& vectors, + core::PlanNodeId& rowNumberNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) .values(vectors) .rowNumber({"c0"}, 2, false) .project({"c0", "c1"}) + .capturePlanNodeId(rowNumberNodeId) .planNode(); } - std::pair> runRowNumberTask( + QueryTestResult runRowNumberTask( const std::vector& vectors, const std::shared_ptr& queryCtx, uint32_t numDrivers, bool enableSpilling, const RowVectorPtr& expectedResult = nullptr) { - const auto plan = rowNumberPlan(vectors); - RowVectorPtr result; - std::shared_ptr task; + QueryTestResult result; + const auto plan = rowNumberPlan(vectors, result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - result = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kRowNumberSpillEnabled, "true") - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = + AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + 
.config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kRowNumberSpillEnabled, "true") + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } - core::PlanNodePtr topNPlan(const std::vector& vectors) { + core::PlanNodePtr topNPlan( + const std::vector& vectors, + core::PlanNodeId& topNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) .values(vectors) .project({"c1"}) .topN({"c1 NULLS FIRST"}, 10, false) + .capturePlanNodeId(topNodeId) .planNode(); } - std::pair> runTopNTask( + QueryTestResult runTopNTask( const std::vector& vectors, const std::shared_ptr& queryCtx, uint32_t numDrivers, bool enableSpilling, const RowVectorPtr& expectedResult = nullptr) { - const auto plan = topNPlan(vectors); - RowVectorPtr result; - std::shared_ptr task; + QueryTestResult result; + const auto plan = topNPlan(vectors, result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - result = + result.data = AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") .queryCtx(queryCtx) .maxDrivers(numDrivers) - .copyResults(pool(), task); + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + 
.copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } core::PlanNodePtr writePlan( const std::vector& vectors, - const std::string& outputDirPath) { + const std::string& outputDirPath, + core::PlanNodeId& writeNodeId) { auto planNodeIdGenerator = std::make_shared(); return PlanBuilder(planNodeIdGenerator) .values(vectors) @@ -598,22 +621,22 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { .singleAggregation( {}, {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}) + .capturePlanNodeId(writeNodeId) .planNode(); } - std::pair> runWriteTask( + QueryTestResult runWriteTask( const std::vector& vectors, const std::shared_ptr& queryCtx, uint32_t numDrivers, bool enableSpilling, const RowVectorPtr& expectedResult = nullptr) { + QueryTestResult result; const auto outputDirectory = TempDirectoryPath::create(); - auto plan = writePlan(vectors, outputDirectory->path); - RowVectorPtr result; - std::shared_ptr task; + auto plan = writePlan(vectors, outputDirectory->path, result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); - result = + result.data = AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") @@ -633,17 +656,17 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { "1GB") .queryCtx(queryCtx) .maxDrivers(numDrivers) - .copyResults(pool(), task); + .copyResults(pool(), result.task); } else { - result = AssertQueryBuilder(plan) - .queryCtx(queryCtx) - .maxDrivers(numDrivers) - .copyResults(pool(), task); + result.data = AssertQueryBuilder(plan) + .queryCtx(queryCtx) + .maxDrivers(numDrivers) + .copyResults(pool(), result.task); } if (expectedResult != nullptr) { - assertEqualResults({result}, {expectedResult}); + 
assertEqualResults({result.data}, {expectedResult}); } - return {result, task}; + return result; } static inline FakeMemoryOperatorFactory* fakeOperatorFactory_; @@ -1829,7 +1852,7 @@ DEBUG_ONLY_TEST_F( } const int numDrivers = 4; const auto expectedResult = - runHashJoinTask(vectors, nullptr, numDrivers, false).first; + runHashJoinTask(vectors, nullptr, numDrivers, false).data; // Whether the tasks are run under the same query context. std::vector sameQueries = {false, true}; @@ -1918,12 +1941,11 @@ DEBUG_ONLY_TEST_F( joinQueryCtx->testingOverrideConfigUnsafe(std::move(config)); std::thread joinThread([&]() { - auto task = runHashJoinTask( - vectors, joinQueryCtx, numDrivers, true, expectedResult) - .second; - auto stats = task->taskStats().pipelineStats; - // Verify that spilling occured. - ASSERT_GT(stats[1].operatorStats[2].spilledBytes, 0); + const auto result = runHashJoinTask( + vectors, joinQueryCtx, numDrivers, true, expectedResult); + auto taskStats = exec::toPlanStats(result.task->taskStats()); + auto& planStats = taskStats.at(result.planNodeId); + ASSERT_GT(planStats.spilledBytes, 0); }); std::thread memThread([&]() { @@ -2948,6 +2970,79 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, tableFileWriteError) { waitForAllTasksToBeDeleted(); } +DEBUG_ONLY_TEST_F(SharedArbitrationTest, taskWaitTimeout) { + const int queryMemoryCapacity = 128 << 20; + // Creates a large number of vectors based on the query capacity to trigger + // memory arbitration. 
+ const auto vectors = newVectors(1'000, queryMemoryCapacity / 2); + const int numDrivers = 4; + const auto expectedResult = + runHashJoinTask(vectors, nullptr, numDrivers, false).data; + + for (uint64_t timeoutMs : {0, 1'000, 30'000}) { + SCOPED_TRACE(fmt::format("timeout {}", succinctMillis(timeoutMs))); + setupMemory(512 << 20, 0, 0, timeoutMs); + + std::shared_ptr queryCtx = newQueryCtx(queryMemoryCapacity); + + // Set test injection to block one hash build operator to inject delay when + // memory reclaim waits for task to pause. + folly::EventCount buildBlockWait; + std::atomic buildBlockWaitFlag{true}; + std::atomic blockOneBuild{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::common::memory::MemoryPoolImpl::maybeReserve", + std::function([&](memory::MemoryPool* pool) { + const std::string re(".*HashBuild"); + if (!RE2::FullMatch(pool->name(), re)) { + return; + } + if (!blockOneBuild.exchange(false)) { + return; + } + buildBlockWait.await([&]() { return !buildBlockWaitFlag.load(); }); + })); + + folly::EventCount taskPauseWait; + std::atomic taskPauseWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Task::requestPauseLocked", + std::function(([&](Task* /*unused*/) { + taskPauseWaitFlag = true; + taskPauseWait.notifyAll(); + }))); + + std::thread queryThread([&]() { + // We expect the reclaim to fail with the short timeout. + if (timeoutMs == 1'000) { + VELOX_ASSERT_THROW( + runHashJoinTask( + vectors, queryCtx, numDrivers, true, expectedResult), + "Memory reclaim failed to wait"); + } else { + // We expect success with a large timeout or with no timeout. + const auto result = runHashJoinTask( + vectors, queryCtx, numDrivers, true, expectedResult); + auto taskStats = exec::toPlanStats(result.task->taskStats()); + auto& planStats = taskStats.at(result.planNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + } + }); + + // Wait for the task pause to be requested, then delay for a while before + // unblocking the blocked hash build operator. 
+ taskPauseWait.await([&]() { return taskPauseWaitFlag.load(); }); + // Wait for two seconds and expect the short reclaim wait timeout. + std::this_thread::sleep_for(std::chrono::seconds(2)); + // Unblock the blocked build operator to let memory reclaim proceed. + buildBlockWaitFlag = false; + buildBlockWait.notifyAll(); + + queryThread.join(); + waitForAllTasksToBeDeleted(); + } +} + DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { const uint64_t memoryCapacity = 128 * MB; setupMemory(memoryCapacity); @@ -3691,7 +3786,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenRaclaimAndJoinFinish) { // spill after hash table built. memory::MemoryReclaimer::Stats stats; const uint64_t oldCapacity = joinQueryCtx->pool()->capacity(); - task.load()->pool()->reclaim(1'000, stats); + task.load()->pool()->reclaim(1'000, 0, stats); // If the last build memory pool is first child of its parent memory pool, // then memory arbitration (or join node memory pool) will reclaim from the // last build operator first which simply quits as the driver has gone. If @@ -3880,13 +3975,13 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { } const int numDrivers = 4; const auto expectedWriteResult = - runWriteTask(vectors, nullptr, numDrivers, false).first; + runWriteTask(vectors, nullptr, numDrivers, false).data; const auto expectedJoinResult = - runHashJoinTask(vectors, nullptr, numDrivers, false).first; + runHashJoinTask(vectors, nullptr, numDrivers, false).data; const auto expectedOrderResult = - runOrderByTask(vectors, nullptr, numDrivers, false).first; + runOrderByTask(vectors, nullptr, numDrivers, false).data; const auto expectedTopNResult = - runTopNTask(vectors, nullptr, numDrivers, false).first; + runTopNTask(vectors, nullptr, numDrivers, false).data; struct { uint64_t totalCapacity; @@ -3926,19 +4021,19 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { // in a single thread. 
task = runWriteTask( vectors, queryCtx, numDrivers, true, expectedWriteResult) - .second; + .task; } else if ((i % 4) == 0) { task = runHashJoinTask( vectors, queryCtx, numDrivers, true, expectedJoinResult) - .second; + .task; } else if ((i % 4) == 1) { task = runOrderByTask( vectors, queryCtx, numDrivers, true, expectedOrderResult) - .second; + .task; } else { task = runTopNTask( vectors, queryCtx, numDrivers, true, expectedTopNResult) - .second; + .task; } } catch (const VeloxException& e) { VELOX_CHECK(