Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support global arbitration without spilling enabled in arbitrator #11381

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ uint32_t SharedArbitrator::ExtraConfig::globalArbitrationMemoryReclaimPct(
kDefaultGlobalMemoryArbitrationReclaimPct);
}

// Looks up 'kGlobalArbitrationWithoutSpill' in 'configs' and returns it as a
// bool, falling back to 'kDefaultGlobalArbitrationWithoutSpill' as the default
// handed to getConfig().
bool SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
    const std::unordered_map<std::string, std::string>& configs) {
  const bool withoutSpill = getConfig<bool>(
      configs,
      kGlobalArbitrationWithoutSpill,
      kDefaultGlobalArbitrationWithoutSpill);
  return withoutSpill;
}

// Looks up 'kGlobalArbitrationAbortTimeRatio' in 'configs' and returns it as a
// double, falling back to 'kDefaultGlobalArbitrationAbortTimeRatio' as the
// default handed to getConfig().
double SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
    const std::unordered_map<std::string, std::string>& configs) {
  const double abortTimeRatio = getConfig<double>(
      configs,
      kGlobalArbitrationAbortTimeRatio,
      kDefaultGlobalArbitrationAbortTimeRatio);
  return abortTimeRatio;
}

SharedArbitrator::SharedArbitrator(const Config& config)
: MemoryArbitrator(config),
reservedCapacity_(ExtraConfig::reservedCapacity(config.extraConfigs)),
Expand All @@ -216,6 +232,10 @@ SharedArbitrator::SharedArbitrator(const Config& config)
ExtraConfig::globalArbitrationEnabled(config.extraConfigs)),
globalArbitrationMemoryReclaimPct_(
ExtraConfig::globalArbitrationMemoryReclaimPct(config.extraConfigs)),
globalArbitrationAbortTimeRatio_(
ExtraConfig::globalArbitrationAbortTimeRatio(config.extraConfigs)),
globalArbitrationWithoutSpill_(
ExtraConfig::globalArbitrationWithoutSpill(config.extraConfigs)),
freeReservedCapacity_(reservedCapacity_),
freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) {
VELOX_CHECK_EQ(kind_, config.kind);
Expand Down Expand Up @@ -252,7 +272,11 @@ SharedArbitrator::SharedArbitrator(const Config& config)
VELOX_MEM_LOG(INFO) << "Arbitration config: max arbitration time "
<< succinctMillis(maxArbitrationTimeMs_)
<< ", global memory reclaim percentage "
<< globalArbitrationMemoryReclaimPct_;
<< globalArbitrationMemoryReclaimPct_
<< ", global arbitration abort time ratio "
<< globalArbitrationAbortTimeRatio_
<< ", global arbitration skip spill "
<< globalArbitrationWithoutSpill_;
}
VELOX_MEM_LOG(INFO) << "Memory pool participant config: "
<< participantConfig_.toString();
Expand Down Expand Up @@ -833,10 +857,22 @@ void SharedArbitrator::globalArbitrationMain() {
VELOX_MEM_LOG(INFO) << "Global arbitration controller stopped";
}

// Decides whether the current iteration of a global arbitration run should
// reclaim memory by aborting participants rather than by spilling.
//
// Returns true when either:
//  - 'globalArbitrationWithoutSpill_' is set, or
//  - 'globalRunElapsedTimeMs' exceeds
//    'maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_' and either a
//    previous iteration already chose abort ('hasReclaimedByAbort'), or every
//    participant has been tried ('allParticipantsReclaimed') and the last
//    attempt reclaimed nothing ('lastReclaimedBytes' == 0).
bool SharedArbitrator::globalArbitrationShouldReclaimByAbort(
    uint64_t globalRunElapsedTimeMs,
    bool hasReclaimedByAbort,
    bool allParticipantsReclaimed,
    uint64_t lastReclaimedBytes) const {
  // Spilling is switched off for global arbitration: abort is the only path.
  if (globalArbitrationWithoutSpill_) {
    return true;
  }

  // The time-based threshold has not been crossed yet, keep spilling.
  const bool abortTimeReached = globalRunElapsedTimeMs >
      maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_;
  if (!abortTimeReached) {
    return false;
  }

  // Once abort has been chosen, stay on abort for the rest of the run.
  if (hasReclaimedByAbort) {
    return true;
  }

  // Otherwise switch to abort only if spilling has stopped making progress.
  return allParticipantsReclaimed && lastReclaimedBytes == 0;
}

