diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp
index 3492259b70fc..c9c8a1fd9700 100644
--- a/velox/common/memory/SharedArbitrator.cpp
+++ b/velox/common/memory/SharedArbitrator.cpp
@@ -194,6 +194,22 @@ uint32_t SharedArbitrator::ExtraConfig::globalArbitrationMemoryReclaimPct(
       kDefaultGlobalMemoryArbitrationReclaimPct);
 }
 
+bool SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
+    const std::unordered_map<std::string, std::string>& configs) {
+  return getConfig<bool>(
+      configs,
+      kGlobalArbitrationWithoutSpill,
+      kDefaultGlobalArbitrationWithoutSpill);
+}
+
+double SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
+    const std::unordered_map<std::string, std::string>& configs) {
+  return getConfig<double>(
+      configs,
+      kGlobalArbitrationAbortTimeRatio,
+      kDefaultGlobalArbitrationAbortTimeRatio);
+}
+
 SharedArbitrator::SharedArbitrator(const Config& config)
     : MemoryArbitrator(config),
       reservedCapacity_(ExtraConfig::reservedCapacity(config.extraConfigs)),
@@ -216,6 +232,10 @@ SharedArbitrator::SharedArbitrator(const Config& config)
           ExtraConfig::globalArbitrationEnabled(config.extraConfigs)),
       globalArbitrationMemoryReclaimPct_(
           ExtraConfig::globalArbitrationMemoryReclaimPct(config.extraConfigs)),
+      globalArbitrationAbortTimeRatio_(
+          ExtraConfig::globalArbitrationAbortTimeRatio(config.extraConfigs)),
+      globalArbitrationWithoutSpill_(
+          ExtraConfig::globalArbitrationWithoutSpill(config.extraConfigs)),
       freeReservedCapacity_(reservedCapacity_),
       freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) {
   VELOX_CHECK_EQ(kind_, config.kind);
@@ -252,7 +272,11 @@ SharedArbitrator::SharedArbitrator(const Config& config)
     VELOX_MEM_LOG(INFO) << "Arbitration config: max arbitration time "
                         << succinctMillis(maxArbitrationTimeMs_)
                         << ", global memory reclaim percentage "
-                        << globalArbitrationMemoryReclaimPct_;
+                        << globalArbitrationMemoryReclaimPct_
+                        << ", global arbitration abort time ratio "
+                        << globalArbitrationAbortTimeRatio_
+                        << ", global arbitration skip spill "
+                        << globalArbitrationWithoutSpill_;
   }
   VELOX_MEM_LOG(INFO) << "Memory pool participant config: "
                       << participantConfig_.toString();
@@ -833,10 +857,22 @@ void SharedArbitrator::globalArbitrationMain() {
   VELOX_MEM_LOG(INFO) << "Global arbitration controller stopped";
 }
 
