Skip to content

Commit

Permalink
Support global arbitration without spilling enabled in arbitrator
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 30, 2024
1 parent 7c93eba commit 54f8975
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 12 deletions.
56 changes: 46 additions & 10 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ uint32_t SharedArbitrator::ExtraConfig::globalArbitrationMemoryReclaimPct(
kDefaultGlobalMemoryArbitrationReclaimPct);
}

bool SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<bool>(
configs,
kGlobalArbitrationWithoutSpill,
kDefaultGlobalArbitrationWithoutSpill);
}

double SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
const std::unordered_map<std::string, std::string>& configs) {
return getConfig<double>(
configs,
kGlobalArbitrationAbortTimeRatio,
kDefaultGlobalArbitrationAbortTimeRatio);
}

SharedArbitrator::SharedArbitrator(const Config& config)
: MemoryArbitrator(config),
reservedCapacity_(ExtraConfig::reservedCapacity(config.extraConfigs)),
Expand All @@ -216,6 +232,10 @@ SharedArbitrator::SharedArbitrator(const Config& config)
ExtraConfig::globalArbitrationEnabled(config.extraConfigs)),
globalArbitrationMemoryReclaimPct_(
ExtraConfig::globalArbitrationMemoryReclaimPct(config.extraConfigs)),
globalArbitrationAbortTimeRatio_(
ExtraConfig::globalArbitrationAbortTimeRatio(config.extraConfigs)),
globalArbitrationWithoutSpill_(
ExtraConfig::globalArbitrationWithoutSpill(config.extraConfigs)),
freeReservedCapacity_(reservedCapacity_),
freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) {
VELOX_CHECK_EQ(kind_, config.kind);
Expand Down Expand Up @@ -252,7 +272,11 @@ SharedArbitrator::SharedArbitrator(const Config& config)
VELOX_MEM_LOG(INFO) << "Arbitration config: max arbitration time "
<< succinctMillis(maxArbitrationTimeMs_)
<< ", global memory reclaim percentage "
<< globalArbitrationMemoryReclaimPct_;
<< globalArbitrationMemoryReclaimPct_
<< ", global arbitration abort time ratio "
<< globalArbitrationAbortTimeRatio_
<< ", global arbitration skip spill "
<< globalArbitrationWithoutSpill_;
}
VELOX_MEM_LOG(INFO) << "Memory pool participant config: "
<< participantConfig_.toString();
Expand Down Expand Up @@ -833,13 +857,25 @@ void SharedArbitrator::globalArbitrationMain() {
VELOX_MEM_LOG(INFO) << "Global arbitration controller stopped";
}

bool SharedArbitrator::globalArbitrationShouldReclaimByAbort(
uint64_t globalRunElapsedTimeMs,
bool hasReclaimedByAbort,
bool allParticipantsReclaimed,
uint64_t lastReclaimedBytes) const {
return globalArbitrationWithoutSpill_ ||
(globalRunElapsedTimeMs >
maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_ &&
(hasReclaimedByAbort ||
(allParticipantsReclaimed && lastReclaimedBytes == 0)));
}

