Skip to content

Commit

Permalink
YQ-3689 added kqp proxy database cache (ydb-platform#9644)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 25, 2024
1 parent 569413c commit 32809cc
Show file tree
Hide file tree
Showing 20 changed files with 643 additions and 89 deletions.
46 changes: 46 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ struct TEvKqp {
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
TEvScriptRequest() = default;

const TString& GetDatabase() const {
return Record.GetRequest().GetDatabase();
}

const TString& GetDatabaseId() const {
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
Record.MutableRequest()->SetDatabaseId(databaseId);
}

mutable NKikimrKqp::TEvQueryRequest Record;
TDuration ForgetAfter;
TDuration ResultsTtl;
Expand Down Expand Up @@ -161,6 +173,40 @@ struct TEvKqp {
return issues;
}
};

struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Database(database)
, Issues(std::move(issues))
{}

TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
: Status(Ydb::StatusIds::SUCCESS)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, Issues({})
{}

Ydb::StatusIds::StatusCode Status;
TString Database;
TString DatabaseId;
bool Serverless = false;
NYql::TIssues Issues;
};

struct TEvDelayedRequestError : public TEventLocal<TEvDelayedRequestError, TKqpEvents::EvDelayedRequestError> {
TEvDelayedRequestError(THolder<IEventHandle> requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: RequestEvent(std::move(requestEvent))
, Status(status)
, Issues(std::move(issues))
{}

THolder<IEventHandle> RequestEvent;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
};

} // namespace NKikimr::NKqp
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return PoolConfig;
}

const TString& GetDatabaseId() const {
return DatabaseId ? DatabaseId : Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand All @@ -363,6 +371,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
TActorId RequestActorId;
TString Database;
TString DatabaseId;
TString SessionId;
TString YqlText;
TString QueryId;
Expand Down
52 changes: 34 additions & 18 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
FS_ROLLBACK,
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
template <typename TEv, ui32 TEventType>
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
TEventWithDatabaseId(const TString& database)
: Database(database)
, OperationId(id)
{}

const TString& GetDatabase() const {
return Database;
}

const TString& GetDatabaseId() const {
return DatabaseId;
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

const TString Database;
TString DatabaseId;
};

struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{}

const NOperationId::TOperationId OperationId;
};

Expand All @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
NYql::TIssues Issues;
};

struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down Expand Up @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TMaybe<google::protobuf::Any> Metadata;
};

struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
: Database(database)
: TEventWithDatabaseId(database)
, PageSize(pageSize)
, PageToken(pageToken)
{}

TString Database;
ui64 PageSize;
TString PageToken;
};
Expand Down Expand Up @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
};

struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/event_local.h>
Expand Down Expand Up @@ -90,14 +91,20 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
: Status(status)
, Database(database)
, Serverless(serverless)
, PathId(pathId)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const bool Serverless;
const TPathId PathId;
const NYql::TIssues Issues;
};

} // NKikimr::NKqp::NWorkload
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PEERDIR(
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/compilation
ydb/core/resource_pools
ydb/core/scheme

ydb/library/yql/dq/actors
ydb/public/api/protos
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct TKqpEvents {
EvListSessionsRequest,
EvListSessionsResponse,
EvListProxyNodesRequest,
EvListProxyNodesResponse
EvListProxyNodesResponse,
EvUpdateDatabaseInfo,
EvDelayedRequestError
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down Expand Up @@ -174,8 +176,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
EvFetchDatabaseResponse,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <ydb/core/base/path.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>

Expand All @@ -20,6 +19,31 @@ using namespace NResourcePool;
using namespace NWorkload;


struct TEvPrivate {
// Event ids
enum EEv : ui32 {
EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE),

EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");

struct TEvRanksCheckerResponse : public TEventLocal<TEvRanksCheckerResponse, EvRanksCheckerResponse> {
TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues)
: Status(status)
, MaxRank(maxRank)
, NumberClassifiers(numberClassifiers)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const i64 MaxRank;
const ui64 NumberClassifiers;
const NYql::TIssues Issues;
};
};

class TRanksCheckerActor : public NKikimr::TQueryBase {
using TBase = NKikimr::TQueryBase;

Expand Down Expand Up @@ -177,7 +201,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
TryFinish();
}

void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
void Handle(TEvFetchDatabaseResponse::TPtr& ev) {
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
return;
Expand Down Expand Up @@ -223,7 +247,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou

STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvFetchDatabaseResponse, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle)
Expand Down
Loading

0 comments on commit 32809cc

Please sign in to comment.