Skip to content

Commit

Permalink
Fix fast abort condition in global arbitration
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 30, 2024
1 parent df5cff5 commit 899b65f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
8 changes: 4 additions & 4 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,6 @@ void SharedArbitrator::globalArbitrationMain() {
}

void SharedArbitrator::runGlobalArbitration() {
TestValue::adjust(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration", this);

const uint64_t startTimeMs = getCurrentTimeMs();
uint64_t totalReclaimedBytes{0};
bool reclaimByAbort{false};
Expand All @@ -845,6 +842,9 @@ void SharedArbitrator::runGlobalArbitration() {
std::unordered_set<uint64_t> failedParticipants;
bool allParticipantsReclaimed{false};

TestValue::adjust(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration", this);

size_t round{0};
for (;; ++round) {
uint64_t arbitrationTimeUs{0};
Expand All @@ -860,7 +860,7 @@ void SharedArbitrator::runGlobalArbitration() {
//
// TODO: make the time based condition check configurable.
reclaimByAbort =
(getCurrentTimeMs() - startTimeMs) < maxArbitrationTimeMs_ / 2 &&
(getCurrentTimeMs() - startTimeMs) > maxArbitrationTimeMs_ / 2 &&
(reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0));
if (!reclaimByAbort) {
reclaimedBytes = reclaimUsedMemoryBySpill(
Expand Down
81 changes: 81 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,87 @@ DEBUG_ONLY_TEST_F(
ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].sum, 1);
}

DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationAbortTimeRatio) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
const int64_t maxArbitrationTimeMs = 2'000;
const int64_t abortTimeThresholdMs = maxArbitrationTimeMs / 2;

setupMemory(
memoryCapacity,
0,
memoryPoolInitCapacity,
0,
0,
0,
0,
0,
0,
0,
0,
kMemoryReclaimThreadsHwMultiplier,
nullptr,
true,
maxArbitrationTimeMs);

test::SharedArbitratorTestHelper arbitratorHelper(arbitrator_);

for (uint64_t pauseTimeMs :
{abortTimeThresholdMs / 2,
(maxArbitrationTimeMs - abortTimeThresholdMs) / 2}) {
auto task1 = addTask(memoryCapacity);
auto* op1 = task1->addMemoryOp(false);
op1->allocate(memoryCapacity / 2);

auto task2 = addTask(memoryCapacity / 2);
auto* op2 = task2->addMemoryOp(false);
op2->allocate(memoryCapacity / 2);

SCOPED_TESTVALUE_SET(
"facebook::velox::memory::SharedArbitrator::runGlobalArbitration",
std::function<void(const SharedArbitrator*)>(
([&](const SharedArbitrator* /*unused*/) {
std::this_thread::sleep_for(
std::chrono::milliseconds(pauseTimeMs));
})));

std::unordered_map<std::string, RuntimeMetric> runtimeStats;
auto statsWriter = std::make_unique<TestRuntimeStatWriter>(runtimeStats);
setThreadLocalRunTimeStatWriter(statsWriter.get());
const auto prevGlobalArbitrationRuns =
arbitratorHelper.globalArbitrationRuns();
op1->allocate(memoryCapacity / 2);

ASSERT_EQ(
runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1);
ASSERT_GT(
runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0);
ASSERT_EQ(
runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1);
ASSERT_EQ(
runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1);
ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0);
ASSERT_TRUE(task1->error() == nullptr);
ASSERT_EQ(task1->capacity(), memoryCapacity);
ASSERT_TRUE(task2->error() != nullptr);
VELOX_ASSERT_THROW(
std::rethrow_exception(task2->error()),
"Memory pool aborted to reclaim used memory");

const auto deltaGlobalArbitrationRuns =
arbitratorHelper.globalArbitrationRuns() - prevGlobalArbitrationRuns;
if (pauseTimeMs < abortTimeThresholdMs) {
ASSERT_GT(deltaGlobalArbitrationRuns, 2);
} else {
// In SharedArbitrator::runGlobalArbitration()
// First loop attempting spill, global run update.
// Second loop abort, resume waiter. Global run update and the assert
// below is a race condition, hence ASSERT_LE
ASSERT_LE(deltaGlobalArbitrationRuns, 2);
}
}
}

DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) {
const int64_t memoryCapacity = 512 << 20;
const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/tests/SharedArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class SharedArbitratorTestHelper {
return arbitrator_->globalArbitrationController_.get();
}

uint64_t globalArbitrationRuns() const {
return arbitrator_->globalArbitrationRuns_;
}

bool hasShutdown() const {
std::lock_guard<std::mutex> l(arbitrator_->stateLock_);
return arbitrator_->hasShutdownLocked();
Expand Down

0 comments on commit 899b65f

Please sign in to comment.