Skip to content

Commit

Permalink
YQ-3345 support load cpu threshold (ydb-platform#6360)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 12, 2024
1 parent e8fe6aa commit e0d21c5
Show file tree
Hide file tree
Showing 22 changed files with 900 additions and 255 deletions.
183 changes: 58 additions & 125 deletions ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/control_plane_storage/util.h>

#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>

#include <ydb/library/services/services.pb.h>

#include <ydb/library/security/ydb_credentials_provider_factory.h>
Expand All @@ -24,17 +26,9 @@ namespace NFq {
class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
struct TCounters {
::NMonitoring::TDynamicCounterPtr Counters;
struct TCommonMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
::NMonitoring::TDynamicCounters::TCounterPtr Error;
::NMonitoring::THistogramPtr LatencyMs;
};

TCommonMetrics CpuLoadRequest;
::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
::NMonitoring::TDynamicCounterPtr SubComponent;

::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
Expand All @@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
private:
void Register() {
::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
RegisterCommonMetrics(CpuLoadRequest, subComponent);
InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
AvailableLoadPercentage = subComponent->GetCounter("AvailableLoadPercentage", false);
TargetLoadPercentage = subComponent->GetCounter("TargetLoadPercentage", false);
PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
}

void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
metrics.Ok = subComponent->GetCounter("Ok", true);
metrics.Error = subComponent->GetCounter("Error", true);
metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
SubComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
CpuLoadRequestLatencyMs = SubComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
TargetLoadPercentage = SubComponent->GetCounter("TargetLoadPercentage", false);
PendingQueueSize = SubComponent->GetCounter("PendingQueueSize", false);
PendingQueueOverload = SubComponent->GetCounter("PendingQueueOverload", true);
}

static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
Expand All @@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
TComputeDatabaseMonitoringActor(const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters)
: MonitoringClientActorId(monitoringClientActorId)
, Counters(counters)
, MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)))
, AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
, MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
, DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
, PendingQueueSize(config.GetPendingQueueSize())
, Strict(config.GetStrict())
, CpuNumber(config.GetCpuNumber())
, CpuQuotaManager(
GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)),
std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)),
TDuration::Zero(),
config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
config.GetStrict(),
config.GetCpuNumber(),
Counters.SubComponent
)
{
*Counters.AvailableLoadPercentage = 100;
*Counters.TargetLoadPercentage = static_cast<ui64>(MaxClusterLoad * 100);
}

Expand All @@ -105,54 +93,29 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
)

// Replies to a CPU load query with the instant/average load tracked by
// CpuQuotaManager. When the tracked load info is stale
// (CheckLoadIsOutdated() is false), an issue is attached so the caller
// knows the numbers are unavailable.
void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
    auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(CpuQuotaManager.GetInstantLoad(), CpuQuotaManager.GetAverageLoad());
    if (!CpuQuotaManager.CheckLoadIsOutdated()) {
        response->Issues.AddIssue("CPU Load is unavailable");
    }
    Send(ev->Sender, response.release(), 0, ev->Cookie);
}

void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();

auto now = TInstant::Now();
if (!response.Issues) {
auto delta = now - LastCpuLoad;
LastCpuLoad = now;

if (response.CpuNumber) {
CpuNumber = response.CpuNumber;
}

InstantLoad = response.InstantLoad;
// exponential moving average
if (!Ready || delta >= AverageLoadInterval) {
AverageLoad = InstantLoad;
QuotedLoad = InstantLoad;
} else {
auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
}
Ready = true;
Counters.CpuLoadRequest.Ok->Inc();
*Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100);
*Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100);
CheckPendingQueue();
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
} else {
if (response.Issues) {
LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
Counters.CpuLoadRequest.Error->Inc();
CheckLoadIsOutdated();
}
Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
Counters.CpuLoadRequestLatencyMs->Collect((TInstant::Now() - StartCpuLoad).MilliSeconds());

CpuQuotaManager.UpdateCpuLoad(response.InstantLoad, response.CpuNumber, !response.Issues);
CheckPendingQueue();

// TODO: make load pulling reactive
// 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
// 2. Active pulling when busy

if (MonitoringRequestDelay) {
Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
if (auto delay = CpuQuotaManager.GetMonitoringRequestDelay()) {
Schedule(delay, new NActors::TEvents::TEvWakeup());
} else {
SendCpuLoadRequest();
}
Expand All @@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
if (request.Quota > 1.0) {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
} else {
if (!request.Quota) {
request.Quota = DefaultQueryLoad;
}
CheckLoadIsOutdated();
if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
if (PendingQueue.size() >= PendingQueueSize) {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{
NYql::TIssue{TStringBuilder{}
<< "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
<< "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
}}), 0, ev->Cookie);
auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
CheckPendingQueue();
if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size() < PendingQueueSize) {
PendingQueue.push(ev);
Counters.PendingQueueSize->Inc();
} else {
if (response.Status == NYdb::EStatus::OVERLOADED) {
Counters.PendingQueueOverload->Inc();
} else {
PendingQueue.push(ev);
Counters.PendingQueueSize->Inc();
}
} else {
QuotedLoad += request.Quota;
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
}
}
}

