Skip to content

Commit

Permalink
Fix the global arbitration check failure when it is disabled (faceboo…
Browse files Browse the repository at this point in the history
…kincubator#11364)

Summary:
Pull Request resolved: facebookincubator#11364

Some memory-intensive queries hit a check failure on the global arbitration flag before they
start waiting for global memory arbitration, because global arbitration is not enabled in those clusters.
The root cause is a bug: we fall back to global arbitration when a query fails to allocate memory from the
system while still within its capacity limit and is not qualified to reclaim memory from itself.
This PR fixes the bug and adds unit tests.

Reviewed By: tanjialiang, arhimondr

Differential Revision: D65073054

fbshipit-source-id: 85eb4d5a3102fde4d8989ea5a0f076b2bc2c04d8
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 28, 2024
1 parent 878388f commit 4a43b02
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
13 changes: 8 additions & 5 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ void SharedArbitrator::shutdownGlobalArbitration() {
}

void SharedArbitrator::wakeupGlobalArbitrationThread() {
VELOX_CHECK(globalArbitrationEnabled_);
checkGlobalArbitrationEnabled();
VELOX_CHECK_NOT_NULL(globalArbitrationController_);
incrementGlobalArbitrationWaitCount();
globalArbitrationThreadCv_.notify_one();
Expand Down Expand Up @@ -721,9 +721,12 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
reclaimUnusedCapacity();
RETURN_IF_TRUE(growWithFreeCapacity(op));

if (!globalArbitrationEnabled_ &&
op.participant()->reclaimableUsedCapacity() >=
participantConfig_.minReclaimBytes) {
if (!globalArbitrationEnabled_) {
if (op.participant()->reclaimableUsedCapacity() <
participantConfig_.minReclaimBytes) {
return false;
}

// NOTE: if global memory arbitration is not enabled, we will try to
// reclaim from the participant itself before failing this operation.
reclaim(
Expand All @@ -739,7 +742,7 @@ bool SharedArbitrator::growCapacity(ArbitrationOperation& op) {
}

bool SharedArbitrator::startAndWaitGlobalArbitration(ArbitrationOperation& op) {
VELOX_CHECK(globalArbitrationEnabled_);
checkGlobalArbitrationEnabled();
checkIfTimeout(op);

std::unique_ptr<ArbitrationWait> arbitrationWait;
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ class SharedArbitrator : public memory::MemoryArbitrator {
return state_ == State::kShutdown;
}

// Checks that 'globalArbitrationEnabled_' is set. The VELOX_CHECK fails with
// the message "Global arbitration is not enabled" otherwise.
FOLLY_ALWAYS_INLINE void checkGlobalArbitrationEnabled() const {
VELOX_CHECK(globalArbitrationEnabled_, "Global arbitration is not enabled");
}

// Invoked to get the arbitration participant by 'name'. The function returns
// std::nullopt if the underlying query memory pool is destroyed.
std::optional<ScopedArbitrationParticipant> getParticipant(
Expand Down
58 changes: 58 additions & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3708,6 +3708,64 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailure) {
}
}

// Verifies that a non-reclaimable query fails its allocation with a capacity
// exceeded error when global arbitration is disabled. The scenario is run both
// without and with a min reclaim bytes setting.
TEST_F(
    MockSharedArbitrationTest,
    arbitrationFailureOnNonReclaimableQueryWithGlobalArbitrationDisabled) {
  const int64_t memoryCapacity = 128 * MB;
  // The first task takes three quarters of the capacity, so the second task's
  // request for half of the capacity cannot be served from what is left.
  const int64_t firstTaskBytes = memoryCapacity / 4 * 3;
  const int64_t secondTaskBytes = memoryCapacity / 2;

  for (const bool hasMinReclaimBytes : {false, true}) {
    SCOPED_TRACE(fmt::format("hasMinReclaimBytes {}", hasMinReclaimBytes));

    // Setting min reclaim bytes avoids reclaiming from the query itself before
    // the arbitration fails.
    const auto minReclaimBytes = hasMinReclaimBytes ? MB : 0;
    // NOTE(review): the trailing 'false' presumably disables global
    // arbitration; confirm against the setupMemory() signature.
    setupMemory(
        memoryCapacity,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        minReclaimBytes,
        0,
        0,
        1.0,
        nullptr,
        false);

    std::shared_ptr<MockTask> holderTask = addTask();
    MockMemoryOperator* holderOp = holderTask->addMemoryOp(false);
    holderOp->allocate(firstTaskBytes);
    ASSERT_EQ(holderTask->capacity(), firstTaskBytes);

    std::shared_ptr<MockTask> failingTask = addTask();
    MockMemoryOperator* failingOp = failingTask->addMemoryOp(false);
    VELOX_ASSERT_THROW(
        failingOp->allocate(secondTaskBytes), "Exceeded memory pool capacity ");
  }
}

// Verifies that a reclaimable query reclaims from itself before reaching the
// capacity limit when global arbitration is disabled.
TEST_F(
    MockSharedArbitrationTest,
    reclaimBeforeReachCapacityLimitWhenGlobalArbitrationDisabled) {
  const int64_t memoryCapacity = 128 * MB;
  const int64_t halfCapacity = memoryCapacity / 2;
  const int64_t quarterCapacity = memoryCapacity / 4;

  // NOTE(review): the trailing 'false' presumably disables global arbitration;
  // confirm against the setupMemory() signature.
  setupMemory(
      memoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1.0, nullptr, false);

  // Two tasks each take half of the capacity.
  std::shared_ptr<MockTask> firstTask = addTask();
  MockMemoryOperator* firstOp = firstTask->addMemoryOp(true);
  firstOp->allocate(halfCapacity);
  ASSERT_EQ(firstTask->capacity(), halfCapacity);

  std::shared_ptr<MockTask> secondTask = addTask();
  MockMemoryOperator* secondOp = secondTask->addMemoryOp(true);
  secondOp->allocate(halfCapacity);
  ASSERT_EQ(secondTask->capacity(), halfCapacity);

  // The extra allocation from the second task is expected to go through
  // without throwing.
  secondOp->allocate(quarterCapacity);
}

TEST_F(MockSharedArbitrationTest, concurrentArbitrations) {
const int numTasks = 10;
const int numOpsPerTask = 5;
Expand Down

0 comments on commit 4a43b02

Please sign in to comment.