+bool SharedArbitrator::globalArbitrationShouldReclaimByAbort(
+    uint64_t globalRunElapsedTimeMs,
+    bool hasReclaimedByAbort,
+    bool allParticipantsReclaimed,
+    uint64_t lastReclaimedBytes) const {
+  return globalArbitrationWithoutSpill_ ||
+      (globalRunElapsedTimeMs >
+           maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_ &&
+       (hasReclaimedByAbort ||
+        (allParticipantsReclaimed && lastReclaimedBytes == 0)));
+}
+
 void SharedArbitrator::runGlobalArbitration() {
   const uint64_t startTimeMs = getCurrentTimeMs();
   uint64_t totalReclaimedBytes{0};
-  bool reclaimByAbort{false};
+  bool shouldReclaimByAbort{false};
   uint64_t reclaimedBytes{0};
   std::unordered_set<uint64_t> reclaimedParticipants;
   std::unordered_set<uint64_t> failedParticipants;
@@ -857,19 +893,19 @@ void SharedArbitrator::runGlobalArbitration() {
 
     // Check if we need to abort participant to reclaim used memory to
     // accelerate global arbitration.
-    //
-    // TODO: make the time based condition check configurable.
-    reclaimByAbort =
-        (getCurrentTimeMs() - startTimeMs) > maxArbitrationTimeMs_ / 2 &&
-        (reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0));
-    if (!reclaimByAbort) {
+    shouldReclaimByAbort = globalArbitrationShouldReclaimByAbort(
+        getCurrentTimeMs() - startTimeMs,
+        shouldReclaimByAbort,
+        allParticipantsReclaimed,
+        reclaimedBytes);
+    if (shouldReclaimByAbort) {
+      reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
+    } else {
       reclaimedBytes = reclaimUsedMemoryBySpill(
           targetBytes,
           reclaimedParticipants,
           failedParticipants,
           allParticipantsReclaimed);
-    } else {
-      reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
     }
     totalReclaimedBytes += reclaimedBytes;
     reclaimUnusedCapacity();
diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h
index 7d43e2b6eb1e..14f97d81b4e1 100644
--- a/velox/common/memory/SharedArbitrator.h
+++ b/velox/common/memory/SharedArbitrator.h
@@ -199,6 +199,25 @@ class SharedArbitrator : public memory::MemoryArbitrator {
     static uint32_t globalArbitrationMemoryReclaimPct(
         const std::unordered_map<std::string, std::string>& configs);
 
+    /// The ratio used with 'memory-reclaim-max-wait-time', beyond which, global
+    /// arbitration will no longer reclaim memory by spilling, but instead
+    /// directly abort. It is only in effect when 'global-arbitration-enabled'
+    /// is true.
+    static constexpr std::string_view kGlobalArbitrationAbortTimeRatio{
+        "global-arbitration-abort-time-ratio"};
+    static constexpr double kDefaultGlobalArbitrationAbortTimeRatio{0.5};
+    static double globalArbitrationAbortTimeRatio(
+        const std::unordered_map<std::string, std::string>& configs);
+
+    /// If true, global arbitration will not reclaim memory by spilling, but
+    /// only by aborting. This flag is only effective if
+    /// 'global-arbitration-enabled' is true.
+    static constexpr std::string_view kGlobalArbitrationWithoutSpill{
+        "global-arbitration-without-spill"};
+    static constexpr bool kDefaultGlobalArbitrationWithoutSpill{false};
+    static bool globalArbitrationWithoutSpill(
+        const std::unordered_map<std::string, std::string>& configs);
+
     /// If true, do sanity check on the arbitrator state on destruction.
     ///
     /// TODO: deprecate this flag after all the existing memory leak use cases
@@ -377,6 +396,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
   // Invoked by global arbitration control thread to run global arbitration.
   void runGlobalArbitration();
 
+  // Helper method used by 'runGlobalArbitration()' to decide if current
+  // iteration of global run should directly reclaim capacity by aborting
+  // queries.
+  bool globalArbitrationShouldReclaimByAbort(
+      uint64_t globalRunElapsedTimeMs,
+      bool hasReclaimedByAbort,
+      bool allParticipantsReclaimed,
+      uint64_t lastReclaimedBytes) const;
+
   // Invoked to get the global arbitration target in bytes.
   uint64_t getGlobalArbitrationTarget();
 
@@ -565,6 +593,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {
   const double memoryReclaimThreadsHwMultiplier_;
   const bool globalArbitrationEnabled_;
   const uint32_t globalArbitrationMemoryReclaimPct_;
+  const double globalArbitrationAbortTimeRatio_;
+  const bool globalArbitrationWithoutSpill_;
 
   // The executor used to reclaim memory from multiple participants in parallel
   // at the background for global arbitration or external memory reclamation.
diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp
index e25bf562803d..75ad335eb23a 100644
--- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp
+++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp
@@ -450,7 +450,9 @@ class MockSharedArbitrationTest : public testing::Test {
           kMemoryReclaimThreadsHwMultiplier,
       std::function<void(MemoryPool&)> arbitrationStateCheckCb = nullptr,
       bool globalArtbitrationEnabled = true,
-      uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000) {
+      uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000,
+      bool globalArbitrationWithoutSpill = false,
+      double globalArbitrationAbortTimeRatio = 0.5) {
     MemoryManagerOptions options;
     options.allocatorCapacity = memoryCapacity;
     std::string arbitratorKind = "SHARED";
@@ -483,7 +485,11 @@ class MockSharedArbitrationTest : public testing::Test {
         {std::string(ExtraConfig::kMemoryReclaimMaxWaitTime),
          folly::to<std::string>(arbitrationTimeoutMs) + "ms"},
         {std::string(ExtraConfig::kGlobalArbitrationEnabled),
-         folly::to<std::string>(globalArtbitrationEnabled)}};
+         folly::to<std::string>(globalArtbitrationEnabled)},
+        {std::string(ExtraConfig::kGlobalArbitrationWithoutSpill),
+         folly::to<std::string>(globalArbitrationWithoutSpill)},
+        {std::string(ExtraConfig::kGlobalArbitrationAbortTimeRatio),
+         folly::to<std::string>(globalArbitrationAbortTimeRatio)}};
     options.arbitrationStateCheckCb = std::move(arbitrationStateCheckCb);
     options.checkUsageLeak = true;
     manager_ = std::make_unique<MemoryManager>(options);
@@ -595,6 +601,14 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
       SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(
           emptyConfigs),
       SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimThreadsHwMultiplier);
+  ASSERT_EQ(
+      SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
+          emptyConfigs),
+      SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationWithoutSpill);
+  ASSERT_EQ(
+      SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
+          emptyConfigs),
+      SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationAbortTimeRatio);
 
   // Testing custom values
   std::unordered_map<std::string, std::string> configs;