void SharedArbitrator::runGlobalArbitration() {
TestValue::adjust(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration", this);

const uint64_t startTimeMs = getCurrentTimeMs();
uint64_t totalReclaimedBytes{0};
bool reclaimByAbort{false};
bool shouldReclaimByAbort{false};
uint64_t reclaimedBytes{0};
std::unordered_set<uint64_t> reclaimedParticipants;
std::unordered_set<uint64_t> failedParticipants;
Expand All @@ -857,19 +893,19 @@ void SharedArbitrator::runGlobalArbitration() {

// Check if we need to abort participant to reclaim used memory to
// accelerate global arbitration.
//
// TODO: make the time based condition check configurable.
reclaimByAbort =
(getCurrentTimeMs() - startTimeMs) < maxArbitrationTimeMs_ / 2 &&
(reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0));
if (!reclaimByAbort) {
shouldReclaimByAbort = globalArbitrationShouldReclaimByAbort(
getCurrentTimeMs() - startTimeMs,
shouldReclaimByAbort,
allParticipantsReclaimed,
reclaimedBytes);
if (shouldReclaimByAbort) {
reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
} else {
reclaimedBytes = reclaimUsedMemoryBySpill(
targetBytes,
reclaimedParticipants,
failedParticipants,
allParticipantsReclaimed);
} else {
reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
}
totalReclaimedBytes += reclaimedBytes;
reclaimUnusedCapacity();
Expand Down
30 changes: 30 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ class SharedArbitrator : public memory::MemoryArbitrator {
static uint32_t globalArbitrationMemoryReclaimPct(
const std::unordered_map<std::string, std::string>& configs);

/// The ratio used with 'memory-reclaim-max-wait-time', beyond which, global
/// arbitration will no longer reclaim memory by spilling, but instead
/// directly abort. It is only in effect when 'global-arbitration-enabled'
/// is true
static constexpr std::string_view kGlobalArbitrationAbortTimeRatio{
"global-arbitration-abort-time-ratio"};
static constexpr double kDefaultGlobalArbitrationAbortTimeRatio{0.5};
static double globalArbitrationAbortTimeRatio(
const std::unordered_map<std::string, std::string>& configs);

/// If true, global arbitration will not reclaim memory by spilling, but
/// only by aborting. This flag is only effective if
/// 'global-arbitration-enabled' is true
static constexpr std::string_view kGlobalArbitrationWithoutSpill{
"global-arbitration-without-spill"};
static constexpr bool kDefaultGlobalArbitrationWithoutSpill{false};
static bool globalArbitrationWithoutSpill(
const std::unordered_map<std::string, std::string>& configs);

/// If true, do sanity check on the arbitrator state on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
Expand Down Expand Up @@ -377,6 +396,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// Invoked by global arbitration control thread to run global arbitration.
void runGlobalArbitration();

// Helper method used by 'runGlobalArbitration()' to decide if current
// iteration of global run should directly reclaim capacity by aborting
// queries.
bool globalArbitrationShouldReclaimByAbort(
uint64_t globalRunElapsedTimeMs,
bool hasReclaimedByAbort,
bool allParticipantsReclaimed,
uint64_t lastReclaimedBytes) const;

// Invoked to get the global arbitration target in bytes.
uint64_t getGlobalArbitrationTarget();

Expand Down Expand Up @@ -565,6 +593,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const double memoryReclaimThreadsHwMultiplier_;
const bool globalArbitrationEnabled_;
const uint32_t globalArbitrationMemoryReclaimPct_;
const double globalArbitrationAbortTimeRatio_;
const bool globalArbitrationWithoutSpill_;

// The executor used to reclaim memory from multiple participants in parallel
// at the background for global arbitration or external memory reclamation.
Expand Down
88 changes: 86 additions & 2 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ class MockSharedArbitrationTest : public testing::Test {
kMemoryReclaimThreadsHwMultiplier,
std::function<void(MemoryPool&)> arbitrationStateCheckCb = nullptr,
bool globalArtbitrationEnabled = true,
uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000) {
uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000,
bool globalArbitrationWithoutSpill = false) {
MemoryManagerOptions options;
options.allocatorCapacity = memoryCapacity;
std::string arbitratorKind = "SHARED";
Expand Down Expand Up @@ -483,7 +484,9 @@ class MockSharedArbitrationTest : public testing::Test {
{std::string(ExtraConfig::kMemoryReclaimMaxWaitTime),
folly::to<std::string>(arbitrationTimeoutMs) + "ms"},
{std::string(ExtraConfig::kGlobalArbitrationEnabled),
folly::to<std::string>(globalArtbitrationEnabled)}};
folly::to<std::string>(globalArtbitrationEnabled)},
{std::string(ExtraConfig::kGlobalArbitrationWithoutSpill),
folly::to<std::string>(globalArbitrationWithoutSpill)}};
options.arbitrationStateCheckCb = std::move(arbitrationStateCheckCb);
options.checkUsageLeak = true;
manager_ = std::make_unique<MemoryManager>(options);
Expand Down Expand Up @@ -595,6 +598,14 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimThreadsHwMultiplier);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationWithoutSpill);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationAbortTimeRatio);

// Testing custom values
std::unordered_map<std::string, std::string> configs;
Expand All @@ -620,6 +631,11 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
"1.0";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] = "true";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] = "0.8";

ASSERT_EQ(SharedArbitrator::ExtraConfig::reservedCapacity(configs), 100);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(configs),
Expand All @@ -642,6 +658,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
ASSERT_EQ(
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
1.0);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
true);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
0.8);

// Testing invalid values
configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] =
Expand All @@ -667,6 +689,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] =
"invalid";

VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::reservedCapacity(configs),
Expand Down Expand Up @@ -701,6 +729,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
"Failed while parsing SharedArbitrator configs");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
"Failed while parsing SharedArbitrator configs");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
"Failed while parsing SharedArbitrator configs");
// Invalid memory reclaim executor hw multiplier.
VELOX_ASSERT_THROW(
setupMemory(kMemoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1),
Expand Down Expand Up @@ -1677,6 +1711,56 @@ DEBUG_ONLY_TEST_F(
ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].sum, 1);
}

TEST_F(MockSharedArbitrationTest, globalArbitrationWithoutSpill) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
setupMemory(
memoryCapacity,
0,
memoryPoolInitCapacity,
0,
0,
0,
0,
0,
0,
0,
0,
kMemoryReclaimThreadsHwMultiplier,
nullptr,
true,
5 * 60 * 1'000,
true);
auto triggerTask = addTask(memoryCapacity);
auto* triggerOp = triggerTask->addMemoryOp(false);
triggerOp->allocate(memoryCapacity / 2);

auto abortTask = addTask(memoryCapacity / 2);
auto* abortOp = abortTask->addMemoryOp(true);
abortOp->allocate(memoryCapacity / 2);
ASSERT_EQ(triggerTask->capacity(), memoryCapacity / 2);

std::unordered_map<std::string, RuntimeMetric> runtimeStats;
auto statsWriter = std::make_unique<TestRuntimeStatWriter>(runtimeStats);
setThreadLocalRunTimeStatWriter(statsWriter.get());
triggerOp->allocate(memoryCapacity / 2);

ASSERT_EQ(
runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1);
ASSERT_GT(runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0);
ASSERT_EQ(
runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1);
ASSERT_EQ(runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1);
ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0);

ASSERT_TRUE(triggerTask->error() == nullptr);
ASSERT_EQ(triggerTask->capacity(), memoryCapacity);
ASSERT_TRUE(abortTask->error() != nullptr);
VELOX_ASSERT_THROW(
std::rethrow_exception(abortTask->error()),
"Memory pool aborted to reclaim used memory");
}

DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
Expand Down

0 comments on commit 54f8975

Please sign in to comment.