void SharedArbitrator::runGlobalArbitration() {
const uint64_t startTimeMs = getCurrentTimeMs();
uint64_t totalReclaimedBytes{0};
bool reclaimByAbort{false};
bool shouldReclaimByAbort{false};
uint64_t reclaimedBytes{0};
std::unordered_set<uint64_t> reclaimedParticipants;
std::unordered_set<uint64_t> failedParticipants;
Expand All @@ -857,19 +893,19 @@ void SharedArbitrator::runGlobalArbitration() {

// Check if we need to abort participant to reclaim used memory to
// accelerate global arbitration.
//
// TODO: make the time based condition check configurable.
reclaimByAbort =
(getCurrentTimeMs() - startTimeMs) > maxArbitrationTimeMs_ / 2 &&
(reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0));
if (!reclaimByAbort) {
shouldReclaimByAbort = globalArbitrationShouldReclaimByAbort(
getCurrentTimeMs() - startTimeMs,
shouldReclaimByAbort,
allParticipantsReclaimed,
reclaimedBytes);
if (shouldReclaimByAbort) {
reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
} else {
reclaimedBytes = reclaimUsedMemoryBySpill(
targetBytes,
reclaimedParticipants,
failedParticipants,
allParticipantsReclaimed);
} else {
reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true);
}
totalReclaimedBytes += reclaimedBytes;
reclaimUnusedCapacity();
Expand Down
30 changes: 30 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ class SharedArbitrator : public memory::MemoryArbitrator {
static uint32_t globalArbitrationMemoryReclaimPct(
const std::unordered_map<std::string, std::string>& configs);

/// The ratio applied to 'memory-reclaim-max-wait-time' to derive the elapsed
/// time threshold of a global arbitration run. Once a run has lasted longer
/// than that threshold, global arbitration may stop reclaiming memory by
/// spilling and reclaim by aborting instead. The switch happens only when
/// spilling has made no progress (all participants reclaimed and the last
/// reclaim freed zero bytes) or when the run has already started aborting.
/// It is only in effect when 'global-arbitration-enabled' is true.
///
/// The default ratio is 0.5.
static constexpr std::string_view kGlobalArbitrationAbortTimeRatio{
"global-arbitration-abort-time-ratio"};
static constexpr double kDefaultGlobalArbitrationAbortTimeRatio{0.5};
static double globalArbitrationAbortTimeRatio(
const std::unordered_map<std::string, std::string>& configs);

/// If true, global arbitration will not reclaim memory by spilling, but
/// only by aborting. When set, the abort decision no longer depends on the
/// elapsed time of the global arbitration run. This flag is only effective
/// if 'global-arbitration-enabled' is true.
///
/// The default is false.
static constexpr std::string_view kGlobalArbitrationWithoutSpill{
"global-arbitration-without-spill"};
static constexpr bool kDefaultGlobalArbitrationWithoutSpill{false};
static bool globalArbitrationWithoutSpill(
const std::unordered_map<std::string, std::string>& configs);

/// If true, do sanity check on the arbitrator state on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
Expand Down Expand Up @@ -377,6 +396,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// Invoked by global arbitration control thread to run global arbitration.
void runGlobalArbitration();

// Helper method used by 'runGlobalArbitration()' to decide if current
// iteration of global run should directly reclaim capacity by aborting
// queries.
//
// 'globalRunElapsedTimeMs' is the time elapsed since the global run started.
// 'hasReclaimedByAbort' is the decision returned for the previous iteration
// of the same run. 'allParticipantsReclaimed' and 'lastReclaimedBytes'
// describe the outcome of the previous reclaim attempt.
//
// Returns true if 'globalArbitrationWithoutSpill_' is set, or if the elapsed
// time exceeds 'maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_' and
// either 'hasReclaimedByAbort' is true or all participants have been
// reclaimed with zero bytes freed by the last attempt.
bool globalArbitrationShouldReclaimByAbort(
uint64_t globalRunElapsedTimeMs,
bool hasReclaimedByAbort,
bool allParticipantsReclaimed,
uint64_t lastReclaimedBytes) const;

// Invoked to get the global arbitration target in bytes.
uint64_t getGlobalArbitrationTarget();

Expand Down Expand Up @@ -565,6 +593,8 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const double memoryReclaimThreadsHwMultiplier_;
const bool globalArbitrationEnabled_;
const uint32_t globalArbitrationMemoryReclaimPct_;
const double globalArbitrationAbortTimeRatio_;
tanjialiang marked this conversation as resolved.
Show resolved Hide resolved
const bool globalArbitrationWithoutSpill_;

// The executor used to reclaim memory from multiple participants in parallel
// at the background for global arbitration or external memory reclamation.
Expand Down
98 changes: 93 additions & 5 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ class MockSharedArbitrationTest : public testing::Test {
kMemoryReclaimThreadsHwMultiplier,
std::function<void(MemoryPool&)> arbitrationStateCheckCb = nullptr,
bool globalArtbitrationEnabled = true,
uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000) {
uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000,
bool globalArbitrationWithoutSpill = false,
double globalArbitrationAbortTimeRatio = 0.5) {
MemoryManagerOptions options;
options.allocatorCapacity = memoryCapacity;
std::string arbitratorKind = "SHARED";
Expand Down Expand Up @@ -483,7 +485,9 @@ class MockSharedArbitrationTest : public testing::Test {
{std::string(ExtraConfig::kMemoryReclaimMaxWaitTime),
folly::to<std::string>(arbitrationTimeoutMs) + "ms"},
{std::string(ExtraConfig::kGlobalArbitrationEnabled),
folly::to<std::string>(globalArtbitrationEnabled)}};
folly::to<std::string>(globalArtbitrationEnabled)},
{std::string(ExtraConfig::kGlobalArbitrationWithoutSpill),
folly::to<std::string>(globalArbitrationWithoutSpill)}};
options.arbitrationStateCheckCb = std::move(arbitrationStateCheckCb);
options.checkUsageLeak = true;
manager_ = std::make_unique<MemoryManager>(options);
Expand Down Expand Up @@ -595,6 +599,14 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimThreadsHwMultiplier);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationWithoutSpill);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(
emptyConfigs),
SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationAbortTimeRatio);

