diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
index f752bef3160e..33cec012a0f5 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
@@ -1,6 +1,8 @@
 #include
 #include
 
+#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
+
 #include
 #include
@@ -24,17 +26,9 @@ namespace NFq {
 
 class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
     struct TCounters {
         ::NMonitoring::TDynamicCounterPtr Counters;
-        struct TCommonMetrics {
-            ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
-            ::NMonitoring::TDynamicCounters::TCounterPtr Error;
-            ::NMonitoring::THistogramPtr LatencyMs;
-        };
-
-        TCommonMetrics CpuLoadRequest;
-        ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
+        ::NMonitoring::TDynamicCounterPtr SubComponent;
+
+        ::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
         ::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
         ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
         ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
@@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
             auto component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
-            auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
-            RegisterCommonMetrics(CpuLoadRequest, subComponent);
-            InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
-            AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
-            QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
-            AvailableLoadPercentage = subComponent->GetCounter("AvailableLoadPercentage", false);
-            TargetLoadPercentage = subComponent->GetCounter("TargetLoadPercentage", false);
-            PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
-            PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
-        }
-
-        void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
-            metrics.Ok = subComponent->GetCounter("Ok", true);
-            metrics.Error = subComponent->GetCounter("Error", true);
-            metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+            SubComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
+            CpuLoadRequestLatencyMs = SubComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+            TargetLoadPercentage = SubComponent->GetCounter("TargetLoadPercentage", false);
+            PendingQueueSize = SubComponent->GetCounter("PendingQueueSize", false);
+            PendingQueueOverload = SubComponent->GetCounter("PendingQueueOverload", true);
         }
 
         static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
@@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
         : MonitoringClientActorId(monitoringClientActorId)
         , Counters(counters)
-        , MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)))
-        , AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
         , MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
-        , DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
         , PendingQueueSize(config.GetPendingQueueSize())
         , Strict(config.GetStrict())
-        , CpuNumber(config.GetCpuNumber())
+        , CpuQuotaManager(
+            GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)),
+            std::max(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)),
+            TDuration::Zero(),
+            config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
+            config.GetStrict(),
+            config.GetCpuNumber(),
+            Counters.SubComponent
+        )
     {
-        *Counters.AvailableLoadPercentage = 100;
         *Counters.TargetLoadPercentage = static_cast<ui64>(MaxClusterLoad * 100);
     }
@@ -105,8 +93,8 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
 
     void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
-        auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad);
-        if (!Ready) {
+        auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(CpuQuotaManager.GetInstantLoad(), CpuQuotaManager.GetAverageLoad());
+        if (!CpuQuotaManager.CheckLoadIsOutdated()) {
             response->Issues.AddIssue("CPU Load is unavailable");
         }
         Send(ev->Sender, response.release(), 0, ev->Cookie);
@@ -114,45 +102,20 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
     void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
         auto& response = *ev.Get()->Get();
-
-        auto now = TInstant::Now();
-        if (!response.Issues) {
-            auto delta = now - LastCpuLoad;
-            LastCpuLoad = now;
-
-            if (response.CpuNumber) {
-                CpuNumber = response.CpuNumber;
-            }
-
-            InstantLoad = response.InstantLoad;
-            // exponential moving average
-            if (!Ready || delta >= AverageLoadInterval) {
-                AverageLoad = InstantLoad;
-                QuotedLoad = InstantLoad;
-            } else {
-                auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
-                AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
-                QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
-            }
-            Ready = true;
-            Counters.CpuLoadRequest.Ok->Inc();
-            *Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100);
-            *Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100);
-            CheckPendingQueue();
-            *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
-        } else {
+        if (response.Issues) {
             LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
-            Counters.CpuLoadRequest.Error->Inc();
-            CheckLoadIsOutdated();
         }
-        Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
+        Counters.CpuLoadRequestLatencyMs->Collect((TInstant::Now() - StartCpuLoad).MilliSeconds());
+
+        CpuQuotaManager.UpdateCpuLoad(response.InstantLoad, response.CpuNumber, !response.Issues);
+        CheckPendingQueue();
 
         // TODO: make load pulling reactive
         // 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
         // 2. Active pulling when busy
-        if (MonitoringRequestDelay) {
-            Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
+        if (auto delay = CpuQuotaManager.GetMonitoringRequestDelay()) {
+            Schedule(delay, new NActors::TEvents::TEvWakeup());
         } else {
             SendCpuLoadRequest();
         }
@@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
     void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
         auto& request = *ev->Get();
 
         if (request.Quota > 1.0) {
             Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
         } else {
-            if (!request.Quota) {
-                request.Quota = DefaultQueryLoad;
-            }
-            CheckLoadIsOutdated();
-            if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
-                if (PendingQueue.size() >= PendingQueueSize) {
-                    Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{
-                        NYql::TIssue{TStringBuilder{}
-                            << "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
-                            << "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
-                        }}), 0, ev->Cookie);
+            auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
+            CheckPendingQueue();
+            if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size() < PendingQueueSize) {
+                PendingQueue.push(ev);
+                Counters.PendingQueueSize->Inc();
+            } else {
+                if (response.Status == NYdb::EStatus::OVERLOADED) {
                     Counters.PendingQueueOverload->Inc();
-                } else {
-                    PendingQueue.push(ev);
-                    Counters.PendingQueueSize->Inc();
                 }
-            } else {
-                QuotedLoad += request.Quota;
-                *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
-                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
             }
         }
     }
 
     void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
-        if (CpuNumber) {
-            auto& request = *ev.Get()->Get();
-            if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) {
-                auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber;
-                auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
-                if (quota > load) {
-                    auto adjustment = (quota - load) / 2;
-                    if (QuotedLoad > adjustment) {
-                        QuotedLoad -= adjustment;
-                    } else {
-                        QuotedLoad = 0.0;
-                    }
-                    CheckPendingQueue();
-                    *Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
-                }
-            }
-        }
+        auto& request = *ev.Get()->Get();
+        CpuQuotaManager.AdjustCpuQuota(request.Quota, request.Duration, request.CpuSecondsConsumed);
+        CheckPendingQueue();
     }
 
     void SendCpuLoadRequest() {
@@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
 
     void CheckLoadIsOutdated() {
-        if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
-            Ready = false;
-            QuotedLoad = 0.0;
-            if (Strict) {
-                while (PendingQueue.size()) {
-                    auto& ev = PendingQueue.front();
-                    Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
-                    PendingQueue.pop();
-                    Counters.PendingQueueSize->Dec();
-                }
+        if (Strict && !CpuQuotaManager.CheckLoadIsOutdated()) {
+            while (PendingQueue.size()) {
+                auto& ev = PendingQueue.front();
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
+                PendingQueue.pop();
+                Counters.PendingQueueSize->Dec();
             }
         }
     }
 
     void CheckPendingQueue() {
+        CheckLoadIsOutdated();
+
         auto now = TInstant::Now();
-        while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) {
+        while (PendingQueue.size()) {
             auto& ev = PendingQueue.front();
             auto& request = *ev.Get()->Get();
             if (request.Deadline && now >= request.Deadline) {
                 Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::CANCELLED, NYql::TIssues{
                     NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
             } else {
-                QuotedLoad += request.Quota;
-                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
+                auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
+                if (response.Status == NYdb::EStatus::OVERLOADED) {
+                    break;
+                }
+
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
             }
+
             PendingQueue.pop();
             Counters.PendingQueueSize->Dec();
         }
     }
 
 private:
-    TInstant StartCpuLoad;
-    TInstant LastCpuLoad;
     TActorId MonitoringClientActorId;
     TCounters Counters;
-
-    double InstantLoad = 0.0;
-    double AverageLoad = 0.0;
-    double QuotedLoad = 0.0;
-    bool Ready = false;
-
-    const TDuration MonitoringRequestDelay;
-    const TDuration AverageLoadInterval;
     const double MaxClusterLoad;
-    const double DefaultQueryLoad;
     const ui32 PendingQueueSize;
     const bool Strict;
-    ui32 CpuNumber = 0;
+    NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
 
     TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;
+
+    TInstant StartCpuLoad;
 };
 
 std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
index a77f4292a98a..fcfb13f095ef 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
     ydb/core/fq/libs/compute/ydb/synchronization_service
     ydb/core/fq/libs/control_plane_storage/proto
     ydb/core/fq/libs/quota_manager/proto
+    ydb/core/kqp/workload_service/common
     ydb/core/protos
     ydb/library/db_pool/protos
     ydb/library/yql/public/issue
diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h
index bdde6089d1ef..6ef1bfe6cf45 100644
--- a/ydb/core/kqp/common/events/workload_service.h
+++ b/ydb/core/kqp/common/events/workload_service.h
@@ -41,15 +41,19 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqpWorkloadServiceEvents::EvContinueRequest> {
 
 struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
-    TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId)
+    TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
         : Database(database)
         , SessionId(sessionId)
         , PoolId(poolId)
+        , Duration(duration)
+        , CpuConsumed(cpuConsumed)
     {}
 
     const TString Database;
     const TString SessionId;
     const TString PoolId;
+    const TDuration Duration;
+    const TDuration CpuConsumed;
 };
 
 struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqpWorkloadServiceEvents::EvCleanupResponse> {
diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
index 42c4687a19c6..69447732cfc8 100644
--- a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
+++ b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
@@ -138,8 +138,14 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
     }
 
     if (settings.GetObjectId() == NResourcePool::DEFAULT_POOL_ID) {
-        if (properties.contains("concurrent_query_limit")) {
-            ythrow yexception() << "Can not change property concurrent_query_limit for default pool";
+        std::vector<TString> forbiddenProperties = {
+            "concurrent_query_limit",
+            "database_load_cpu_threshold"
+        };
+        for (const TString& property : forbiddenProperties) {
+            if (properties.contains(property)) {
+                ythrow yexception() << "Can not change property " << property << " for default pool";
+            }
         }
     }
 }
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 436195c0bae5..c25dd11a536c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -2071,8 +2071,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
         }
         CleanupCtx->Final = isFinal;
         CleanupCtx->IsWaitingForWorkloadServiceCleanup = true;
+
+        const auto& stats = QueryState->QueryStats;
+        auto event = std::make_unique<NWorkload::TEvCleanupRequest>(
+            QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId,
+            TDuration::MicroSeconds(stats.DurationUs), TDuration::MicroSeconds(stats.WorkerCpuTimeUs)
+        );
+
         auto forwardId = MakeKqpWorkloadServiceId(SelfId().NodeId());
-        Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), new NWorkload::TEvCleanupRequest(QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
+        Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), event.release(), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
         QueryState->PoolHandlerActor = Nothing();
     }
diff --git a/ydb/core/kqp/workload_service/actors/actors.h b/ydb/core/kqp/workload_service/actors/actors.h
index 7bae3b142656..5d9c576889c0 100644
--- a/ydb/core/kqp/workload_service/actors/actors.h
+++ b/ydb/core/kqp/workload_service/actors/actors.h
@@ -15,4 +15,7 @@ NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bo
 NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
 NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);
 
+// Cpu load fetcher actor
+NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);
+
 } // NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp b/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp
new file mode 100644
index 000000000000..425c8cfbd30e
--- /dev/null
+++ b/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp
@@ -0,0 +1,77 @@
+#include "actors.h"
+
+#include <ydb/core/kqp/workload_service/common/events.h>
+
+#include <ydb/library/query_actor/query_actor.h>
+
+
+namespace NKikimr::NKqp::NWorkload {
+
+namespace {
+
+class TCpuLoadFetcherActor : public NKikimr::TQueryBase {
+    using TBase = NKikimr::TQueryBase;
+
+public:
+    TCpuLoadFetcherActor()
+        : TBase(NKikimrServices::KQP_WORKLOAD_SERVICE)
+    {
+        SetOperationInfo(__func__, "");
+    }
+
+    void OnRunQuery() override {
+        TString sql = TStringBuilder() << R"(
+            -- TCpuLoadFetcherActor::OnRunQuery
+
+            SELECT
+                SUM(CpuThreads) AS ThreadsCount,
+                SUM(CpuThreads * (1.0 - CpuIdle)) AS TotalLoad
+            FROM `.sys/nodes`;
+        )";
+
+        RunDataQuery(sql);
+    }
+
+    void OnQueryResult() override {
+        if (ResultSets.size() != 1) {
+            Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+            return;
+        }
+
+        NYdb::TResultSetParser result(ResultSets[0]);
+        if (!result.TryNextRow()) {
+            Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+            return;
+        }
+
+        ThreadsCount = result.ColumnParser("ThreadsCount").GetOptionalUint64().GetOrElse(0);
+        TotalLoad = result.ColumnParser("TotalLoad").GetOptionalDouble().GetOrElse(0.0);
+
+        if (!ThreadsCount) {
+            Finish(Ydb::StatusIds::NOT_FOUND, "Cpu info not found");
+            return;
+        }
+
+        Finish();
+    }
+
+    void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+        if (status == Ydb::StatusIds::SUCCESS) {
+            Send(Owner, new TEvPrivate::TEvCpuLoadResponse(Ydb::StatusIds::SUCCESS, TotalLoad / ThreadsCount, ThreadsCount, std::move(issues)));
+        } else {
+            Send(Owner, new TEvPrivate::TEvCpuLoadResponse(status, 0.0, 0, std::move(issues)));
+        }
+    }
+
+private:
+    double TotalLoad = 0.0;
+    ui64 ThreadsCount = 0;
+};
+
+} // anonymous namespace
+
+IActor* CreateCpuLoadFetcherActor(const TActorId& replyActorId) {
+    return new TQueryRetryActor<TCpuLoadFetcherActor, TEvPrivate::TEvCpuLoadResponse>(replyActorId);
+}
+
+} // NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
index 76b72ceee20b..bfbb2096944f 100644
--- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
+++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
@@ -42,6 +42,9 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
         EState State = EState::Pending;
         bool Started = false;  // after TEvContinueRequest success
         bool CleanupRequired = false;
+        bool UsedCpuQuota = false;
+        TDuration Duration;
+        TDuration CpuConsumed;
     };
 
 public:
@@ -54,7 +57,6 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
         , QueueSizeLimit(GetMaxQueueSize(poolConfig))
         , InFlightLimit(GetMaxInFlight(poolConfig))
         , PoolConfig(poolConfig)
-        , CancelAfter(poolConfig.QueryCancelAfter)
     {
         RegisterCounters();
     }
@@ -118,8 +120,8 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
         }
 
         LOG_D("Received new request, worker id: " << workerActorId << ", session id: " << sessionId);
-        if (CancelAfter) {
-            this->Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(sessionId));
+        if (auto cancelAfter = PoolConfig.QueryCancelAfter) {
+            this->Schedule(cancelAfter, new TEvPrivate::TEvCancelRequest(sessionId));
         }
 
         TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second;
@@ -146,8 +148,10 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
             return;
         }
         request->State = TRequest::EState::Finishing;
+        request->Duration = ev->Get()->Duration;
+        request->CpuConsumed = ev->Get()->CpuConsumed;
 
-        LOG_D("Received cleanup request, worker id: " << workerActorId << ", session id: " << sessionId);
+        LOG_D("Received cleanup request, worker id: " << workerActorId << ", session id: " << sessionId << ", duration: " << request->Duration << ", cpu consumed: " << request->CpuConsumed);
         OnCleanupRequest(request);
     }
@@ -213,7 +217,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
             ContinueError->Inc();
             LOG_W("Reply continue error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
         }
-        RemoveRequest(request->SessionId);
+        RemoveRequest(request);
     }
 
     LocalDelayedRequests->Dec();
@@ -246,7 +250,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
             ReplyCleanup(request, status, issues);
         }
 
-        RemoveRequest(request->SessionId);
+        RemoveRequest(request);
     }
 
 protected:
@@ -273,9 +277,13 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
         return nullptr;
     }
 
-    void RemoveRequest(const TString& sessionId) {
-        LocalSessions.erase(sessionId);
-        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvFinishRequestInPool(Database, PoolId));
+    void RemoveRequest(TRequest* request) {
+        auto event = std::make_unique<TEvPrivate::TEvFinishRequestInPool>(
+            Database, PoolId, request->Duration, request->CpuConsumed, request->UsedCpuQuota
+        );
+        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), event.release());
+
+        LocalSessions.erase(request->SessionId);
         if (StopHandler && LocalSessions.empty()) {
             LOG_I("All requests finished, stop handler");
             PassAway();
@@ -291,10 +299,17 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
     }
 
    TMaybe<TInstant> GetWaitDeadline(TInstant startTime) const {
-        if (!CancelAfter) {
+        if (auto cancelAfter = PoolConfig.QueryCancelAfter) {
+            return startTime + cancelAfter;
+        }
+        return Nothing();
+    }
+
+    TMaybe<double> GetLoadCpuThreshold() const {
+        if (PoolConfig.DatabaseLoadCpuThreshold < 0.0) {
             return Nothing();
         }
-        return startTime + CancelAfter;
+        return PoolConfig.DatabaseLoadCpuThreshold;
     }
 
     TString LogPrefix() const {
@@ -349,7 +364,6 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
         LOG_D("Pool config has changed, queue size: " << poolConfig.QueueSize << ", in flight limit: " << poolConfig.ConcurrentQueryLimit);
 
         PoolConfig = poolConfig;
-        CancelAfter = poolConfig.QueryCancelAfter;
         QueueSizeLimit = GetMaxQueueSize(poolConfig);
         InFlightLimit = GetMaxInFlight(poolConfig);
         RefreshState(true);
@@ -398,7 +412,6 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
 private:
     NResourcePool::TPoolSettings PoolConfig;
-    TDuration CancelAfter;
 
     // Scheme board settings
     std::unique_ptr<TPathId> WatchPathId;
@@ -436,7 +449,7 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase<TUnlimitedPoolHandlerActor> {
-        return 0 < InFlightLimit && InFlightLimit < std::numeric_limits<ui64>::max();
+        return 0 < InFlightLimit && (InFlightLimit < std::numeric_limits<ui64>::max() || GetLoadCpuThreshold());
     }
 
     void OnScheduleRequest(TRequest* request) override {
@@ -452,6 +465,11 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase<TUnlimitedPoolHandlerActor> {
 
 class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
     using TBase = TPoolHandlerActorBase<TFifoPoolHandlerActor>;
 
+    enum class EStartRequestCase {
+        Pending,
+        Delayed
+    };
+
     static constexpr ui64 MAX_PENDING_REQUESTS = 1000;
 
 public:
@@ -466,6 +484,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
         switch (ev->GetTypeRewrite()) {
             sFunc(TEvents::TEvWakeup, HandleRefreshState);
             sFunc(TEvPrivate::TEvRefreshPoolState, HandleExternalRefreshState);
+            hFunc(TEvPrivate::TEvCpuQuotaResponse, Handle);
 
             hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
             hFunc(TEvPrivate::TEvRefreshPoolStateResponse, Handle);
@@ -486,7 +505,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
-        return InFlightLimit == 0 || InFlightLimit == std::numeric_limits<ui64>::max();
+        return InFlightLimit == 0 || (InFlightLimit == std::numeric_limits<ui64>::max() && !GetLoadCpuThreshold());
     }
 
     void OnScheduleRequest(TRequest* request) override {
@@ -594,7 +613,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
         RemoveFinishedRequests();
 
-        DoStartPendingRequest();
+        DoStartPendingRequest(GetLoadCpuThreshold());
 
         if (GlobalState.DelayedRequests + PendingRequests.size() > QueueSizeLimit) {
             RemoveBackRequests(PendingRequests, std::min(GlobalState.DelayedRequests + PendingRequests.size() - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
@@ -611,7 +630,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
         PendingRequestsCount->Inc();
         LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);
 
-        DoStartDelayedRequest();
+        DoStartDelayedRequest(GetLoadCpuThreshold());
         RefreshState();
     };
 
+    void Handle(TEvPrivate::TEvCpuQuotaResponse::TPtr& ev) {
+        RunningOperation = false;
+
+        if (!ev->Get()->QuotaAccepted) {
+            LOG_D("Skipped request start due to load cpu threshold");
+            if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) {
+                ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) {
+                    AddFinishedRequest(request->SessionId);
+                    ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+                });
+            }
+            RefreshState();
+            return;
+        }
+
+        RemoveFinishedRequests();
+        switch (static_cast<EStartRequestCase>(ev->Cookie)) {
+            case EStartRequestCase::Pending:
+                if (!RunningOperation && !DelayedRequests.empty()) {
+                    RunningOperation = true;
+                    const TString& sessionId = DelayedRequests.front();
+                    this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup));
+                    GetRequest(sessionId)->CleanupRequired = true;
+                }
+                break;
+
+            case EStartRequestCase::Delayed:
+                DoStartDelayedRequest(Nothing());
+                break;
+        }
+
+        RefreshState();
+    }
+
     void Handle(TEvPrivate::TEvStartRequestResponse::TPtr& ev) {
         RunningOperation = false;
@@ -668,6 +721,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
             if (request->SessionId == sessionId) {
+                request->UsedCpuQuota = !!GetLoadCpuThreshold();
                 requestFound = true;
                 GlobalState.RunningRequests++;
                 GlobalInFly->Inc();
@@ -706,7 +760,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
 
-    void DoStartPendingRequest() {
+    void DoStartPendingRequest(TMaybe<double> loadCpuThreshold) {
         RemoveFinishedRequests();
         if (RunningOperation) {
             return;
         }
@@ -715,13 +769,17 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
-            this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup));
             DelayedRequests.emplace_front(sessionId);
-            GetRequest(sessionId)->CleanupRequired = true;
+            if (loadCpuThreshold) {
+                RequestCpuQuota(*loadCpuThreshold, EStartRequestCase::Pending);
+            } else {
+                this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup));
+                GetRequest(sessionId)->CleanupRequired = true;
+            }
         }
     }
 
-    void DoStartDelayedRequest() {
+    void DoStartDelayedRequest(TMaybe<double> loadCpuThreshold) {
         RemoveFinishedRequests();
         if (RunningOperation) {
             return;
         }
@@ -729,7 +787,11 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
 
-            this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, CountersSubgroup));
+            if (loadCpuThreshold) {
+                RequestCpuQuota(*loadCpuThreshold, EStartRequestCase::Delayed);
+            } else {
+                this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, CountersSubgroup));
+            }
         }
     }
@@ -770,6 +832,10 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
         this->Schedule(LEASE_DURATION / 2, new TEvents::TEvWakeup());
     }
 
+    void RequestCpuQuota(double loadCpuThreshold, EStartRequestCase requestCase) const {
+        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvCpuQuotaRequest(loadCpuThreshold / 100.0), 0, static_cast<ui64>(requestCase));
+    }
+
 private:
     void RemoveFinishedRequests() {
         if (RunningOperation) {
@@ -859,7 +925,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor> {
diff --git a/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp b/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp
new file mode 100644
index 000000000000..c20380125895
--- /dev/null
+++ b/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp
@@ -0,0 +1,168 @@
+#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
+
+
+namespace NKikimr::NKqp::NWorkload {
+
+//// TCpuQuotaManager::TCounters
+
+TCpuQuotaManager::TCounters::TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent)
+    : SubComponent(subComponent)
+{
+    Register();
+}
+
+void TCpuQuotaManager::TCounters::Register() {
+    RegisterCommonMetrics(CpuLoadRequest);
+    InstantLoadPercentage = SubComponent->GetCounter("InstantLoadPercentage", false);
+    AverageLoadPercentage = SubComponent->GetCounter("AverageLoadPercentage", false);
+    QuotedLoadPercentage = SubComponent->GetCounter("QuotedLoadPercentage", false);
+}
+
+void TCpuQuotaManager::TCounters::RegisterCommonMetrics(TCommonMetrics& metrics) const {
+    metrics.Ok = SubComponent->GetCounter("Ok", true);
+    metrics.Error = SubComponent->GetCounter("Error", true);
+}
+
+//// TCpuQuotaManager::TCpuQuotaResponse
+
+TCpuQuotaManager::TCpuQuotaResponse::TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status, NYql::TIssues issues)
+    : CurrentLoad(currentLoad)
+    , Status(status)
+    , Issues(std::move(issues))
+{}
+
+//// TCpuQuotaManager
+
+TCpuQuotaManager::TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent)
+    : Counters(subComponent)
+    , MonitoringRequestDelay(monitoringRequestDelay)
+    , AverageLoadInterval(averageLoadInterval)
+    , IdleTimeout(idleTimeout)
+    , DefaultQueryLoad(defaultQueryLoad)
+    , Strict(strict)
+    , CpuNumber(cpuNumber)
+{}
+
+double TCpuQuotaManager::GetInstantLoad() const {
+    return InstantLoad;
+}
+
+double TCpuQuotaManager::GetAverageLoad() const {
+    return AverageLoad;
+}
+
+TDuration TCpuQuotaManager::GetMonitoringRequestDelay() const {
+    return GetMonitoringRequestTime() - TInstant::Now();
+}
+
+TInstant TCpuQuotaManager::GetMonitoringRequestTime() const {
+    TDuration delay = MonitoringRequestDelay;
+    if (IdleTimeout && TInstant::Now() - LastRequestCpuQuota > IdleTimeout) {
+        delay = AverageLoadInterval / 2;
+    }
+
+    return LastUpdateCpuLoad ? LastUpdateCpuLoad + delay : TInstant::Now();
+}
+
+void TCpuQuotaManager::UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success) {
+    auto now = TInstant::Now();
+    LastUpdateCpuLoad = now;
+
+    if (!success) {
+        Counters.CpuLoadRequest.Error->Inc();
+        CheckLoadIsOutdated();
+        return;
+    }
+
+    auto delta = now - LastCpuLoad;
+    LastCpuLoad = now;
+
+    if (cpuNumber) {
+        CpuNumber = cpuNumber;
+    }
+
+    InstantLoad = instantLoad;
+    // exponential moving average
+    if (!Ready || delta >= AverageLoadInterval) {
+        AverageLoad = InstantLoad;
+        QuotedLoad = InstantLoad;
+    } else {
+        auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
+        AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
+        QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
+    }
+    Ready = true;
+    Counters.CpuLoadRequest.Ok->Inc();
+    Counters.InstantLoadPercentage->Set(static_cast<ui64>(InstantLoad * 100));
+    Counters.AverageLoadPercentage->Set(static_cast<ui64>(AverageLoad * 100));
+    Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
+}
+
+bool TCpuQuotaManager::CheckLoadIsOutdated() {
+    if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
+        Ready = false;
+        QuotedLoad = 0.0;
+        Counters.QuotedLoadPercentage->Set(0);
+    }
+    return Ready;
+}
+
+bool TCpuQuotaManager::HasCpuQuota(double maxClusterLoad) {
+    LastRequestCpuQuota = TInstant::Now();
+    return maxClusterLoad == 0.0 || ((Ready || !Strict) && QuotedLoad < maxClusterLoad);
+}
+
+TCpuQuotaManager::TCpuQuotaResponse TCpuQuotaManager::RequestCpuQuota(double quota, double maxClusterLoad) {
+    if (quota < 0.0 || quota > 1.0) {
+        return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder() << "Incorrect quota value (must be between 0.0 and 1.0) " << quota)});
+    }
+    quota = quota ? quota : DefaultQueryLoad;
+
+    CheckLoadIsOutdated();
+    if (!HasCpuQuota(maxClusterLoad)) {
+        return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder()
+            << "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
+            << "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
+        )});
+    }
+
+    QuotedLoad += quota;
+    Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
+    return TCpuQuotaResponse(QuotedLoad * 100);
+}
+
+void TCpuQuotaManager::AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed) {
+    if (!CpuNumber) {
+        return;
+    }
+
+    if (duration && duration < AverageLoadInterval / 2 && quota <= 1.0) {
+        quota = quota ? quota : DefaultQueryLoad;
+        auto load = (cpuSecondsConsumed * 1000.0 / duration.MilliSeconds()) / CpuNumber;
+        if (quota > load) {
+            auto adjustment = (quota - load) / 2;
+            if (QuotedLoad > adjustment) {
+                QuotedLoad -= adjustment;
+            } else {
+                QuotedLoad = 0.0;
+            }
+            Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
+        }
+    }
+}
+
+} // namespace NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/common/cpu_quota_manager.h b/ydb/core/kqp/workload_service/common/cpu_quota_manager.h
new file mode 100644
index 000000000000..f0587e1d4418
--- /dev/null
+++ b/ydb/core/kqp/workload_service/common/cpu_quota_manager.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+
+namespace NKikimr::NKqp::NWorkload {
+
+class TCpuQuotaManager {
+    struct TCounters {
+        const ::NMonitoring::TDynamicCounterPtr SubComponent;
+
+        struct TCommonMetrics {
+            ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
+            ::NMonitoring::TDynamicCounters::TCounterPtr Error;
+        };
+
+        TCommonMetrics CpuLoadRequest;
+        ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
+        ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
+        ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
+
+        explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent);
+
+    private:
+        void Register();
+        void RegisterCommonMetrics(TCommonMetrics& metrics) const;
+    };
+
+public:
+    struct TCpuQuotaResponse {
+        explicit TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {});
+
+        const int32_t CurrentLoad;
+        const NYdb::EStatus Status;
+        const NYql::TIssues Issues;
+    };
+
+public:
+    TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent);
+
+    double GetInstantLoad() const;
+    double GetAverageLoad() const;
+    TDuration GetMonitoringRequestDelay() const;
+    TInstant GetMonitoringRequestTime() const;
+
+    void UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success);
+    bool CheckLoadIsOutdated();
+
+    bool HasCpuQuota(double maxClusterLoad);
+    TCpuQuotaResponse RequestCpuQuota(double quota, double maxClusterLoad);
+    void AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed);
+
+private:
+    TCounters Counters;
+
+    const TDuration MonitoringRequestDelay;
+    const TDuration AverageLoadInterval;
+    const TDuration IdleTimeout;
+    const double DefaultQueryLoad;
+    const bool Strict;
+    ui64 CpuNumber = 0;
+
+    TInstant LastCpuLoad;
+    TInstant LastUpdateCpuLoad;
+    TInstant LastRequestCpuQuota;
+
+    double InstantLoad = 0.0;
+    double AverageLoad = 0.0;
+    double QuotedLoad = 0.0;
+    bool Ready = false;
+};
+
+} // namespace NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h
index 25018bbe6728..03bafddfca7a 100644
--- a/ydb/core/kqp/workload_service/common/events.h
+++ b/ydb/core/kqp/workload_service/common/events.h
@@ -30,6 +30,10 @@ struct TEvPrivate {
         EvStopPoolHandler,
         EvCancelRequest,
 
+        EvCpuQuotaRequest,
+        EvCpuQuotaResponse,
+        EvCpuLoadResponse,
+
         EvTablesCreationFinished,
         EvCleanupTableResponse,
         EvCleanupTablesFinished,
@@ -110,13 +114,19 @@ struct TEvPrivate {
     };
 
     struct TEvFinishRequestInPool : public NActors::TEventLocal<TEvFinishRequestInPool, EvFinishRequestInPool> {
-        TEvFinishRequestInPool(const TString& database, const TString& poolId)
+        TEvFinishRequestInPool(const TString& database, const TString& poolId, TDuration duration, TDuration cpuConsumed, bool adjustCpuQuota)
             : Database(database)
             , PoolId(poolId)
+            , Duration(duration)
+            , CpuConsumed(cpuConsumed)
+            , AdjustCpuQuota(adjustCpuQuota)
         {}
 
         const TString Database;
         const TString PoolId;
+        const TDuration Duration;
+        const TDuration CpuConsumed;
+        const bool AdjustCpuQuota;
     };
 
     struct TEvResignPoolHandler : public NActors::TEventLocal<TEvResignPoolHandler, EvResignPoolHandler> {
@@ -142,6 +152,37 @@ struct TEvPrivate {
         const TString SessionId;
     };
 
+    // Cpu load requests
+    struct TEvCpuQuotaRequest : public NActors::TEventLocal<TEvCpuQuotaRequest, EvCpuQuotaRequest> {
+        explicit TEvCpuQuotaRequest(double maxClusterLoad)
+            : MaxClusterLoad(maxClusterLoad)
+        {}
+
+        const double MaxClusterLoad;
+    };
+
+    struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> {
+        explicit TEvCpuQuotaResponse(bool quotaAccepted)
+            : QuotaAccepted(quotaAccepted)
+        {}
+
+        const bool QuotaAccepted;
+    };
+
+    struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {
+        TEvCpuLoadResponse(Ydb::StatusIds::StatusCode status, double instantLoad, ui64 cpuNumber, NYql::TIssues issues)
+            : Status(status)
+            , InstantLoad(instantLoad)
+            , CpuNumber(cpuNumber)
+            , Issues(std::move(issues))
+        {}
+
+        const Ydb::StatusIds::StatusCode Status;
+        const double InstantLoad;
+        const ui64 CpuNumber;
+        const NYql::TIssues Issues;
+    };
+
     // Tables queries events
     struct TEvTablesCreationFinished : public NActors::TEventLocal<TEvTablesCreationFinished, EvTablesCreationFinished> {
         TEvTablesCreationFinished(bool success, NYql::TIssues issues)
diff --git a/ydb/core/kqp/workload_service/common/ya.make b/ydb/core/kqp/workload_service/common/ya.make
index 44cbd65ca22e..4026b389648b 100644
--- a/ydb/core/kqp/workload_service/common/ya.make
+++ b/ydb/core/kqp/workload_service/common/ya.make
@@ -1,6 +1,7 @@
 LIBRARY()
 
 SRCS(
+    cpu_quota_manager.cpp
     events.cpp
     helpers.cpp
 )
@@ -14,6 +15,8 @@ PEERDIR(
 
     ydb/library/actors/core
 
+    ydb/public/sdk/cpp/client/ydb_types
+
     library/cpp/retry
 )
diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp
index 94ad44dbe460..718f831c245b 100644
--- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp
+++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp
@@ -1,4 +1,5 @@
 #include "kqp_workload_service.h"
+#include "kqp_workload_service_impl.h"
 
 #include
 #include
@@ -8,7 +9,6 @@
 #include
 #include
 
-#include
 #include
 #include
@@ -23,8 +23,6 @@ namespace {
 
 using namespace NActors;
 
-constexpr TDuration IDLE_DURATION = TDuration::Seconds(15);
-
 class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
     enum class ETablesCreationStatus {
         Pending,
         Finished,
     };
 
-    struct TPoolState {
-        TActorId PoolHandler;
-        TActorContext ActorContext;
-
-        std::queue<TEvPlaceRequestIntoPool::TPtr> PendingRequests = {};
-        bool WaitingInitialization = false;
-        bool PlaceRequestRunning = false;
-        std::optional<TActorId> NewPoolHandler = std::nullopt;
-
-        ui64 InFlightRequests = 0;
-        TInstant LastUpdateTime = TInstant::Now();
-
-        void UpdateHandler() {
-            if (PlaceRequestRunning || WaitingInitialization || !NewPoolHandler) {
-                return;
-            }
-
-            ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler());
-            PoolHandler = *NewPoolHandler;
-            NewPoolHandler = std::nullopt;
-            InFlightRequests = 0;
-        }
-
-        void StartPlaceRequest() {
-            if (PlaceRequestRunning || PendingRequests.empty()) {
-                return;
-            }
-
-            PlaceRequestRunning = true;
-            InFlightRequests++;
-            ActorContext.Send(PendingRequests.front()->Forward(PoolHandler));
-            PendingRequests.pop();
-        }
-
-        void OnRequestFinished() {
-            Y_ENSURE(InFlightRequests);
-            InFlightRequests--;
-            LastUpdateTime = TInstant::Now();
-        }
+    enum class EWakeUp {
+        IdleCheck,
+        StartCpuLoadRequest
     };
 
 public:
@@ -90,6 +52,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
             (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
         }), IEventHandle::FlagTrackDelivery);
 
+        CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
+
         EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
         if (EnabledResourcePools) {
             InitializeWorkloadService();
@@ -172,24 +136,15 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         Send(ev->Forward(poolState->PoolHandler));
     }
 
-    void HandleWakeup() {
-        IdleChecksStarted = false;
-
-        std::vector<TString> poolsToDelete;
-        poolsToDelete.reserve(PoolIdToState.size());
-        for (const auto& [poolKey, poolState] : PoolIdToState) {
-            if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
-                Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler());
-                poolsToDelete.emplace_back(poolKey);
-            }
-        }
-        for (const auto& poolKey : poolsToDelete) {
-            PoolIdToState.erase(poolKey);
-            ActivePools->Dec();
-        }
+    void Handle(TEvents::TEvWakeup::TPtr& ev) {
+        switch (static_cast<EWakeUp>(ev->Get()->Tag)) {
+            case EWakeUp::IdleCheck:
+                RunIdleCheck();
+                break;
 
-        if (!PoolIdToState.empty()) {
-            StartIdleChecks();
+            case EWakeUp::StartCpuLoadRequest:
+                RunCpuLoadRequest();
+                break;
         }
     }
@@ -201,15 +156,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         hFunc(TEvPlaceRequestIntoPool, Handle);
         hFunc(TEvCleanupRequest, Handle);
-        sFunc(TEvents::TEvWakeup, HandleWakeup);
+        hFunc(TEvents::TEvWakeup, Handle);
 
         hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
         hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
         hFunc(TEvPrivate::TEvRefreshPoolState, Handle);
+        hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle);
         hFunc(TEvPrivate::TEvFinishRequestInPool, Handle);
         hFunc(TEvPrivate::TEvPrepareTablesRequest, Handle);
         hFunc(TEvPrivate::TEvCleanupTablesFinished, Handle);
         hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
+        hFunc(TEvPrivate::TEvCpuLoadResponse, Handle);
         hFunc(TEvPrivate::TEvResignPoolHandler, Handle);
     )
@@ -238,7 +195,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
             poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;
             ActivePools->Inc();
-            StartIdleChecks();
+            ScheduleIdleCheck();
         }
 
         poolState->PendingRequests.emplace(std::move(ev));
@@ -268,14 +225,27 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         }
     }
 
+    void Handle(TEvPrivate::TEvCpuQuotaRequest::TPtr& ev) {
+        const TActorId& poolHandler = ev->Sender;
+        const double maxClusterLoad = ev->Get()->MaxClusterLoad;
+        LOG_T("Requested cpu quota from handler " << poolHandler << ", MaxClusterLoad: " << maxClusterLoad);
+
+        CpuQuotaManager->RequestCpuQuota(poolHandler, maxClusterLoad, ev->Cookie);
+        ScheduleCpuLoadRequest();
+    }
+
     void Handle(TEvPrivate::TEvFinishRequestInPool::TPtr& ev) {
         const TString& database = ev->Get()->Database;
         const TString& poolId = ev->Get()->PoolId;
-        LOG_T("Request finished in pool, Database: " << database << ", PoolId: " << poolId);
+        LOG_T("Request finished in pool, Database: " << database << ", PoolId: " << poolId << ", Duration: " << ev->Get()->Duration << ", CpuConsumed: " << ev->Get()->CpuConsumed << ", AdjustCpuQuota: " << ev->Get()->AdjustCpuQuota);
 
         if (auto poolState = GetPoolState(database, poolId)) {
             poolState->OnRequestFinished();
         }
+        if (ev->Get()->AdjustCpuQuota) {
+            CpuQuotaManager->AdjustCpuQuota(ev->Get()->Duration, ev->Get()->CpuConsumed.SecondsFloat());
+            ScheduleCpuLoadRequest();
+        }
     }
 
     void Handle(TEvPrivate::TEvPrepareTablesRequest::TPtr& ev) {
@@ -327,6 +297,19 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         OnTabelsCreated(false, issues);
     }
 
+    void Handle(TEvPrivate::TEvCpuLoadResponse::TPtr& ev) {
+        const bool success = ev->Get()->Status == Ydb::StatusIds::SUCCESS;
+        if (!success) {
+            LOG_E("Failed to fetch cpu load " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString());
+        } else {
+            LOG_T("Successfully fetched cpu load: " << 100.0 * ev->Get()->InstantLoad << "%, cpu number: " << ev->Get()->CpuNumber);
+        }
+
+        CpuQuotaManager->CpuLoadRequestRunning = false;
+        CpuQuotaManager->UpdateCpuLoad(ev->Get()->InstantLoad, ev->Get()->CpuNumber, success);
+        ScheduleCpuLoadRequest();
+    }
+
     void Handle(TEvPrivate::TEvResignPoolHandler::TPtr& ev) {
         const TString& database = ev->Get()->Database;
         const TString& poolId = ev->Get()->PoolId;
@@ -378,13 +361,63 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         PendingHandlers.clear();
     }
 
-    void StartIdleChecks() {
+    void ScheduleIdleCheck() {
         if (IdleChecksStarted) {
             return;
         }
         IdleChecksStarted = true;
 
-        Schedule(IDLE_DURATION, new TEvents::TEvWakeup());
+        Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup());
+    }
+
+    void RunIdleCheck() {
+        IdleChecksStarted = false;
+
+        std::vector<TString> poolsToDelete;
+        poolsToDelete.reserve(PoolIdToState.size());
+        for (const auto& [poolKey, poolState] : PoolIdToState) {
+            if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
+                CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
+                Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler());
+                poolsToDelete.emplace_back(poolKey);
+            }
+        }
+        for (const auto& poolKey : poolsToDelete) {
+            PoolIdToState.erase(poolKey);
+            ActivePools->Dec();
+        }
+
+        if (!PoolIdToState.empty()) {
+            ScheduleIdleCheck();
+        }
+    }
+
+    void ScheduleCpuLoadRequest() const {
+        auto delay = CpuQuotaManager->GetCpuLoadRequestDelay();
+        if (!delay) {
+            return;
+        }
+
+        if (*delay) {
+            Schedule(*delay, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartCpuLoadRequest)));
+        } else {
+            RunCpuLoadRequest();
+        }
+    }
+
+    void RunCpuLoadRequest() const {
+        if (CpuQuotaManager->CpuLoadRequestRunning) {
+            return;
+        }
+
+        CpuQuotaManager->CpuLoadRequestTime = TInstant::Zero();
+        if (CpuQuotaManager->CpuQuotaManager.GetMonitoringRequestDelay()) {
+            ScheduleCpuLoadRequest();
+            return;
+        }
+
+        CpuQuotaManager->CpuLoadRequestRunning = true;
+        Register(CreateCpuLoadFetcherActor(SelfId()));
     }
 
 private:
@@ -460,6 +493,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
 
     std::unordered_set<TString> DatabasesWithDefaultPool;
     std::unordered_map<TString, TPoolState> PoolIdToState;
+    std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
 
     NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
 };
diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h
new file mode 100644
index 000000000000..9ee91f077720
--- /dev/null
+++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h
@@ -0,0 +1,132 @@
+#pragma once
+
+#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
+
+#include <ydb/core/kqp/workload_service/common/events.h>
+#include <ydb/library/actors/core/actor.h>
+
+
+namespace NKikimr::NKqp::NWorkload {
+
+constexpr TDuration IDLE_DURATION = TDuration::Seconds(60);
+
+struct TPoolState {
+    NActors::TActorId PoolHandler;
+    NActors::TActorContext ActorContext;
+
+    std::queue<TEvPlaceRequestIntoPool::TPtr> PendingRequests = {};
+    bool WaitingInitialization = false;
+    bool PlaceRequestRunning = false;
+    std::optional<NActors::TActorId> NewPoolHandler = std::nullopt;
+
+    ui64 InFlightRequests = 0;
+    TInstant LastUpdateTime = TInstant::Now();
+
+    void UpdateHandler() {
+        if (PlaceRequestRunning || WaitingInitialization || !NewPoolHandler) {
+            return;
+        }
+
+        ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler());
+        PoolHandler = *NewPoolHandler;
+        NewPoolHandler = std::nullopt;
+        InFlightRequests = 0;
+    }
+
+    void StartPlaceRequest() {
+        if (PlaceRequestRunning || PendingRequests.empty()) {
+            return;
+        }
+
+        PlaceRequestRunning = true;
+        InFlightRequests++;
+        ActorContext.Send(PendingRequests.front()->Forward(PoolHandler));
+        PendingRequests.pop();
+    }
+
+    void OnRequestFinished() {
+        Y_ENSURE(InFlightRequests);
+        InFlightRequests--;
+        LastUpdateTime = TInstant::Now();
+    }
+};
+
+struct TCpuQuotaManagerState {
+    TCpuQuotaManager CpuQuotaManager;
+    NActors::TActorContext ActorContext;
+    bool CpuLoadRequestRunning = false;
+    TInstant CpuLoadRequestTime = TInstant::Zero();
+
+    TCpuQuotaManagerState(NActors::TActorContext actorContext, NMonitoring::TDynamicCounterPtr subComponent)
+        : CpuQuotaManager(TDuration::Seconds(1), TDuration::Seconds(10), IDLE_DURATION, 0.1, true, 0, subComponent)
+        , ActorContext(actorContext)
+    {}
+
+    void RequestCpuQuota(TActorId poolHandler, double maxClusterLoad, ui64 cookie) {
+        auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);
+
+        bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
+        ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, cookie);
+
+        // Schedule notification
+        if (!quotaAccepted) {
+            if (auto it = HandlersLimits.find(poolHandler); it != HandlersLimits.end()) {
+                PendingHandlers[it->second].erase(poolHandler);
+            }
+            HandlersLimits[poolHandler] = maxClusterLoad;
+            PendingHandlers[maxClusterLoad].insert(poolHandler);
+        }
+    }
+
+    void UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success) {
+        CpuQuotaManager.UpdateCpuLoad(instantLoad, cpuNumber, success);
+        CheckPendingQueue();
+    }
+
+    void AdjustCpuQuota(TDuration duration, double cpuSecondsConsumed) {
+        CpuQuotaManager.AdjustCpuQuota(0.0, duration, cpuSecondsConsumed);
+        CheckPendingQueue();
+    }
+
+    std::optional<TDuration> GetCpuLoadRequestDelay() {
+        if (CpuLoadRequestRunning) {
+            return std::nullopt;
+        }
+
+        auto requestTime = CpuQuotaManager.GetMonitoringRequestTime();
+        if (!CpuLoadRequestTime || CpuLoadRequestTime > requestTime) {
+            CpuLoadRequestTime = requestTime;
+            return CpuLoadRequestTime - TInstant::Now();
+        }
+        return std::nullopt;
+    }
+
+    void CleanupHandler(TActorId poolHandler) {
+        if (auto it = HandlersLimits.find(poolHandler); it != HandlersLimits.end()) {
+            PendingHandlers[it->second].erase(poolHandler);
+            HandlersLimits.erase(it);
+        }
+    }
+
+private:
+    void CheckPendingQueue() {
+        while (!PendingHandlers.empty()) {
+            const auto& [maxClusterLoad, poolHandlers] = *PendingHandlers.begin();
+            if (!CpuQuotaManager.HasCpuQuota(maxClusterLoad)) {
+                break;
+            }
+
+            for (const TActorId& poolHandler : poolHandlers) {
+                ActorContext.Send(poolHandler, new TEvPrivate::TEvRefreshPoolState());
+                HandlersLimits.erase(poolHandler);
+            }
+            PendingHandlers.erase(PendingHandlers.begin());
+        }
+    }
+
+private:
+    std::map<double, std::unordered_set<NActors::TActorId>> PendingHandlers;
+    std::unordered_map<NActors::TActorId, double> HandlersLimits;
+};
+
+} // namespace NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
index 7edc17a78525..557b4a41b649 100644
--- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
+++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
@@ -8,9 +8,10 @@
 #include
 #include
 #include
-
 #include
+#include <ydb/core/node_whiteboard/node_whiteboard.h>
+
 
 namespace NKikimr::NKqp::NWorkload {
@@ -287,6 +288,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
         poolConfig.QueueSize = Settings_.QueueSize_;
         poolConfig.QueryCancelAfter = Settings_.QueryCancelAfter_;
         poolConfig.QueryMemoryLimitPercentPerNode = Settings_.QueryMemoryLimitPercentPerNode_;
+        poolConfig.DatabaseLoadCpuThreshold = Settings_.DatabaseLoadCpuThreshold_;
 
         TActorId edgeActor = GetRuntime()->AllocateEdgeActor();
         GetRuntime()->Register(CreatePoolCreatorActor(edgeActor, Settings_.DomainName_, Settings_.PoolId_, poolConfig, nullptr, {}));
@@ -303,6 +305,41 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
         CreateSamplePool();
     }
 
+    // Cluster helpers
+    void UpdateNodeCpuInfo(double usage, ui32 threads, ui64 nodeIndex = 0) override {
+        TVector<std::tuple<TString, double, ui32, ui32>> pools;
+        pools.emplace_back("User", usage, threads, threads);
+
+        auto edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex);
+        GetRuntime()->Send(
+            NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetRuntime()->GetNodeId(nodeIndex)), edgeActor,
+            new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools), nodeIndex
+        );
+
+        WaitFor(FUTURE_WAIT_TIMEOUT, "node cpu usage", [this, usage, threads, nodeIndex, edgeActor](TString& errorString) {
+            GetRuntime()->Send(
+                NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetRuntime()->GetNodeId(nodeIndex)), edgeActor,
+                new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest(), nodeIndex
+            );
+            auto response = GetRuntime()->GrabEdgeEvent<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
+
+            if (!response->Get()->Record.SystemStateInfoSize()) {
+                errorString = "empty system state info";
+                return false;
+            }
+            const auto& systemStateInfo = response->Get()->Record.GetSystemStateInfo()[0];
+
+            if (!systemStateInfo.PoolStatsSize()) {
+                errorString = "empty pool stats";
+                return false;
+            }
+            const auto& poolStat = systemStateInfo.GetPoolStats()[0];
+
+            errorString = TStringBuilder() << "usage: " << poolStat.GetUsage() << ", threads: " << poolStat.GetThreads();
+            return poolStat.GetUsage() == usage && threads == poolStat.GetThreads();
+        });
+    }
+
     // Scheme queries helpers
     NYdb::NScheme::TSchemeClient GetSchemeClient() const override {
         return NYdb::NScheme::TSchemeClient(*YdbDriver_);
@@ -323,21 +360,17 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
     void WaitPoolAccess(const TString& userSID, ui32 access, const TString& poolId = "") const override {
         auto token = NACLib::TUserToken(userSID, {});
 
-        TInstant start = TInstant::Now();
-        while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
-            if (auto response = Navigate(TStringBuilder() << ".resource_pools/" << (poolId ? poolId : Settings_.PoolId_))) {
-                const auto& result = response->ResultSet.at(0);
-                bool resourcePool = result.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindResourcePool;
-                if (resourcePool && (!result.SecurityObject || result.SecurityObject->CheckAccess(access, token))) {
-                    return;
-                }
-                Cerr << "WaitPoolAccess " << TInstant::Now() - start << ": " << (resourcePool ? TStringBuilder() << "access denied" : TStringBuilder() << "unexpected kind " << result.Kind) << "\n";
-            } else {
-                Cerr << "WaitPoolAccess " << TInstant::Now() - start << ": empty response\n";
+        WaitFor(FUTURE_WAIT_TIMEOUT, "pool acl", [this, token, access, poolId](TString& errorString) {
+            auto response = Navigate(TStringBuilder() << ".resource_pools/" << (poolId ? poolId : Settings_.PoolId_));
+            if (!response) {
+                errorString = "empty response";
+                return false;
             }
-            Sleep(TDuration::Seconds(1));
-        }
-        UNIT_ASSERT_C(false, "Pool version waiting timeout");
+            const auto& result = response->ResultSet.at(0);
+            bool resourcePool = result.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindResourcePool;
+            errorString = (resourcePool ? TStringBuilder() << "access denied" : TStringBuilder() << "unexpected kind " << result.Kind);
+            return resourcePool && (!result.SecurityObject || result.SecurityObject->CheckAccess(access, token));
+        });
     }
 
     // Generic query helpers
@@ -390,17 +423,11 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
     }
 
     void WaitPoolState(const TPoolStateDescription& state, const TString& poolId = "") const override {
-        TInstant start = TInstant::Now();
-        while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
+        WaitFor(FUTURE_WAIT_TIMEOUT, "pool state", [this, state, poolId](TString& errorString) {
             auto description = GetPoolDescription(TDuration::Zero(), poolId);
-            if (description.DelayedRequests == state.DelayedRequests && description.RunningRequests == state.RunningRequests) {
-                return;
-            }
-
-            Cerr << "WaitPoolState " << TInstant::Now() - start << ": delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests << "\n";
-            Sleep(TDuration::Seconds(1));
-        }
-        UNIT_ASSERT_C(false, "Pool state waiting timeout");
+            errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests;
+            return description.DelayedRequests == state.DelayedRequests && description.RunningRequests == state.RunningRequests;
+        });
     }
 
     void WaitPoolHandlersCount(i64 finalCount, std::optional<i64> initialCount = std::nullopt, TDuration timeout = FUTURE_WAIT_TIMEOUT) const override {
@@ -410,16 +437,10 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
             UNIT_ASSERT_VALUES_EQUAL_C(counter->Val(), *initialCount, "Unexpected pool handlers count");
         }
 
-        TInstant start = TInstant::Now();
-        while (TInstant::Now() - start < timeout) {
-            if (counter->Val() == finalCount) {
-                return;
-            }
-
-            Cerr << "WaitPoolHandlersCount " << TInstant::Now() - start << ": number handlers = " << counter->Val() << "\n";
-            Sleep(TDuration::Seconds(1));
-        }
-        UNIT_ASSERT_C(false, "Pool handlers count wait timeout");
+        WaitFor(timeout, "pool handlers", [counter, finalCount](TString& errorString) {
+            errorString = TStringBuilder() << "number handlers = " << counter->Val();
+            return counter->Val() == finalCount;
+        });
     }
 
     void StopWorkloadService(ui64 nodeIndex = 0) const override {
@@ -470,6 +491,19 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
         return event;
     }
 
+    static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
+        TInstant start = TInstant::Now();
+        while (TInstant::Now() - start <= timeout) {
+            TString errorString;
+            if (callback(errorString)) {
+                return;
+            }
+            Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n";
+            Sleep(TDuration::Seconds(1));
+        }
+        UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
+    }
+
     NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
         return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
             ->GetSubgroup("subsystem", "workload_manager");
diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
index f673f070c11b..e2e05141247d 100644
--- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
+++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h
@@ -74,12 +74,16 @@ struct TYdbSetupSettings {
     FLUENT_SETTING_DEFAULT(i32, QueueSize, -1);
     FLUENT_SETTING_DEFAULT(TDuration, QueryCancelAfter, FUTURE_WAIT_TIMEOUT);
     FLUENT_SETTING_DEFAULT(double, QueryMemoryLimitPercentPerNode, -1);
+    FLUENT_SETTING_DEFAULT(double, DatabaseLoadCpuThreshold, -1);
 
     TIntrusivePtr<IYdbSetup> Create() const;
 };
 
 class IYdbSetup : public TThrRefBase {
 public:
+    // Cluster helpers
+    virtual void UpdateNodeCpuInfo(double usage, ui32 threads, ui64 nodeIndex = 0) = 0;
+
     // Scheme queries helpers
     virtual NYdb::NScheme::TSchemeClient GetSchemeClient() const = 0;
     virtual void ExecuteSchemeQuery(const TString& query, NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS, const TString& expectedMessage = "") const = 0;
diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp
index 8b9a8262609d..d1748959e473 100644
--- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp
+++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp
@@ -20,6 +20,14 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
     return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
 }
 
+TEvPrivate::TEvCpuLoadResponse::TPtr FetchCpuInfo(TIntrusivePtr<IYdbSetup> ydb) {
+    auto runtime = ydb->GetRuntime();
+    const auto& edgeActor = runtime->AllocateEdgeActor();
+
+    runtime->Register(CreateCpuLoadFetcherActor(edgeActor));
+    return runtime->GrabEdgeEvent<TEvPrivate::TEvCpuLoadResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
+}
+
 } // anonymous namespace
 
 Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {
@@ -129,6 +137,28 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {
             DROP RESOURCE POOL )" << NResourcePool::DEFAULT_POOL_ID << ";"
         , settings));
     }
+
+    Y_UNIT_TEST(TestCpuLoadActor) {
+        const ui32 nodeCount = 5;
+        auto ydb = TYdbSetupSettings()
+            .NodeCount(nodeCount)
+            .Create();
+
+        auto response = FetchCpuInfo(ydb);
+        UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::NOT_FOUND, response->Get()->Issues.ToOneLineString());
+        UNIT_ASSERT_STRING_CONTAINS(response->Get()->Issues.ToString(), "Cpu info not found");
+
+        const double usage = 0.25;
+        const ui32 threads = 2;
+        for (size_t nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) {
+            ydb->UpdateNodeCpuInfo(usage, threads, nodeIndex);
+        }
+
+        response = FetchCpuInfo(ydb);
+        UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString());
+        UNIT_ASSERT_VALUES_EQUAL(response->Get()->CpuNumber, threads * nodeCount);
+        UNIT_ASSERT_DOUBLES_EQUAL(response->Get()->InstantLoad, usage, 0.01);
+    }
 }
 
 } // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
index 553cb5b17933..807ec953ffd1 100644
--- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
+++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
@@ -189,6 +189,38 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
         UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << ydb->GetSettings().PoolId_ << " was disabled due to zero concurrent query limit");
     }
 
+    Y_UNIT_TEST(TestCpuLoadThreshold) {
+        auto ydb = TYdbSetupSettings()
+            .DatabaseLoadCpuThreshold(90)
+            .QueryCancelAfter(TDuration::Seconds(10))
+            .Create();
+
+        // Simulate load
+        ydb->UpdateNodeCpuInfo(1.0, 1);
+
+        auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
+        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
+        UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_);
+    }
+
+    Y_UNIT_TEST(TestCpuLoadThresholdRefresh) {
+        auto ydb = TYdbSetupSettings()
+            .DatabaseLoadCpuThreshold(90)
+            .Create();
+
+        // Simulate load
+        ydb->UpdateNodeCpuInfo(1.0, 1);
+
+        // Delay request
+        auto result = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
+        ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 0});
+
+        // Free load
+        ydb->ContinueQueryExecution(result);
+        ydb->UpdateNodeCpuInfo(0.0, 1);
+
+        TSampleQueries::TSelect42::CheckResult(result.GetResult(TDuration::Seconds(5)));
+    }
+
     Y_UNIT_TEST(TestHandlerActorCleanup) {
         auto ydb = TYdbSetupSettings()
             .ConcurrentQueryLimit(1)
@@ -197,7 +229,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
         TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query));
         TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(NResourcePool::DEFAULT_POOL_ID)));
 
-        ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(35));
+        ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(95));
     }
 }
 
diff --git a/ydb/core/kqp/workload_service/ya.make b/ydb/core/kqp/workload_service/ya.make
index 40ee9196cdad..d36580965937 100644
--- a/ydb/core/kqp/workload_service/ya.make
+++ b/ydb/core/kqp/workload_service/ya.make
@@ -7,6 +7,8 @@ SRCS(
 
 PEERDIR(
     ydb/core/cms/console
+    ydb/core/fq/libs/compute/common
+
     ydb/core/kqp/workload_service/actors
 )
diff --git a/ydb/core/resource_pools/resource_pool_settings.cpp b/ydb/core/resource_pools/resource_pool_settings.cpp
index 1a5c39644a54..f477d334c625 100644
--- a/ydb/core/resource_pools/resource_pool_settings.cpp
+++ b/ydb/core/resource_pools/resource_pool_settings.cpp
@@ -7,7 +7,8 @@ std::unordered_map<TString, TPoolSettings::TProperty> GetPropertiesMap(TPoolSettings& settings, bool restricted) {
     std::unordered_map<TString, TPoolSettings::TProperty> properties = {
         {"concurrent_query_limit", &settings.ConcurrentQueryLimit},
         {"queue_size", &settings.QueueSize},
-        {"query_memory_limit_percent_per_node", &settings.QueryMemoryLimitPercentPerNode}
+        {"query_memory_limit_percent_per_node", &settings.QueryMemoryLimitPercentPerNode},
+        {"database_load_cpu_threshold", &settings.DatabaseLoadCpuThreshold}
     };
     if (!restricted) {
        properties.insert({"query_cancel_after_seconds", &settings.QueryCancelAfter});
diff --git a/ydb/core/resource_pools/resource_pool_settings.h b/ydb/core/resource_pools/resource_pool_settings.h
index c2dc319838e1..cecfd4eefb59 100644
--- a/ydb/core/resource_pools/resource_pool_settings.h
+++ b/ydb/core/resource_pools/resource_pool_settings.h
@@ -17,6 +17,8 @@ struct TPoolSettings {
     TPercent QueryMemoryLimitPercentPerNode = -1;  // Percent from node memory capacity, -1 = disabled
 
+    TPercent DatabaseLoadCpuThreshold = -1;  // -1 = disabled
+
     bool operator==(const TPoolSettings& other) const = default;
 };
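
For review convenience, here is a minimal usage sketch of the TCpuQuotaManager API declared in cpu_quota_manager.h above. The function name ExampleQuotaFlow and all parameter values are illustrative assumptions, not part of the patch; only the class, its constructor, and the UpdateCpuLoad / RequestCpuQuota / AdjustCpuQuota calls come from the code shown.

    // Hypothetical sketch; only the TCpuQuotaManager API comes from this patch.
    #include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>

    using namespace NKikimr::NKqp::NWorkload;

    void ExampleQuotaFlow(const ::NMonitoring::TDynamicCounterPtr& counters) {
        TCpuQuotaManager manager(
            TDuration::Seconds(1),   // monitoringRequestDelay: pause between load probes
            TDuration::Seconds(10),  // averageLoadInterval: EMA window and staleness bound
            TDuration::Zero(),       // idleTimeout: disabled in this sketch
            0.1,                     // defaultQueryLoad: used when a request passes quota = 0
            true,                    // strict: refuse quota while load info is outdated
            0,                       // cpuNumber: filled in from the first successful probe
            counters);

        // Feed one successful probe: 40% instant load reported across 16 CPU threads.
        manager.UpdateCpuLoad(/*instantLoad=*/0.4, /*cpuNumber=*/16, /*success=*/true);

        // Ask for the default quota (0.0 falls back to defaultQueryLoad) under a 90% cap.
        auto response = manager.RequestCpuQuota(/*quota=*/0.0, /*maxClusterLoad=*/0.9);
        if (response.Status == NYdb::EStatus::SUCCESS) {
            // ... run the query, then report what it actually consumed ...
            manager.AdjustCpuQuota(/*quota=*/0.0, TDuration::Seconds(2), /*cpuSecondsConsumed=*/1.6);
        }
    }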
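Two pieces of arithmetic from cpu_quota_manager.cpp, worked through with the numbers used in the sketch: in UpdateCpuLoad, with AverageLoadInterval = 10s and a probe arriving 2s after the previous one, ratio = 2/10 = 0.2, so an old AverageLoad of 0.40 and a new InstantLoad of 0.80 give 0.8 * 0.40 + 0.2 * 0.80 = 0.48; QuotedLoad is blended with the same ratio, so quota that was granted but never measured decays toward the observed load instead of accumulating. In AdjustCpuQuota, duration = 2s and cpuSecondsConsumed = 1.6 on 16 CPUs give load = (1.6 * 1000 / 2000) / 16 = 0.05; since the effective quota 0.1 exceeds that, half the difference, 0.025, is subtracted from QuotedLoad, returning over-reserved capacity to the pool.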