Skip to content

Commit

Permalink
YQ WM improved overload issues (ydb-platform#8437)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Aug 29, 2024
1 parent 9c8c951 commit 8a5c10d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 39 deletions.
40 changes: 25 additions & 15 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0));
}

void OnCleanup() {
void OnCleanup(bool resetConfigCounters) {
ActivePoolHandlers->Dec();

InFlightLimit->Set(0);
QueueSizeLimit->Set(0);
LoadCpuThreshold->Set(0);
if (resetConfigCounters) {
InFlightLimit->Set(0);
QueueSizeLimit->Set(0);
LoadCpuThreshold->Set(0);
}
}

private:
Expand Down Expand Up @@ -136,7 +138,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
STRICT_STFUNC(StateFuncBase,
// Workload service events
sFunc(TEvents::TEvPoison, HandlePoison);
sFunc(TEvPrivate::TEvStopPoolHandler, HandleStop);
hFunc(TEvPrivate::TEvStopPoolHandler, Handle);
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvUpdatePoolSubscription, Handle);

Expand All @@ -160,7 +162,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {

SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);

Counters.OnCleanup();
Counters.OnCleanup(ResetCountersOnStrop);

TBase::PassAway();
}
Expand All @@ -171,8 +173,9 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
this->PassAway();
}

void HandleStop() {
void Handle(TEvPrivate::TEvStopPoolHandler::TPtr& ev) {
LOG_I("Got stop pool handler request, waiting for " << LocalSessions.size() << " requests");
ResetCountersOnStrop = ev->Get()->ResetCounters;
if (LocalSessions.empty()) {
PassAway();
} else {
Expand Down Expand Up @@ -332,7 +335,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
if (!request->Started && request->State != TRequest::EState::Finishing) {
if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) {
status = Ydb::StatusIds::CANCELLED;
issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId);
issues.AddIssue(TStringBuilder() << "Request was delayed during " << TInstant::Now() - request->StartTime << ", that is larger than delay deadline " << PoolConfig.QueryCancelAfter << " in pool " << PoolId << ", request was canceled");
}
ReplyContinue(request, status, issues);
return;
Expand Down Expand Up @@ -515,6 +518,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
ui64 LocalInFlight = 0;
std::unordered_map<TString, TRequest> LocalSessions;
bool StopHandler = false; // Stop than all requests finished
bool ResetCountersOnStrop = true;
};


Expand Down Expand Up @@ -622,8 +626,13 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
}

void OnScheduleRequest(TRequest* request) override {
if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
if (PendingRequests.size() >= MAX_PENDING_REQUESTS) {
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", that is larger than allowed limit " << MAX_PENDING_REQUESTS);
return;
}

if (SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending/delayed requests is " << GetLocalSessionsCount() - GetLocalInFlight() << ", that is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
return;
}

Expand Down Expand Up @@ -742,15 +751,15 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor

if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) {
RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", number of global delayed/running requests is " << GlobalState.AmountRequests() << ", sum of them is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
});
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
}

if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) {
RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this](TRequest* request) {
RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this, delayedRequestsCount](TRequest* request) {
AddFinishedRequest(request->SessionId);
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local delayed requests is " << delayedRequestsCount << ", that is larger than allowed limit " << QueueSizeLimit << " for pool " << PoolId);
});
}

Expand Down Expand Up @@ -787,9 +796,10 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
if (!ev->Get()->QuotaAccepted) {
LOG_D("Skipped request start due to load cpu threshold");
if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) {
ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) {
NYql::TIssues issues = GroupIssues(ev->Get()->Issues, TStringBuilder() << "Request was rejected, failed to request CPU quota for pool " << PoolId << ", current CPU threshold is " << 100.0 * ev->Get()->MaxClusterLoad << "%");
ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this, issues](TRequest* request) {
AddFinishedRequest(request->SessionId);
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, issues);
});
}
RefreshState();
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ struct TEvPrivate {
};

struct TEvStopPoolHandler : public NActors::TEventLocal<TEvStopPoolHandler, EvStopPoolHandler> {
explicit TEvStopPoolHandler(bool resetCounters)
: ResetCounters(resetCounters)
{}

const bool ResetCounters;
};

struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
Expand Down Expand Up @@ -196,11 +201,15 @@ struct TEvPrivate {
};

struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> {
explicit TEvCpuQuotaResponse(bool quotaAccepted)
explicit TEvCpuQuotaResponse(bool quotaAccepted, double maxClusterLoad, NYql::TIssues issues)
: QuotaAccepted(quotaAccepted)
, MaxClusterLoad(maxClusterLoad)
, Issues(std::move(issues))
{}

const bool QuotaAccepted;
const double MaxClusterLoad;
const NYql::TIssues Issues;
};

struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

if (auto poolState = GetPoolState(database, poolId)) {
if (poolState->NewPoolHandler) {
Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler());
Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
}
poolState->NewPoolHandler = ev->Get()->NewHandler;
poolState->UpdateHandler();
Expand Down Expand Up @@ -443,7 +443,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
for (const auto& [poolKey, poolState] : PoolIdToState) {
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler());
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
poolsToDelete.emplace_back(poolKey);
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/workload_service/kqp_workload_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct TPoolState {
return;
}

ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler());
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
PoolHandler = *NewPoolHandler;
NewPoolHandler = std::nullopt;
InFlightRequests = 0;
Expand Down Expand Up @@ -160,7 +160,7 @@ struct TCpuQuotaManagerState {
auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);

bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, coockie);
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);

// Schedule notification
if (!quotaAccepted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ struct TSampleQueries {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

template <typename TResult>
static void CheckOverloaded(const TResult& result, const TString& poolId) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::OVERLOADED, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Too many pending requests for pool " << poolId);
}

template <typename TResult>
static void CheckCancelled(const TResult& result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
}
UNIT_ASSERT_C(firstRequest.HasValue(), "One of two requests shoud be rejected");
UNIT_ASSERT_C(!secondRequest.HasValue(), "One of two requests shoud be placed in pool");
TSampleQueries::CheckOverloaded(firstRequest.GetResult(), ydb->GetSettings().PoolId_);

auto result = firstRequest.GetResult();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 2, number of global delayed/running requests is 1, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);

return secondRequest;
}
Expand Down Expand Up @@ -114,10 +117,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
ydb->WaitQueryExecution(hangingRequest);

TSampleQueries::CheckOverloaded(
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
ydb->GetSettings().PoolId_
);
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);

ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
Expand All @@ -142,10 +144,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
ydb->WaitQueryExecution(asyncResult);
}

TSampleQueries::CheckOverloaded(
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
ydb->GetSettings().PoolId_
);
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is " << inFlight << ", sum of them is larger than allowed limit 0 (including concurrent query limit " << inFlight << ") for pool " << ydb->GetSettings().PoolId_);

for (const auto& asyncResult : asyncResults) {
ydb->ContinueQueryExecution(asyncResult);
Expand Down Expand Up @@ -230,7 +231,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {

auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was delayed during");
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << ", that is larger than delay deadline 10.000000s in pool " << ydb->GetSettings().PoolId_ << ", request was canceled");
}

Y_UNIT_TEST(TestCpuLoadThresholdRefresh) {
Expand Down Expand Up @@ -289,7 +291,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceDistributed) {
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});

// Check distributed queue size
TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0)), ydb->GetSettings().PoolId_);
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 2, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);

ydb->ContinueQueryExecution(delayedRequest);
ydb->ContinueQueryExecution(hangingRequest);
Expand Down Expand Up @@ -359,7 +363,9 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
);
ydb->WaitQueryExecution(hangingRequest);

TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId)), poolId);
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId));
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << poolId);

ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
Expand Down Expand Up @@ -401,7 +407,10 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
QUEUE_SIZE=0
);
)");
TSampleQueries::CheckOverloaded(delayedRequest.GetResult(), ydb->GetSettings().PoolId_);

auto result = delayedRequest.GetResult();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local delayed requests is 1, that is larger than allowed limit 0 for pool " << ydb->GetSettings().PoolId_);

ydb->ContinueQueryExecution(hangingRequest);
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
Expand Down

0 comments on commit 8a5c10d

Please sign in to comment.