From 32809cce356289ff4a81c5626a9a934e902f3d87 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Wed, 25 Sep 2024 17:59:00 +0300 Subject: [PATCH] YQ-3689 added kqp proxy database cache (#9644) --- ydb/core/kqp/common/events/events.h | 46 ++++ ydb/core/kqp/common/events/query.h | 9 + .../kqp/common/events/script_executions.h | 52 ++-- ydb/core/kqp/common/events/workload_service.h | 13 +- ydb/core/kqp/common/events/ya.make | 1 + ydb/core/kqp/common/kqp_event_impl.cpp | 4 + ydb/core/kqp/common/simple/kqp_event_ids.h | 6 +- .../resource_pool_classifier/checker.cpp | 30 ++- .../kqp_proxy_databases_cache.cpp | 249 ++++++++++++++++++ .../kqp/proxy_service/kqp_proxy_service.cpp | 100 +++++-- .../proxy_service/kqp_proxy_service_impl.h | 59 +++++ ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 110 ++++++++ ydb/core/kqp/proxy_service/ut/ya.make | 1 + ydb/core/kqp/proxy_service/ya.make | 1 + .../workload_service/actors/scheme_actors.cpp | 4 +- ydb/core/kqp/workload_service/common/events.h | 32 --- .../workload_service/kqp_workload_service.cpp | 4 +- .../kqp_workload_service_impl.h | 6 +- .../ut/kqp_workload_service_ut.cpp | 4 +- ydb/core/protos/kqp.proto | 1 + 20 files changed, 643 insertions(+), 89 deletions(-) create mode 100644 ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 7e700123c42c..b28ad1db2a35 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -108,6 +108,18 @@ struct TEvKqp { struct TEvScriptRequest : public TEventLocal { TEvScriptRequest() = default; + const TString& GetDatabase() const { + return Record.GetRequest().GetDatabase(); + } + + const TString& GetDatabaseId() const { + return Record.GetRequest().GetDatabaseId(); + } + + void SetDatabaseId(const TString& databaseId) { + Record.MutableRequest()->SetDatabaseId(databaseId); + } + mutable NKikimrKqp::TEvQueryRequest Record; TDuration ForgetAfter; TDuration ResultsTtl; @@ -161,6 +173,40 @@ struct TEvKqp { return issues; } }; + + struct TEvUpdateDatabaseInfo : public TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Database(database) + , Issues(std::move(issues)) + {} + + TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless) + : Status(Ydb::StatusIds::SUCCESS) + , Database(database) + , DatabaseId(databaseId) + , Serverless(serverless) + , Issues({}) + {} + + Ydb::StatusIds::StatusCode Status; + TString Database; + TString DatabaseId; + bool Serverless = false; + NYql::TIssues Issues; + }; + + struct TEvDelayedRequestError : public TEventLocal { + TEvDelayedRequestError(THolder requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : RequestEvent(std::move(requestEvent)) + , Status(status) + , Issues(std::move(issues)) + {} + + THolder RequestEvent; + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 18818e958182..960927332005 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -351,6 +351,14 @@ struct TEvQueryRequest: public NActors::TEventLocal Token_; TActorId RequestActorId; TString Database; + TString DatabaseId; TString SessionId; TString YqlText; TString QueryId; diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index f6bf6d424101..62e2d767c744 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 { FS_ROLLBACK, }; -struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal { - TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) +template +struct TEventWithDatabaseId : public NActors::TEventLocal { + TEventWithDatabaseId(const TString& database) : Database(database) - , OperationId(id) {} + const TString& GetDatabase() const { + return Database; + } + + const TString& GetDatabaseId() const { + return DatabaseId; + } + + void SetDatabaseId(const TString& databaseId) { + DatabaseId = databaseId; + } + const TString Database; + TString DatabaseId; +}; + +struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId { + TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) + , OperationId(id) + {} + const NOperationId::TOperationId OperationId; }; @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal { - explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId { + TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal Metadata; }; -struct TEvListScriptExecutionOperations : public NActors::TEventLocal { +struct TEvListScriptExecutionOperations : public TEventWithDatabaseId { TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken) - : Database(database) + : TEventWithDatabaseId(database) , PageSize(pageSize) , PageToken(pageToken) {} - TString Database; ui64 PageSize; TString PageToken; }; @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB { }; -struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal { - explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId { + TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index c1d36a957a76..9781b951e2d5 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -90,14 +91,20 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; -struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { - TEvUpdateDatabaseInfo(const TString& database, bool serverless) - : Database(database) +struct TEvFetchDatabaseResponse : public NActors::TEventLocal { + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) + : Status(status) + , Database(database) , Serverless(serverless) + , PathId(pathId) + , Issues(std::move(issues)) {} + const Ydb::StatusIds::StatusCode Status; const TString Database; const bool Serverless; + const TPathId PathId; + const NYql::TIssues Issues; }; } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/events/ya.make b/ydb/core/kqp/common/events/ya.make index 9729e9518eed..a442cb07ea7c 100644 --- a/ydb/core/kqp/common/events/ya.make +++ b/ydb/core/kqp/common/events/ya.make @@ -15,6 +15,7 @@ PEERDIR( ydb/core/kqp/common/shutdown ydb/core/kqp/common/compilation ydb/core/resource_pools + ydb/core/scheme ydb/library/yql/dq/actors ydb/public/api/protos diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index cadd44a1c89e..ee4e834e6c88 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetPoolId(PoolId); } + if (!DatabaseId.empty()) { + Record.MutableRequest()->SetDatabaseId(DatabaseId); + } + Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index c927c0b11568..f6989d5b69bd 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -43,7 +43,9 @@ struct TKqpEvents { EvListSessionsRequest, EvListSessionsResponse, EvListProxyNodesRequest, - EvListProxyNodesResponse + EvListProxyNodesResponse, + EvUpdateDatabaseInfo, + EvDelayedRequestError }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -174,8 +176,8 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, - EvUpdateDatabaseInfo, EvSubscribeOnPoolChanges, + EvFetchDatabaseResponse, }; }; diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp index 5b6bae22b411..7200999c7ca1 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -20,6 +19,31 @@ using namespace NResourcePool; using namespace NWorkload; +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE), + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvRanksCheckerResponse : public TEventLocal { + TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues) + : Status(status) + , MaxRank(maxRank) + , NumberClassifiers(numberClassifiers) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const i64 MaxRank; + const ui64 NumberClassifiers; + const NYql::TIssues Issues; + }; +}; + class TRanksCheckerActor : public NKikimr::TQueryBase { using TBase = NKikimr::TQueryBase; @@ -177,7 +201,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrappedGet()->Status != Ydb::StatusIds::SUCCESS) { FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues); return; @@ -223,7 +247,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped + +#include + + +namespace NKikimr::NKqp { + +namespace { + + +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvSubscribeOnDatabase = EventSpaceBegin(TEvents::ES_PRIVATE), + EvPingDatabaseSubscription, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvSubscribeOnDatabase : public TEventLocal { + explicit TEvSubscribeOnDatabase(const TString& database) + : Database(database) + {} + + const TString Database; + }; + + struct TEvPingDatabaseSubscription : public TEventLocal { + explicit TEvPingDatabaseSubscription(const TString& database) + : Database(database) + {} + + const TString Database; + }; +}; + +class TDatabaseSubscriberActor : public TActor { + struct TDatabaseState { + TString Database; + TString DatabaseId = ""; + bool Serverless = false; + + bool FetchRequestIsRunning = true; + TInstant LastUpdateTime = TInstant::Now(); + ui32 WatchKey = 0; + }; + + using TBase = TActor; + +public: + TDatabaseSubscriberActor(TDuration idleTimeout) + : TBase(&TDatabaseSubscriberActor::StateFunc) + , IdleTimeout(idleTimeout) + , DatabaseStates(std::numeric_limits::max()) + {} + + void Registered(TActorSystem* sys, const TActorId& owner) { + TBase::Registered(sys, owner); + Owner = owner; + } + + void Handle(TEvPrivate::TEvSubscribeOnDatabase::TPtr& ev) { + const TString& database = ev->Get()->Database; + auto databaseStateIt = DatabaseStates.Find(database); + + if (databaseStateIt == DatabaseStates.End()) { + DatabaseStates.Insert({database, TDatabaseState{.Database = database}}); + Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database)); + StartIdleCheck(); + return; + } + + databaseStateIt->LastUpdateTime = TInstant::Now(); + if (databaseStateIt->DatabaseId) { + SendSubscriberInfo(*databaseStateIt, Ydb::StatusIds::SUCCESS); + } + } + + void Handle(TEvPrivate::TEvPingDatabaseSubscription::TPtr& ev) { + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Database); + if (databaseStateIt != DatabaseStates.End()) { + databaseStateIt->LastUpdateTime = TInstant::Now(); + } + } + + void Handle(NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Database); + if (databaseStateIt == DatabaseStates.End()) { + return; + } + + databaseStateIt->FetchRequestIsRunning = false; + UpdateDatabaseState(*databaseStateIt, ev->Get()->PathId, ev->Get()->Serverless); + SendSubscriberInfo(*databaseStateIt, ev->Get()->Status, ev->Get()->Issues); + + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + FreeWatchKey++; + databaseStateIt->WatchKey = FreeWatchKey; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(ev->Get()->PathId, FreeWatchKey)); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Path); + if (databaseStateIt == DatabaseStates.End()) { + return; + } + + UnsubscribeFromSchemeCache(*databaseStateIt); + SendSubscriberInfo(*databaseStateIt, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); + DatabaseStates.Erase(databaseStateIt); + } + + void HandlePoison() { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); + TBase::PassAway(); + } + + void HandleWakeup() { + IdleCheckStarted = false; + const auto minimalTime = TInstant::Now() - IdleTimeout; + while (!DatabaseStates.Empty()) { + auto oldestIt = DatabaseStates.FindOldest(); + if (oldestIt->LastUpdateTime > minimalTime) { + break; + } + + UnsubscribeFromSchemeCache(*oldestIt); + SendSubscriberInfo(*oldestIt, Ydb::StatusIds::ABORTED, {NYql::TIssue{"Database subscription was dropped by idle timeout"}}); + DatabaseStates.Erase(oldestIt); + } + + if (!DatabaseStates.Empty()) { + StartIdleCheck(); + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvSubscribeOnDatabase, Handle); + hFunc(TEvPrivate::TEvPingDatabaseSubscription, Handle); + hFunc(NWorkload::TEvFetchDatabaseResponse, Handle); + sFunc(TEvents::TEvPoison, HandlePoison); + sFunc(TEvents::TEvWakeup, HandleWakeup); + + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated); + ) + +private: + static void UpdateDatabaseState(TDatabaseState& databaseState, TPathId pathId, bool serverless) { + databaseState.LastUpdateTime = TInstant::Now(); + databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << databaseState.Database; + databaseState.Serverless = serverless; + } + + void UnsubscribeFromSchemeCache(TDatabaseState& databaseState) const { + if (databaseState.WatchKey) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(databaseState.WatchKey)); + databaseState.WatchKey = 0; + } + } + + void SendSubscriberInfo(const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { + Send(Owner, new TEvKqp::TEvUpdateDatabaseInfo(databaseState.Database, databaseState.DatabaseId, databaseState.Serverless)); + } else { + NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database " << databaseState.Database); + for (const auto& issue : issues) { + rootIssue.AddSubIssue(MakeIntrusive(issue)); + } + Send(Owner, new TEvKqp::TEvUpdateDatabaseInfo(databaseState.Database, status, {rootIssue})); + } + } + + void StartIdleCheck() { + if (!IdleCheckStarted) { + IdleCheckStarted = true; + Schedule(IdleTimeout, new TEvents::TEvWakeup()); + } + } + +private: + const TDuration IdleTimeout; + TActorId Owner; + bool IdleCheckStarted = false; + + TLRUCache DatabaseStates; + ui32 FreeWatchKey = 0; +}; + +} // anonymous namespace + +TDatabasesCache::TDatabasesCache(TDuration idleTimeout) + : IdleTimeout(idleTimeout) +{} + +const TString& TDatabasesCache::GetTenantName() { + if (!TenantName) { + TenantName = CanonizePath(AppData()->TenantName); + } + return TenantName; +} + +void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { + auto it = DatabasesCache.find(event->Get()->Database); + if (it == DatabasesCache.end()) { + return; + } + it->second.DatabaseId = event->Get()->DatabaseId; + + const bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; + for (auto& delayedEvent : it->second.DelayedEvents) { + if (success) { + actorContext.Send(std::move(delayedEvent.Event)); + } else { + actorContext.Send(actorContext.SelfID, new TEvKqp::TEvDelayedRequestError(std::move(delayedEvent.Event), event->Get()->Status, event->Get()->Issues), 0, delayedEvent.RequestType); + } + } + it->second.DelayedEvents.clear(); + + if (!success) { + DatabasesCache.erase(it); + } +} + +void TDatabasesCache::SubscribeOnDatabase(const TString& database, TActorContext actorContext) { + if (!SubscriberActor) { + SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor(IdleTimeout)); + } + actorContext.Send(SubscriberActor, new TEvPrivate::TEvSubscribeOnDatabase(database)); +} + +void TDatabasesCache::PingDatabaseSubscription(const TString& database, TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvPrivate::TEvPingDatabaseSubscription(database)); + } +} + +void TDatabasesCache::StopSubscriberActor(TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index c6793fc65208..022ddadc58c8 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -176,6 +176,15 @@ class TKqpProxyService : public TActorBootstrapped { }; }; + enum class EDelayedRequestType { + QueryRequest, + ScriptRequest, + ForgetScriptExecutionOperation, + GetScriptExecutionOperation, + ListScriptExecutionOperations, + CancelScriptExecutionOperation, + }; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_PROXY_ACTOR; @@ -486,6 +495,7 @@ class TKqpProxyService : public TActorBootstrapped { }); ResourcePoolsCache.UnsubscribeFromResourcePoolClassifiers(ActorContext()); + DatabasesCache.StopSubscriberActor(ActorContext()); return TActor::PassAway(); } @@ -635,6 +645,10 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + if (!DatabasesCache.SetDatabaseIdOrDefer(ev, static_cast(EDelayedRequestType::QueryRequest), ActorContext())) { + return; + } + const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); @@ -735,7 +749,7 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { - if (CheckScriptExecutionsTablesReady(ev)) { + if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::ScriptRequest)) { auto req = ev->Get()->Record.MutableRequest(); auto maxRunTime = GetQueryTimeout(req->GetType(), req->GetTimeoutMs(), TableServiceConfig, QueryServiceConfig); req->SetTimeoutMs(maxRunTime.MilliSeconds()); @@ -1359,7 +1373,8 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); - hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvDelayedRequestError, Handle); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", @@ -1627,12 +1642,53 @@ class TKqpProxyService : public TActorBootstrapped { NYql::NDq::SetYqlLogLevels(yqlPriority); } - template - bool CheckScriptExecutionsTablesReady(TEvent& ev) { + void HanleDelayedRequestError(EDelayedRequestType requestType, THolder requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { + switch (requestType) { + case EDelayedRequestType::QueryRequest: { + auto response = std::make_unique(); + response->Record.GetRef().SetYdbStatus(status); + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + Send(requestEvent->Sender, std::move(response), 0, requestEvent->Cookie); + break; + } + + case EDelayedRequestType::ScriptRequest: + HanleDelayedScriptRequestError(std::move(requestEvent), status, std::move(issues)); + break; + + case EDelayedRequestType::ForgetScriptExecutionOperation: + HanleDelayedScriptRequestError(std::move(requestEvent), status, std::move(issues)); + break; + + case EDelayedRequestType::GetScriptExecutionOperation: + HanleDelayedScriptRequestError(std::move(requestEvent), status, std::move(issues)); + break; + + case EDelayedRequestType::ListScriptExecutionOperations: + HanleDelayedScriptRequestError(std::move(requestEvent), status, std::move(issues)); + break; + + case EDelayedRequestType::CancelScriptExecutionOperation: + HanleDelayedScriptRequestError(std::move(requestEvent), status, std::move(issues)); + break; + } + } + + template + void HanleDelayedScriptRequestError(THolder requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) const { + Send(requestEvent->Sender, new TResponse(status, std::move(issues)), 0, requestEvent->Cookie); + } + + template + bool CheckScriptExecutionsTablesReady(TEvent& ev, EDelayedRequestType requestType) { if (!AppData()->FeatureFlags.GetEnableScriptExecutionOperations()) { NYql::TIssues issues; issues.AddIssue("ExecuteScript feature is not enabled"); - Send(ev->Sender, new TResponse(Ydb::StatusIds::UNSUPPORTED, std::move(issues))); + HanleDelayedRequestError(requestType, std::move(ev), Ydb::StatusIds::UNSUPPORTED, std::move(issues)); + return false; + } + + if (!DatabasesCache.SetDatabaseIdOrDefer(ev, static_cast(requestType), ActorContext())) { return false; } @@ -1645,14 +1701,12 @@ class TKqpProxyService : public TActorBootstrapped { if (DelayedEventsQueue.size() < 10000) { DelayedEventsQueue.push_back({ .Event = std::move(ev), - .ResponseBuilder = [](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { - return new TResponse(status, std::move(issues)); - } + .RequestType = static_cast(requestType) }); } else { NYql::TIssues issues; issues.AddIssue("Too many queued requests"); - Send(ev->Sender, new TResponse(Ydb::StatusIds::OVERLOADED, std::move(issues))); + HanleDelayedRequestError(requestType, std::move(ev), Ydb::StatusIds::OVERLOADED, std::move(issues)); } return false; case EScriptExecutionsCreationStatus::Finished: @@ -1677,32 +1731,32 @@ class TKqpProxyService : public TActorBootstrapped { if (ev->Get()->Success) { Send(std::move(delayedEvent.Event)); } else { - Send(delayedEvent.Event->Sender, delayedEvent.ResponseBuilder(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue})); + HanleDelayedRequestError(static_cast(delayedEvent.RequestType), std::move(delayedEvent.Event), Ydb::StatusIds::INTERNAL_ERROR, {rootIssue}); } DelayedEventsQueue.pop_front(); } } void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) { - if (CheckScriptExecutionsTablesReady(ev)) { + if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::ForgetScriptExecutionOperation)) { Register(CreateForgetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) { - if (CheckScriptExecutionsTablesReady(ev)) { + if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::GetScriptExecutionOperation)) { Register(CreateGetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { - if (CheckScriptExecutionsTablesReady(ev)) { + if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::ListScriptExecutionOperations)) { Register(CreateListScriptExecutionOperationsActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { - if (CheckScriptExecutionsTablesReady(ev)) { + if (CheckScriptExecutionsTablesReady(ev, EDelayedRequestType::CancelScriptExecutionOperation)) { Register(CreateCancelScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } @@ -1806,8 +1860,15 @@ class TKqpProxyService : public TActorBootstrapped { ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } - void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + } + DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); + } + + void Handle(TEvKqp::TEvDelayedRequestError::TPtr& ev) { + HanleDelayedRequestError(static_cast(ev->Cookie), std::move(ev->Get()->RequestEvent), ev->Get()->Status, std::move(ev->Get()->Issues)); } void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { @@ -1867,16 +1928,13 @@ class TKqpProxyService : public TActorBootstrapped { Pending, Finished, }; - struct TDelayedEvent { - THolder Event; - std::function ResponseBuilder; - }; EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; - std::deque DelayedEventsQueue; + std::deque DelayedEventsQueue; bool IsLookupByRmScheduled = false; TActorId KqpTempTablesAgentActor; TResourcePoolsCache ResourcePoolsCache; + TDatabasesCache DatabasesCache; }; } // namespace diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index d10289009104..3d40621124e4 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -642,4 +642,63 @@ class TResourcePoolsCache { bool SubscribedOnResourcePoolClassifiers = false; }; +class TDatabasesCache { +public: + struct TDelayedEvent { + THolder Event; + i32 RequestType; + }; + +private: + struct TDatabaseInfo { + TString DatabaseId; // string "::" + std::vector DelayedEvents; + }; + +public: + TDatabasesCache(TDuration idleTimeout = TDuration::Seconds(60)); + + template + bool SetDatabaseIdOrDefer(TEvent& event, i32 requestType, TActorContext actorContext) { + if (!event->Get()->GetDatabaseId().empty()) { + return true; + } + + const auto& database = CanonizePath(event->Get()->GetDatabase()); + if (database.empty() || database == GetTenantName()) { + event->Get()->SetDatabaseId(GetTenantName()); + return true; + } + + auto& databaseInfo = DatabasesCache[database]; + if (databaseInfo.DatabaseId) { + PingDatabaseSubscription(database, actorContext); + event->Get()->SetDatabaseId(databaseInfo.DatabaseId); + return true; + } + + SubscribeOnDatabase(database, actorContext); + databaseInfo.DelayedEvents.push_back(TDelayedEvent{ + .Event = std::move(event), + .RequestType = requestType + }); + + return false; + } + + void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext); + void StopSubscriberActor(TActorContext actorContext) const; + +private: + const TString& GetTenantName(); + void SubscribeOnDatabase(const TString& database, TActorContext actorContext); + void PingDatabaseSubscription(const TString& database, TActorContext actorContext) const; + +private: + const TDuration IdleTimeout; + std::unordered_map DatabasesCache; + TActorId SubscriberActor; + TString TenantName; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index 883ec7d9198e..1d008a72be80 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -65,6 +67,89 @@ TString CreateSession(TTestActorRuntime* runtime, const TActorId& kqpProxy, cons return sessionId; } +class TDatabaseCacheTestActor : public TActorBootstrapped { +public: + TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, TDuration idleTimeout, NThreading::TPromise promise) + : IdleTimeout(idleTimeout) + , Database(database) + , ExpectedDatabaseId(expectedDatabaseId) + , Cache(idleTimeout) + , Promise(promise) + {} + + void Bootstrap() { + Become(&TDatabaseCacheTestActor::StateFunc); + + auto event = MakeHolder(); + event->Record.MutableRequest()->SetDatabase(Database); + Send(SelfId(), event.Release()); + + Schedule(3 * IdleTimeout, new TEvents::TEvWakeup()); + } + + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + if (!CacheUpdated) { + UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::SUCCESS, TStringBuilder() << GetErrorString() << ev->Get()->Issues.ToString()); + Cache.UpdateDatabaseInfo(ev, ActorContext()); + CacheUpdated = true; + } else { + UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::ABORTED, TStringBuilder() << GetErrorString() << ev->Get()->Issues.ToString()); + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->Issues.ToString(), "Database subscription was dropped by idle timeout", GetErrorString()); + Finish(); + } + } + + void Handle(TEvKqp::TEvDelayedRequestError::TPtr& ev) { + UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, status: " << ev->Get()->Status << ", " << GetErrorString() << ev->Get()->Issues.ToString()); + } + + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + auto success = Cache.SetDatabaseIdOrDefer(ev, 0, ActorContext()); + + bool dedicated = Database == ExpectedDatabaseId; + if (CacheUpdated || dedicated) { + UNIT_ASSERT_C(success, TStringBuilder() << "Expected database id from cache, " << GetErrorString()); + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->GetDatabaseId(), ExpectedDatabaseId, GetErrorString()); + if (dedicated) { + Finish(); + } + } else { + UNIT_ASSERT_C(!success, TStringBuilder() << "Unexpected database id from cache, " << GetErrorString()); + } + } + + void HandleWakeup() { + UNIT_ASSERT_C(false, TStringBuilder() << "Test cache timeout, " << GetErrorString()); + Finish(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvDelayedRequestError, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + sFunc(TEvents::TEvWakeup, HandleWakeup); + ) + +private: + TString GetErrorString() const { + return TStringBuilder() << "cache updated: " << CacheUpdated << ", database: " << Database << "\n"; + } + + void Finish() { + Promise.SetValue(); + PassAway(); + } + +private: + const TDuration IdleTimeout; + const TString Database; + const TString ExpectedDatabaseId; + TDatabasesCache Cache; + NThreading::TPromise Promise; + + bool CacheUpdated = false; +}; + } Y_UNIT_TEST_SUITE(KqpProxy) { @@ -542,5 +627,30 @@ Y_UNIT_TEST_SUITE(KqpProxy) { UNIT_ASSERT(allDoneOk); } + + Y_UNIT_TEST(DatabasesCacheForServerless) { + auto ydb = NWorkload::TYdbSetupSettings() + .CreateSampleTenants(true) + .Create(); + + auto& runtime = *ydb->GetRuntime(); + TDuration idleTimeout = TDuration::Seconds(5); + + auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, ui32 nodeIndex) { + auto promise = NThreading::NewPromise(); + runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, idleTimeout, promise), nodeIndex); + promise.GetFuture().GetValueSync(); + }; + + const auto& dedicatedTennant = ydb->GetSettings().GetDedicatedTenantName(); + checkCache(dedicatedTennant, dedicatedTennant, 2); + + const auto& sharedTennant = ydb->GetSettings().GetSharedTenantName(); + checkCache(sharedTennant, sharedTennant, 1); + + const auto& serverlessTennant = ydb->GetSettings().GetServerlessTenantName(); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, 1); + } + } // namspace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/proxy_service/ut/ya.make b/ydb/core/kqp/proxy_service/ut/ya.make index 730f59a3fcae..fc6d9e7c89cb 100644 --- a/ydb/core/kqp/proxy_service/ut/ya.make +++ b/ydb/core/kqp/proxy_service/ut/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/core/kqp/run_script_actor ydb/core/kqp/proxy_service ydb/core/kqp/ut/common + ydb/core/kqp/workload_service/ut/common ydb/library/yql/sql/pg_dummy ydb/public/sdk/cpp/client/ydb_query ydb/public/sdk/cpp/client/ydb_driver diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index e1c2b9e1b76a..8a143789e701 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( kqp_proxy_service.cpp + kqp_proxy_databases_cache.cpp kqp_proxy_peer_stats_calculator.cpp kqp_script_executions.cpp kqp_session_info.cpp diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 781d9f4a6eca..e7cb25220418 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -489,6 +489,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } if (result.DomainInfo) { Serverless = result.DomainInfo->IsServerless(); + PathId = result.DomainInfo->DomainKey; } Reply(Ydb::StatusIds::SUCCESS); return; @@ -537,7 +538,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues))); + Send(ReplyActorId, new TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); PassAway(); } @@ -560,6 +561,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { const NACLib::EAccessRights CheckAccess; bool Serverless = false; + TPathId PathId; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index df821a4c26d6..57e332ac66b3 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -22,7 +22,6 @@ struct TEvPrivate { EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvResolvePoolResponse, EvFetchPoolResponse, - EvFetchDatabaseResponse, EvCreatePoolResponse, EvPrepareTablesRequest, EvPlaceRequestIntoPoolResponse, @@ -47,8 +46,6 @@ struct TEvPrivate { EvStartRequestResponse, EvCleanupRequestsResponse, - EvRanksCheckerResponse, - EvEnd }; @@ -94,20 +91,6 @@ struct TEvPrivate { const NYql::TIssues Issues; }; - struct TEvFetchDatabaseResponse : public NActors::TEventLocal { - TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues) - : Status(status) - , Database(database) - , Serverless(serverless) - , Issues(std::move(issues)) - {} - - const Ydb::StatusIds::StatusCode Status; - const TString Database; - const bool Serverless; - const NYql::TIssues Issues; - }; - struct TEvCreatePoolResponse : public NActors::TEventLocal { TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) @@ -334,21 +317,6 @@ struct TEvPrivate { const std::vector SesssionIds; const NYql::TIssues Issues; }; - - // Resource pool classifier events - struct TEvRanksCheckerResponse : public TEventLocal { - TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues) - : Status(status) - , MaxRank(maxRank) - , NumberClassifiers(numberClassifiers) - , Issues(std::move(issues)) - {} - - const Ydb::StatusIds::StatusCode Status; - const i64 MaxRank; - const ui64 NumberClassifiers; - const NYql::TIssues Issues; - }; }; } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 26653cda3e3e..9bcfe49f4a4e 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -214,7 +214,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvCleanupRequest, Handle); hFunc(TEvents::TEvWakeup, Handle); - hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvFetchDatabaseResponse, Handle); hFunc(TEvPrivate::TEvFetchPoolResponse, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); @@ -231,7 +231,7 @@ class TKqpWorkloadService : public TActorBootstrapped { ) private: - void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + void Handle(TEvFetchDatabaseResponse::TPtr& ev) { GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); } diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 813f97b6e107..e9e292d81dfc 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -70,17 +70,13 @@ struct TDatabaseState { subscribers.clear(); } - void UpdateDatabaseInfo(const TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + void UpdateDatabaseInfo(const TEvFetchDatabaseResponse::TPtr& ev) { DatabaseUnsupported = ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED; if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { ReplyContinueError(ev->Get()->Status, GroupIssues(ev->Get()->Issues, "Failed to fetch database info")); return; } - if (Serverless != ev->Get()->Serverless) { - ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); - } - LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 1813f8b41500..bb0ee347e6be 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -635,7 +635,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForFail(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); @@ -644,7 +644,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForSuccess(TIntrusivePtr ydb, const TQueryRunnerSettings& settings) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier success", [ydb, settings](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier success", [ydb, settings](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c9f4cc7442d0..a4c6906783b5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -117,6 +117,7 @@ message TQueryRequest { optional string UserSID = 33; optional uint64 OutputChunkMaxSize = 34; optional string PoolId = 35; + optional string DatabaseId = 36; } message TKqpPathIdProto {