@@ -620,6 +634,11 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
   configs[std::string(
       SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
       "1.0";
+  configs[std::string(
+      SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] = "true";
+  configs[std::string(
+      SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] = "0.8";
+
   ASSERT_EQ(SharedArbitrator::ExtraConfig::reservedCapacity(configs), 100);
   ASSERT_EQ(
       SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(configs),
@@ -642,6 +661,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
   ASSERT_EQ(
       SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
       1.0);
+  ASSERT_EQ(
+      SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
+      true);
+  ASSERT_EQ(
+      SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
+      0.8);
 
   // Testing invalid values
   configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] =
@@ -667,6 +692,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
   configs[std::string(
       SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
       "invalid";
+  configs[std::string(
+      SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] =
+      "invalid";
+  configs[std::string(
+      SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] =
+      "invalid";
 
   VELOX_ASSERT_THROW(
       SharedArbitrator::ExtraConfig::reservedCapacity(configs),
@@ -701,6 +732,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
   VELOX_ASSERT_THROW(
       SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
       "Failed while parsing SharedArbitrator configs");
+  VELOX_ASSERT_THROW(
+      SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
+      "Failed while parsing SharedArbitrator configs");
+  VELOX_ASSERT_THROW(
+      SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
+      "Failed while parsing SharedArbitrator configs");
   // Invalid memory reclaim executor hw multiplier.
   VELOX_ASSERT_THROW(
       setupMemory(kMemoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1),
@@ -1681,8 +1718,9 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
   const int64_t memoryCapacity = 512 << 20;
   const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
   const int64_t maxArbitrationTimeMs = 2'000;
-  const int64_t abortTimeThresholdMs = maxArbitrationTimeMs / 2;
-
+  const double globalArbitrationAbortTimeRatio = 0.5;
+  const int64_t abortTimeThresholdMs =
+      maxArbitrationTimeMs * globalArbitrationAbortTimeRatio;
   setupMemory(
       memoryCapacity,
       0,
@@ -1698,7 +1736,9 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
       kMemoryReclaimThreadsHwMultiplier,
       nullptr,
       true,
-      maxArbitrationTimeMs);
+      maxArbitrationTimeMs,
+      false,
+      globalArbitrationAbortTimeRatio);
 
   test::SharedArbitratorTestHelper arbitratorHelper(arbitrator_);
 
@@ -1758,6 +1798,56 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
   }
 }
 
+TEST_F(MockSharedArbitrationTest, globalArbitrationWithoutSpill) {
+  const int64_t memoryCapacity = 512 << 20;
+  const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
+  setupMemory(
+      memoryCapacity,
+      0,
+      memoryPoolInitCapacity,
+      0,
+      0,
+      0,
+      0,
+      0,
+      0,
+      0,
+      0,
+      kMemoryReclaimThreadsHwMultiplier,
+      nullptr,
+      true,
+      5 * 60 * 1'000,
+      true);
+  auto triggerTask = addTask(memoryCapacity);
+  auto* triggerOp = triggerTask->addMemoryOp(false);
+  triggerOp->allocate(memoryCapacity / 2);
+
+  auto abortTask = addTask(memoryCapacity / 2);
+  auto* abortOp = abortTask->addMemoryOp(true);
+  abortOp->allocate(memoryCapacity / 2);
+  ASSERT_EQ(triggerTask->capacity(), memoryCapacity / 2);
+
+  std::unordered_map<std::string, RuntimeMetric> runtimeStats;
+  auto statsWriter = std::make_unique<TestRuntimeStatWriter>(runtimeStats);
+  setThreadLocalRunTimeStatWriter(statsWriter.get());
+  triggerOp->allocate(memoryCapacity / 2);
+
+  ASSERT_EQ(
+      runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1);
+  ASSERT_GT(runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0);
+  ASSERT_EQ(
+      runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1);
+  ASSERT_EQ(runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1);
+  ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0);
+
+  ASSERT_TRUE(triggerTask->error() == nullptr);
+  ASSERT_EQ(triggerTask->capacity(), memoryCapacity);
+  ASSERT_TRUE(abortTask->error() != nullptr);
+  VELOX_ASSERT_THROW(
+      std::rethrow_exception(abortTask->error()),
+      "Memory pool aborted to reclaim used memory");
+}
+
 DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) {
   const int64_t memoryCapacity = 512 << 20;
   const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;