// Testing custom values
std::unordered_map<std::string, std::string> configs;
Expand All @@ -620,6 +632,11 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
"1.0";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] = "true";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] = "0.8";

ASSERT_EQ(SharedArbitrator::ExtraConfig::reservedCapacity(configs), 100);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(configs),
Expand All @@ -642,6 +659,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
ASSERT_EQ(
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
1.0);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
true);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
0.8);

// Testing invalid values
configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] =
Expand All @@ -667,6 +690,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] =
"invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] =
"invalid";

VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::reservedCapacity(configs),
Expand Down Expand Up @@ -701,6 +730,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs),
"Failed while parsing SharedArbitrator configs");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs),
"Failed while parsing SharedArbitrator configs");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs),
"Failed while parsing SharedArbitrator configs");
// Invalid memory reclaim executor hw multiplier.
VELOX_ASSERT_THROW(
setupMemory(kMemoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1),
Expand Down Expand Up @@ -1681,8 +1716,9 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
const int64_t maxArbitrationTimeMs = 2'000;
const int64_t abortTimeThresholdMs = maxArbitrationTimeMs / 2;

const double globalArbitrationAbortTimeRatio = 0.5;
const int64_t abortTimeThresholdMs =
maxArbitrationTimeMs * globalArbitrationAbortTimeRatio;
setupMemory(
memoryCapacity,
0,
Expand All @@ -1698,7 +1734,9 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
kMemoryReclaimThreadsHwMultiplier,
nullptr,
true,
maxArbitrationTimeMs);
maxArbitrationTimeMs,
false,
globalArbitrationAbortTimeRatio);

test::SharedArbitratorTestHelper arbitratorHelper(arbitrator_);

Expand Down Expand Up @@ -1758,6 +1796,56 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
}
}

// Verifies that with 'global-arbitration-without-spill' enabled, a global
// arbitration run frees capacity by aborting a participant: the requesting
// task grows to the full capacity without error while the other task ends up
// with an abort error.
TEST_F(MockSharedArbitrationTest, globalArbitrationWithoutSpill) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
// The trailing four positional arguments are, in order: the arbitration state
// check callback (none), global arbitration enabled, the arbitration timeout
// in milliseconds, and the global-arbitration-without-spill flag (enabled).
setupMemory(
memoryCapacity,
0,
memoryPoolInitCapacity,
0,
0,
0,
0,
0,
0,
0,
0,
kMemoryReclaimThreadsHwMultiplier,
nullptr,
true,
5 * 60 * 1'000,
true);
// The task that will later trigger global arbitration. It first takes half of
// the total capacity.
// NOTE(review): the bool passed to addMemoryOp() presumably marks the op as
// reclaimable or not -- confirm against the helper's declaration.
auto triggerTask = addTask(memoryCapacity);
auto* triggerOp = triggerTask->addMemoryOp(false);
triggerOp->allocate(memoryCapacity / 2);

// The task expected to be aborted. It takes the other half of the capacity.
auto abortTask = addTask(memoryCapacity / 2);
auto* abortOp = abortTask->addMemoryOp(true);
abortOp->allocate(memoryCapacity / 2);
ASSERT_EQ(triggerTask->capacity(), memoryCapacity / 2);

// Capture the runtime stats reported on this thread during the next
// allocation, which asks for the second half of the capacity.
// NOTE(review): the thread-local stat writer is not reset before
// 'statsWriter' is destroyed at the end of the test -- confirm this cannot
// leave a dangling writer for later tests on the same thread.
std::unordered_map<std::string, RuntimeMetric> runtimeStats;
auto statsWriter = std::make_unique<TestRuntimeStatWriter>(runtimeStats);
setThreadLocalRunTimeStatWriter(statsWriter.get());
triggerOp->allocate(memoryCapacity / 2);

// Exactly one arbitration was recorded, it waited on global arbitration once,
// and no local arbitration was counted.
ASSERT_EQ(
runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1);
ASSERT_GT(runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0);
ASSERT_EQ(
runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1);
ASSERT_EQ(runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1);
ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0);

// The trigger task succeeded and now owns the whole capacity, while the other
// task carries the abort error.
ASSERT_TRUE(triggerTask->error() == nullptr);
ASSERT_EQ(triggerTask->capacity(), memoryCapacity);
ASSERT_TRUE(abortTask->error() != nullptr);
VELOX_ASSERT_THROW(
std::rethrow_exception(abortTask->error()),
"Memory pool aborted to reclaim used memory");
}

DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
Expand Down
Loading