// Returns unused quota after a query finishes: the adjustment math
// (comparing granted quota with actually consumed CPU) lives in
// CpuQuotaManager::AdjustCpuQuota. Freed quota may unblock queued
// requests, so the pending queue is re-checked afterwards.
void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
    auto& request = *ev.Get()->Get();
    CpuQuotaManager.AdjustCpuQuota(request.Quota, request.Duration, request.CpuSecondsConsumed);
    CheckPendingQueue();
}

void SendCpuLoadRequest() {
Expand All @@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp

// In strict mode, when CpuQuotaManager reports that cluster load info is
// stale (CheckLoadIsOutdated() is false), declines every queued quota
// request with OVERLOADED instead of keeping callers waiting on unknown
// load. In non-strict mode queued requests are left in place.
void CheckLoadIsOutdated() {
    // TODO: support timeout to decline quota after request pending time is over, not load info
    if (Strict && !CpuQuotaManager.CheckLoadIsOutdated()) {
        while (PendingQueue.size()) {
            auto& ev = PendingQueue.front();
            Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
            PendingQueue.pop();
            Counters.PendingQueueSize->Dec();
        }
    }
}

// Drains the pending quota queue in FIFO order:
//  - requests whose deadline has passed are declined with CANCELLED;
//  - otherwise quota is re-requested from CpuQuotaManager; the first
//    OVERLOADED answer stops draining (the request stays queued),
//    any other status is forwarded to the requester.
void CheckPendingQueue() {
    CheckLoadIsOutdated();

    auto now = TInstant::Now();
    while (PendingQueue.size()) {
        auto& ev = PendingQueue.front();
        auto& request = *ev.Get()->Get();
        if (request.Deadline && now >= request.Deadline) {
            Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::CANCELLED, NYql::TIssues{
                NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
        } else {
            auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
            if (response.Status == NYdb::EStatus::OVERLOADED) {
                // Head of the queue still can't be served - keep it queued.
                break;
            }

            Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
        }

        PendingQueue.pop();
        Counters.PendingQueueSize->Dec();
    }
}

private:
TInstant StartCpuLoad;
TInstant LastCpuLoad;
TActorId MonitoringClientActorId;
TCounters Counters;

double InstantLoad = 0.0;
double AverageLoad = 0.0;
double QuotedLoad = 0.0;
bool Ready = false;

const TDuration MonitoringRequestDelay;
const TDuration AverageLoadInterval;
const double MaxClusterLoad;
const double DefaultQueryLoad;
const ui32 PendingQueueSize;
const bool Strict;
ui32 CpuNumber = 0;

NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;

TInstant StartCpuLoad;
};

std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/control_plane/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PEERDIR(
ydb/core/fq/libs/compute/ydb/synchronization_service
ydb/core/fq/libs/control_plane_storage/proto
ydb/core/fq/libs/quota_manager/proto
ydb/core/kqp/workload_service/common
ydb/core/protos
ydb/library/db_pool/protos
ydb/library/yql/public/issue
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
};

// Session cleanup notification for the workload service. Duration and
// CpuConsumed carry the finished query's wall time and CPU time so the
// pool handler can return unused CPU quota.
struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
    TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
        : Database(database)
        , SessionId(sessionId)
        , PoolId(poolId)
        , Duration(duration)
        , CpuConsumed(cpuConsumed)
    {}

    const TString Database;
    const TString SessionId;
    const TString PoolId;
    const TDuration Duration;   // query wall-clock duration
    const TDuration CpuConsumed; // CPU time consumed by the query
};

struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqpWorkloadServiceEvents::EvCleanupResponse> {
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,14 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
}

if (settings.GetObjectId() == NResourcePool::DEFAULT_POOL_ID) {
if (properties.contains("concurrent_query_limit")) {
ythrow yexception() << "Can not change property concurrent_query_limit for default pool";
std::vector<TString> forbiddenProperties = {
"concurrent_query_limit",
"database_load_cpu_threshold"
};
for (const TString& property : forbiddenProperties) {
if (properties.contains(property)) {
ythrow yexception() << "Can not change property " << property << " for default pool";
}
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2071,8 +2071,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
CleanupCtx->Final = isFinal;
CleanupCtx->IsWaitingForWorkloadServiceCleanup = true;

const auto& stats = QueryState->QueryStats;
auto event = std::make_unique<NWorkload::TEvCleanupRequest>(
QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId,
TDuration::MicroSeconds(stats.DurationUs), TDuration::MicroSeconds(stats.WorkerCpuTimeUs)
);

auto forwardId = MakeKqpWorkloadServiceId(SelfId().NodeId());
Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), new NWorkload::TEvCleanupRequest(QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), event.release(), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
QueryState->PoolHandlerActor = Nothing();
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bo
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Cpu load fetcher actor
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);

} // NKikimr::NKqp::NWorkload
Loading

0 comments on commit e0d21c5

Please sign in to comment.