From 365a93dd605610d2609c08ff7b0fdc32c66c2c1f Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Tue, 12 Mar 2024 02:20:04 +0300 Subject: [PATCH 01/21] WIP: introduce etcd KV grpc service --- .../etcd/api/etcdserverpb/kv/grpc_service.cpp | 66 +++++++++++++++++++ .../etcd/api/etcdserverpb/kv/grpc_service.h | 35 ++++++++++ ydb/services/etcd/ya.make | 14 ++++ ydb/services/ya.make | 1 + 4 files changed, 116 insertions(+) create mode 100644 ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp create mode 100644 ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h create mode 100644 ydb/services/etcd/ya.make diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp new file mode 100644 index 000000000000..9fbf25b75cc4 --- /dev/null +++ b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp @@ -0,0 +1,66 @@ +#include "grpc_service.h" + +#include +#include +#include +#include + + +namespace NKikimr::NGRpcService { + +TGRpcEtcdKVService::TGRpcEtcdKVService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId &proxyId, bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed), + HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) {} + +TGRpcEtcdKVService::TGRpcEtcdKVService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector &proxies, bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed), + HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) {} + +void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { + using namespace etcdserverpb; + using namespace NEtcd; + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; + + // TODO [pavelbezpravel]: WIP. Remove. + Y_UNUSED(logger); + Y_UNUSED(proxyCounter); + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive>(this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \ + }, &etcdserverpb::KV::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("KV", #NAME))->Run(); \ + ++proxyCounter; \ + } \ + } + + // TODO [pavelbezpravel]: WIP. + + // ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); + // ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); + // ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); + // ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); + // ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); + +#undef ADD_REQUEST +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h new file mode 100644 index 000000000000..953b5c90d703 --- /dev/null +++ b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h @@ -0,0 +1,35 @@ +#pragma once + +#include + +#include + +namespace NKikimr::NGRpcService { + +class TGRpcEtcdKVService : public TGrpcServiceBase +{ +public: + using TGrpcServiceBase::TGrpcServiceBase; + + TGRpcEtcdKVService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + + TGRpcEtcdKVService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + +private: + void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; +}; + +} // namespace NKikimr::NGRpcService \ No newline at end of file diff --git a/ydb/services/etcd/ya.make b/ydb/services/etcd/ya.make new file mode 100644 index 000000000000..61f7958227ca --- /dev/null +++ b/ydb/services/etcd/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + api/etcdserverpb/kv/grpc_service.cpp +) + +PEERDIR( + ydb/public/api/etcd + ydb/library/grpc/server + ydb/core/grpc_services + ydb/core/grpc_services/base +) + +END() diff --git a/ydb/services/ya.make b/ydb/services/ya.make index 3928a5797057..ab15f10e932f 100644 --- a/ydb/services/ya.make +++ b/ydb/services/ya.make @@ -5,6 +5,7 @@ RECURSE( dynamic_config datastreams discovery + etcd fq kesus keyvalue From 6322f6b7e9fe1df7b42fee1d8b8317722dab8574 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Tue, 12 Mar 2024 02:25:08 +0300 Subject: [PATCH 02/21] WIP: add impl for KV::Range call --- ydb/core/base/events.h | 1 + ydb/core/etcd/events.h | 24 +++++++ .../etcd/api/etcdserverpb/rpc_range.cpp | 70 +++++++++++++++++++ .../etcd/api/etcdserverpb/service_kv.h | 21 ++++++ ydb/core/grpc_services/ya.make | 3 + 5 files changed, 119 insertions(+) create mode 100644 ydb/core/etcd/events.h create mode 100644 ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp create mode 100644 ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 93c28aa34edf..1fea5e7e84c4 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -174,6 +174,7 @@ struct TKikimrEvents : TEvents { ES_REPLICATION_SERVICE, ES_CHANGE_EXCHANGE, ES_S3_FILE_QUEUE, + ES_ETCD, }; }; diff --git a/ydb/core/etcd/events.h b/ydb/core/etcd/events.h new file mode 100644 index 000000000000..eb89185f4de0 --- /dev/null +++ b/ydb/core/etcd/events.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include + +namespace NKikimr::NEtcd { + +struct TEvEtcd { + enum EEv { + EvRangeResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD), + + EvEnd + }; + + static_assert( + EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::TKikimrEvents::ES_ETCD), + "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD)"); + + struct TEvRangeResponse : public TEventLocal {}; + +}; + +} // namespace NKikimr::NEtcd \ No newline at end of file diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp new file mode 100644 index 000000000000..2e157a947bc3 --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp @@ -0,0 +1,70 @@ +#include "service_kv.h" + +#include +#include +#include + +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvRangeRequest = TGrpcRequestNoOperationCall; + +class TRangeRequestRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TRangeRequestRPC(TEvRangeRequest* request) + : Request_(request) + {} + + void Bootstrap() { + Become(&TRangeRequestRPC::StateFunc); + const auto& request = *Request_->GetProtoRequest(); + Reply(request); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NKikimr::NEtcd::TEvEtcd::TEvRangeResponse, Handle); + ) + + void Handle(NKikimr::NEtcd::TEvEtcd::TEvRangeResponse::TPtr& ev) { + Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. + const auto& request = *Request_->GetProtoRequest(); + Reply(request); + } + + void Reply(const etcdserverpb::RangeRequest& request) { + etcdserverpb::RangeResponse response{}; + auto* kv = response.add_kvs(); + kv->set_key(request.key()); + kv->set_value("TODO [pavelbezpravel]: Range stub."); + PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +void DoRange(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TRangeRequestRPC(req)); +} + +} + +} // namespace NKikimr::NGRpcService \ No newline at end of file diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h b/ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h new file mode 100644 index 000000000000..67fb1a27f34f --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +namespace NKikimr::NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +namespace NEtcd { + +void DoRange(std::unique_ptr p, const IFacilityProvider& f); +void DoPut(std::unique_ptr p, const IFacilityProvider& f); +void DoDeleteRange(std::unique_ptr p, const IFacilityProvider&); +void DoTxn(std::unique_ptr p, const IFacilityProvider& f); +void DoCompact(std::unique_ptr p, const IFacilityProvider& f); + +} // namespace NEtcd + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 6af54cac7e44..9766bce4accf 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -82,6 +82,8 @@ SRCS( query/rpc_attach_session.cpp query/rpc_kqp_tx.cpp query/service_query.h + + # etcd/api/etcdserverpb/rpc_range.cpp ) PEERDIR( @@ -125,6 +127,7 @@ PEERDIR( ydb/library/yql/public/types ydb/library/yql/public/issue ydb/library/services + ydb/public/api/etcd ydb/public/api/grpc/draft ydb/public/api/protos ydb/public/lib/fq From 06365318dc6297abb976797272c9bdd5f2a0fd1a Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Sat, 30 Mar 2024 11:47:49 +0300 Subject: [PATCH 03/21] WIP: add FillImpl for etcd RangeResponse --- ydb/core/grpc_services/base/base.h | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index f3bbb9463444..091be7535093 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -30,6 +30,12 @@ #include +namespace etcdserverpb { + +class RangeResponse; + +} + namespace NKikimr { namespace NSchemeCache { @@ -456,6 +462,12 @@ class IRequestNoOpCtx : public IRequestCtx { }; struct TCommonResponseFillerImpl { + static void FillImpl(etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { + Y_UNUSED(resp); + Y_UNUSED(issues); + Y_UNUSED(status); + } + template static void FillImpl(T& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { resp.set_status(status); From 25ba8bf944821f08a65b19c13ee7457370e90c60 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Sat, 30 Mar 2024 11:51:51 +0300 Subject: [PATCH 04/21] feat: add Range rpc implementation --- ydb/core/etcd/events.h | 38 ++++++++++++++++++- .../etcd/api/etcdserverpb/rpc_range.cpp | 4 +- ydb/core/grpc_services/ya.make | 5 ++- .../etcd/api/etcdserverpb/kv/grpc_service.cpp | 2 +- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/ydb/core/etcd/events.h b/ydb/core/etcd/events.h index eb89185f4de0..717ed9058b6f 100644 --- a/ydb/core/etcd/events.h +++ b/ydb/core/etcd/events.h @@ -1,11 +1,36 @@ #pragma once +#include +#include +#include + +#include "ydb/library/yql/public/issue/yql_issue.h" #include +#include +#include + #include +#include namespace NKikimr::NEtcd { +// TODO [pavelbezpravel]: test. + +struct TKeyValue { + TString key; + i64 create_revision; + i64 mod_revision; + i64 version; + TString value; +}; + +struct TRangeResponse { + TVector Kvs; + bool More; + size_t Count; +}; + struct TEvEtcd { enum EEv { EvRangeResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD), @@ -17,7 +42,18 @@ struct TEvEtcd { EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::TKikimrEvents::ES_ETCD), "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD)"); - struct TEvRangeResponse : public TEventLocal {}; + struct TEvRangeResponse : public NActors::TEventLocal { + TEvRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TRangeResponse&& rangeResponse) + : Status(status) + , Issues(issues) + , RangeResponse(rangeResponse) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TRangeResponse RangeResponse; + }; }; diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp index 2e157a947bc3..bdb1a52a13da 100644 --- a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp @@ -4,7 +4,9 @@ #include #include +#include #include +#include #include @@ -43,7 +45,7 @@ class TRangeRequestRPC : public TActorBootstrapped { Reply(request); } - void Reply(const etcdserverpb::RangeRequest& request) { + void Reply(const ::etcdserverpb::RangeRequest& request) { etcdserverpb::RangeResponse response{}; auto* kv = response.add_kvs(); kv->set_key(request.key()); diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 9766bce4accf..aa61ad25fad7 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -4,6 +4,9 @@ SRCS( audit_log.cpp audit_dml_operations.cpp db_metadata_cache.h + + etcd/api/etcdserverpb/rpc_range.cpp + grpc_endpoint_publish_actor.cpp grpc_helper.cpp grpc_mon.cpp @@ -82,8 +85,6 @@ SRCS( query/rpc_attach_session.cpp query/rpc_kqp_tx.cpp query/service_query.h - - # etcd/api/etcdserverpb/rpc_range.cpp ) PEERDIR( diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp index 9fbf25b75cc4..33843b2795cb 100644 --- a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp @@ -54,7 +54,7 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { // TODO [pavelbezpravel]: WIP. - // ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); + ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); // ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); // ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); // ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); From 2c5652819ec706ad11439a27057deb8489f679c4 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Sat, 30 Mar 2024 11:53:10 +0300 Subject: [PATCH 05/21] feat: run etcd kv service --- ydb/core/driver_lib/run/run.cpp | 8 ++++++++ ydb/core/driver_lib/run/ya.make | 1 + 2 files changed, 9 insertions(+) diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index e670fe9a0a66..f9f1294ab545 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -94,6 +94,7 @@ #include #include #include +#include #include #include #include @@ -583,6 +584,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["query_service"] = &hasQueryService; TServiceCfg hasKeyValue = services.empty(); names["keyvalue"] = &hasKeyValue; + TServiceCfg hasEtcdKVService = services.empty(); + names["etcd_kv_service"] = &hasEtcdKVService; std::unordered_set enabled; for (const auto& name : services) { @@ -850,6 +853,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { server.AddService(service); } } + + if (hasEtcdKVService) { + server.AddService(new NGRpcService::TGRpcEtcdKVService(ActorSystem.Get(), Counters, + grpcRequestProxies, hasEtcdKVService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); + } }; if (appConfig.HasGRpcConfig() && appConfig.GetGRpcConfig().GetStartGRpcProxy()) { diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 62714d224329..81a9d9cebd63 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -144,6 +144,7 @@ PEERDIR( ydb/services/dynamic_config ydb/services/datastreams ydb/services/discovery + ydb/services/etcd ydb/services/fq ydb/services/kesus ydb/services/keyvalue From 052b3501e5ffce1cece61e3b92203d5b395a5faa Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Thu, 4 Apr 2024 17:27:13 +0300 Subject: [PATCH 06/21] WIP: introduce Range rpc call everything works, but it requires refactoring --- ydb/core/driver_lib/run/config.h | 1 + ydb/core/grpc_services/base/base.h | 6 +++++- ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp | 3 ++- ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp | 4 ---- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 2cd944329a68..7f62a2ac47f7 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -84,6 +84,7 @@ union TBasicKikimrServicesMask { bool EnableDatabaseMetadataCache:1; bool EnableGraphService:1; + bool EnableEtcdKVService:1; // TODO [pavelbezpravel]: enable all etcd services, not only the KV? }; struct { diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 091be7535093..066b4f1d574a 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -462,7 +462,7 @@ class IRequestNoOpCtx : public IRequestCtx { }; struct TCommonResponseFillerImpl { - static void FillImpl(etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { + static void FillImpl(::etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { Y_UNUSED(resp); Y_UNUSED(issues); Y_UNUSED(status); @@ -477,6 +477,10 @@ struct TCommonResponseFillerImpl { template struct TCommonResponseFiller : private TCommonResponseFillerImpl { + static void Fill(::etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::CostInfo* costInfo, Ydb::StatusIds::StatusCode status) { + Y_UNUSED(costInfo); + FillImpl(resp, issues, status); + } static void Fill(TResp& resp, const NYql::TIssues& issues, Ydb::CostInfo* costInfo, Ydb::StatusIds::StatusCode status) { auto& operation = *resp.mutable_operation(); operation.set_ready(true); diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp index bdb1a52a13da..5b0bf54230c8 100644 --- a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp @@ -16,7 +16,7 @@ namespace { using namespace NActors; -using TEvRangeRequest = TGrpcRequestNoOperationCall; +using TEvRangeRequest = TGrpcRequestNoOperationCall<::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>; class TRangeRequestRPC : public TActorBootstrapped { public: @@ -50,6 +50,7 @@ class TRangeRequestRPC : public TActorBootstrapped { auto* kv = response.add_kvs(); kv->set_key(request.key()); kv->set_value("TODO [pavelbezpravel]: Range stub."); + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); PassAway(); } diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp index 33843b2795cb..20dd2e776661 100644 --- a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp @@ -30,10 +30,6 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); size_t proxyCounter = 0; - // TODO [pavelbezpravel]: WIP. Remove. - Y_UNUSED(logger); - Y_UNUSED(proxyCounter); - #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif From caa06d6c1b12506ef8bf5ec3d47435aa3edb7bdd Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 11:42:34 +0300 Subject: [PATCH 07/21] maintenance: move files to subdirectory --- ydb/core/driver_lib/run/run.cpp | 2 +- .../grpc_services/etcd/api/{etcdserverpb => kv}/rpc_range.cpp | 2 +- .../etcd/api/{etcdserverpb/service_kv.h => kv/service.h} | 0 ydb/core/grpc_services/ya.make | 2 +- ydb/services/etcd/api/{etcdserverpb => }/kv/grpc_service.cpp | 2 +- ydb/services/etcd/api/{etcdserverpb => }/kv/grpc_service.h | 0 ydb/services/etcd/ya.make | 2 +- 7 files changed, 5 insertions(+), 5 deletions(-) rename ydb/core/grpc_services/etcd/api/{etcdserverpb => kv}/rpc_range.cpp (98%) rename ydb/core/grpc_services/etcd/api/{etcdserverpb/service_kv.h => kv/service.h} (100%) rename ydb/services/etcd/api/{etcdserverpb => }/kv/grpc_service.cpp (97%) rename ydb/services/etcd/api/{etcdserverpb => }/kv/grpc_service.h (100%) diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index f9f1294ab545..bf4356130347 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -94,7 +94,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp similarity index 98% rename from ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp rename to ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp index 5b0bf54230c8..0651b42bfac8 100644 --- a/ydb/core/grpc_services/etcd/api/etcdserverpb/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp @@ -1,4 +1,4 @@ -#include "service_kv.h" +#include "service.h" #include #include diff --git a/ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h b/ydb/core/grpc_services/etcd/api/kv/service.h similarity index 100% rename from ydb/core/grpc_services/etcd/api/etcdserverpb/service_kv.h rename to ydb/core/grpc_services/etcd/api/kv/service.h diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index aa61ad25fad7..7573a81edf71 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,7 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h - etcd/api/etcdserverpb/rpc_range.cpp + etcd/api/kv/rpc_range.cpp grpc_endpoint_publish_actor.cpp grpc_helper.cpp diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp similarity index 97% rename from ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp rename to ydb/services/etcd/api/kv/grpc_service.cpp index 20dd2e776661..194bc7d73473 100644 --- a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace NKikimr::NGRpcService { diff --git a/ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h b/ydb/services/etcd/api/kv/grpc_service.h similarity index 100% rename from ydb/services/etcd/api/etcdserverpb/kv/grpc_service.h rename to ydb/services/etcd/api/kv/grpc_service.h diff --git a/ydb/services/etcd/ya.make b/ydb/services/etcd/ya.make index 61f7958227ca..2c72bcb5ac22 100644 --- a/ydb/services/etcd/ya.make +++ b/ydb/services/etcd/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - api/etcdserverpb/kv/grpc_service.cpp + api/kv/grpc_service.cpp ) PEERDIR( From 19be898520c5bcc9dd9f681e4902d4e067ab7262 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 12:05:13 +0300 Subject: [PATCH 08/21] maintenance: minor renaming --- ydb/core/driver_lib/run/config.h | 2 +- ydb/core/driver_lib/run/run.cpp | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 7f62a2ac47f7..ed2287ffc0b9 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -84,7 +84,7 @@ union TBasicKikimrServicesMask { bool EnableDatabaseMetadataCache:1; bool EnableGraphService:1; - bool EnableEtcdKVService:1; // TODO [pavelbezpravel]: enable all etcd services, not only the KV? + bool EnableEtcdService:1; }; struct { diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index bf4356130347..77029e376db6 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -584,8 +584,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["query_service"] = &hasQueryService; TServiceCfg hasKeyValue = services.empty(); names["keyvalue"] = &hasKeyValue; - TServiceCfg hasEtcdKVService = services.empty(); - names["etcd_kv_service"] = &hasEtcdKVService; + TServiceCfg hasEtcdService = services.empty(); + names["etcd_service"] = &hasEtcdService; std::unordered_set enabled; for (const auto& name : services) { @@ -854,9 +854,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } } - if (hasEtcdKVService) { + if (hasEtcdService) { + // TODO [pavelbezpravel]: add pther etcd services later. server.AddService(new NGRpcService::TGRpcEtcdKVService(ActorSystem.Get(), Counters, - grpcRequestProxies, hasEtcdKVService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); + grpcRequestProxies, hasEtcdService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } }; From 7562366de49c298985f6281c916e512dafd090e7 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 16:38:54 +0300 Subject: [PATCH 09/21] maintenance: minor changes --- ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp index 0651b42bfac8..71dac358ce6d 100644 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp @@ -30,7 +30,7 @@ class TRangeRequestRPC : public TActorBootstrapped { void Bootstrap() { Become(&TRangeRequestRPC::StateFunc); - const auto& request = *Request_->GetProtoRequest(); + const auto request = *Request_->GetProtoRequest(); Reply(request); } @@ -41,16 +41,21 @@ class TRangeRequestRPC : public TActorBootstrapped { void Handle(NKikimr::NEtcd::TEvEtcd::TEvRangeResponse::TPtr& ev) { Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto& request = *Request_->GetProtoRequest(); + const auto request = *Request_->GetProtoRequest(); Reply(request); } void Reply(const ::etcdserverpb::RangeRequest& request) { - etcdserverpb::RangeResponse response{}; + auto response = etcdserverpb::RangeResponse{}; + + // TODO [pavelbezpravel]: WIP. auto* kv = response.add_kvs(); kv->set_key(request.key()); kv->set_value("TODO [pavelbezpravel]: Range stub."); + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); + + // TODO [pavelbezpravel]: introduce Finish() method. PassAway(); } From 69fc49204aaa1726d539b8d0048710667215d063 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:10:21 +0300 Subject: [PATCH 10/21] maintenance: update events use NYdb namespace instead of NKikimr --- ydb/core/base/events.h | 3 +- ydb/core/etcd/events.h | 60 -------- ydb/core/etcd/kv/events.h | 109 +++++++++++++++ ydb/core/etcd/kv/proto.h | 132 ++++++++++++++++++ .../grpc_services/etcd/api/kv/rpc_range.cpp | 6 +- 5 files changed, 246 insertions(+), 64 deletions(-) delete mode 100644 ydb/core/etcd/events.h create mode 100644 ydb/core/etcd/kv/events.h create mode 100644 ydb/core/etcd/kv/proto.h diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 1fea5e7e84c4..33855eca65d3 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -174,7 +174,8 @@ struct TKikimrEvents : TEvents { ES_REPLICATION_SERVICE, ES_CHANGE_EXCHANGE, ES_S3_FILE_QUEUE, - ES_ETCD, + ES_ETCD_REVISION, + ES_ETCD_KV, }; }; diff --git a/ydb/core/etcd/events.h b/ydb/core/etcd/events.h deleted file mode 100644 index 717ed9058b6f..000000000000 --- a/ydb/core/etcd/events.h +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "ydb/library/yql/public/issue/yql_issue.h" -#include - -#include -#include - -#include -#include - -namespace NKikimr::NEtcd { - -// TODO [pavelbezpravel]: test. - -struct TKeyValue { - TString key; - i64 create_revision; - i64 mod_revision; - i64 version; - TString value; -}; - -struct TRangeResponse { - TVector Kvs; - bool More; - size_t Count; -}; - -struct TEvEtcd { - enum EEv { - EvRangeResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD), - - EvEnd - }; - - static_assert( - EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::TKikimrEvents::ES_ETCD), - "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD)"); - - struct TEvRangeResponse : public NActors::TEventLocal { - TEvRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TRangeResponse&& rangeResponse) - : Status(status) - , Issues(issues) - , RangeResponse(rangeResponse) - { - } - - Ydb::StatusIds::StatusCode Status; - NYql::TIssues Issues; - TRangeResponse RangeResponse; - }; - -}; - -} // namespace NKikimr::NEtcd \ No newline at end of file diff --git a/ydb/core/etcd/kv/events.h b/ydb/core/etcd/kv/events.h new file mode 100644 index 000000000000..6886541fb6aa --- /dev/null +++ b/ydb/core/etcd/kv/events.h @@ -0,0 +1,109 @@ +#pragma once + +#include "proto.h" + +#include +#include +#include + +namespace NYdb::NEtcd { + +struct TEvEtcdKV { + // Event ids + enum EEv : ui32 { + EvCreateTableResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD_KV), + EvRangeResponse, + EvPutResponse, + EvDeleteResponse, + EvTxnCompareResponse, + EvTxnResponse, + + EvEnd + }; + + static_assert( + EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD_KV), + "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD_KV)" + ); + + // Events + struct TEvCreateTableResponse : public NActors::TEventLocal { + }; + + struct TEvRangeResponse : public NActors::TEventLocal { + TEvRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TRangeResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TRangeResponse Response; + }; + + struct TEvPutResponse : public NActors::TEventLocal { + TEvPutResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TPutResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TPutResponse Response; + }; + + struct TEvDeleteResponse : public NActors::TEventLocal { + TEvDeleteResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TDeleteResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TDeleteResponse Response; + }; + + struct TEvTxnCompareResponse : public NActors::TEventLocal { + TEvTxnCompareResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TTxnCompareResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TTxnCompareResponse Response; + }; + + struct TEvTxnResponse : public NActors::TEventLocal { + TEvTxnResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TTxnResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TTxnResponse Response; + }; +}; + +} // namespace NYdb::NEtcd diff --git a/ydb/core/etcd/kv/proto.h b/ydb/core/etcd/kv/proto.h new file mode 100644 index 000000000000..b03dff688c64 --- /dev/null +++ b/ydb/core/etcd/kv/proto.h @@ -0,0 +1,132 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace NYdb::NEtcd { + +struct TKeyValue { + TString key; + i64 create_revision; + i64 mod_revision; + i64 version; + TString value; +}; + +struct TRangeRequest { + enum class ESortOrder { + NONE, + ASCEND, + DESCEND, + }; + enum class ESortTarget { + KEY, + CREATE, + MOD, + VERSION, + VALUE, + }; + + TString key; + TString range_end; + size_t limit; + i64 revision; + ESortOrder sort_order; + ESortTarget sort_target; + bool serializable; + bool keys_only; + bool count_only; + i64 min_mod_revision; + i64 max_mod_revision; + i64 min_create_revision; + i64 max_create_revision; +}; + +struct TRangeResponse { + TVector Kvs; + bool More; + size_t Count; +}; + +struct TPutRequest { + TVector> Kvs; + bool PrevKv; + bool IgnoreValue; +}; + +struct TPutResponse { + TVector PrevKvs; +}; + +struct TDeleteRequest { + TString Key; + TString RangeEnd; + bool PrevKv; +}; + +struct TDeleteResponse { + size_t Deleted; + TVector PrevKvs; +}; + +struct TTxnRequest; + +using TRequestOp = std::variant< + std::shared_ptr, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr +>; + +struct TTxnResponse; + +using TResponseOp = std::variant< + std::shared_ptr, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr +>; + +struct TTxnCompareRequest { + enum class ECompareResult { + EQUAL, + GREATER, + LESS, + NOT_EQUAL, + }; + ECompareResult Result; + TMaybe Target_create_revision; + TMaybe Target_mod_revision; + TMaybe Target_version; + TMaybe Target_value; + TString Key; + TString Range_end; +}; + +struct TTxnCompareResponse { + bool Succeeded; +}; + +struct TTxnRequest { + TVector Compare; + std::array, 2> Requests; +}; + +struct TTxnResponse { + bool Succeeded; + TVector Responses; +}; + +} // namespace NYdb::NEtcd diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp index 71dac358ce6d..199507c41efa 100644 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp @@ -1,6 +1,6 @@ #include "service.h" -#include +#include #include #include @@ -36,10 +36,10 @@ class TRangeRequestRPC : public TActorBootstrapped { private: STRICT_STFUNC(StateFunc, - hFunc(NKikimr::NEtcd::TEvEtcd::TEvRangeResponse, Handle); + hFunc(NYdb::NEtcd::TEvEtcdKV::TEvRangeResponse, Handle); ) - void Handle(NKikimr::NEtcd::TEvEtcd::TEvRangeResponse::TPtr& ev) { + void Handle(NYdb::NEtcd::TEvEtcdKV::TEvRangeResponse::TPtr& ev) { Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. const auto request = *Request_->GetProtoRequest(); Reply(request); From 1be3d8ab606c9aebab2363d1cfa48eece0c1faaf Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:18:10 +0300 Subject: [PATCH 11/21] feat: add Put rpc implementation --- .../grpc_services/etcd/api/kv/rpc_put.cpp | 76 +++++++++++++++++++ ydb/core/grpc_services/ya.make | 1 + ydb/services/etcd/api/kv/grpc_service.cpp | 2 +- 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp new file mode 100644 index 000000000000..de1cd84c369a --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp @@ -0,0 +1,76 @@ +#include "service.h" + +#include +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvPutRequest = TGrpcRequestNoOperationCall<::etcdserverpb::PutRequest, ::etcdserverpb::PutResponse>; + +class TPutRequestRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TPutRequestRPC(TEvPutRequest* request) + : Request_(request) + {} + + void Bootstrap() { + Become(&TPutRequestRPC::StateFunc); + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NYdb::NEtcd::TEvEtcdKV::TEvPutResponse, Handle); + ) + + void Handle(NYdb::NEtcd::TEvEtcdKV::TEvPutResponse::TPtr& ev) { + Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + + void Reply(const ::etcdserverpb::PutRequest& request) { + auto response = etcdserverpb::PutResponse{}; + + // TODO [pavelbezpravel]: WIP. + Y_UNUSED(request); + + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); + + // TODO [pavelbezpravel]: introduce Finish() method. + PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +void DoPut(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TPutRequestRPC(req)); +} + +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 7573a81edf71..d057fe6fc315 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,6 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h + etcd/api/kv/rpc_put.cpp etcd/api/kv/rpc_range.cpp grpc_endpoint_publish_actor.cpp diff --git a/ydb/services/etcd/api/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp index 194bc7d73473..1f3234677238 100644 --- a/ydb/services/etcd/api/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -51,7 +51,7 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { // TODO [pavelbezpravel]: WIP. ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); - // ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); + ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); // ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); // ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); // ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); From 2aba8ecf96ae413820d905bd79f894137d5de9a9 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:24:36 +0300 Subject: [PATCH 12/21] fix: minor renaming --- ydb/core/etcd/kv/events.h | 8 ++++---- ydb/core/etcd/kv/proto.h | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/core/etcd/kv/events.h b/ydb/core/etcd/kv/events.h index 6886541fb6aa..2296fad33e73 100644 --- a/ydb/core/etcd/kv/events.h +++ b/ydb/core/etcd/kv/events.h @@ -14,7 +14,7 @@ struct TEvEtcdKV { EvCreateTableResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD_KV), EvRangeResponse, EvPutResponse, - EvDeleteResponse, + EvDeleteRangeResponse, EvTxnCompareResponse, EvTxnResponse, @@ -60,8 +60,8 @@ struct TEvEtcdKV { TPutResponse Response; }; - struct TEvDeleteResponse : public NActors::TEventLocal { - TEvDeleteResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TDeleteResponse&& response) + struct TEvDeleteRangeResponse : public NActors::TEventLocal { + TEvDeleteRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TDeleteRangeResponse&& response) : Status(status) , Issues(issues) , TxId(std::move(txId)) @@ -72,7 +72,7 @@ struct TEvEtcdKV { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; TString TxId; - TDeleteResponse Response; + TDeleteRangeResponse Response; }; struct TEvTxnCompareResponse : public NActors::TEventLocal { diff --git a/ydb/core/etcd/kv/proto.h b/ydb/core/etcd/kv/proto.h index b03dff688c64..c58c304c060c 100644 --- a/ydb/core/etcd/kv/proto.h +++ b/ydb/core/etcd/kv/proto.h @@ -70,13 +70,13 @@ struct TPutResponse { TVector PrevKvs; }; -struct TDeleteRequest { +struct TDeleteRangeRequest { TString Key; TString RangeEnd; bool PrevKv; }; -struct TDeleteResponse { +struct TDeleteRangeResponse { size_t Deleted; TVector PrevKvs; }; @@ -86,7 +86,7 @@ struct TTxnRequest; using TRequestOp = std::variant< std::shared_ptr, std::shared_ptr, - std::shared_ptr, + std::shared_ptr, std::shared_ptr >; @@ -95,7 +95,7 @@ struct TTxnResponse; using TResponseOp = std::variant< std::shared_ptr, std::shared_ptr, - std::shared_ptr, + std::shared_ptr, std::shared_ptr >; From 876fd0e59c8d24bc162f3a6387c93d9d896c752f Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:25:45 +0300 Subject: [PATCH 13/21] feat: add DeleteRange rpc implementation --- .../etcd/api/kv/rpc_delete_range.cpp | 76 +++++++++++++++++++ ydb/core/grpc_services/ya.make | 1 + ydb/services/etcd/api/kv/grpc_service.cpp | 2 +- 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp new file mode 100644 index 000000000000..67a4671e865f --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp @@ -0,0 +1,76 @@ +#include "service.h" + +#include +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvDeleteDeleteRangeRequest = TGrpcRequestNoOperationCall<::etcdserverpb::DeleteRangeRequest, ::etcdserverpb::DeleteRangeResponse>; + +class TDeleteRangeRequestRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TDeleteRangeRequestRPC(TEvDeleteDeleteRangeRequest* request) + : Request_(request) + {} + + void Bootstrap() { + Become(&TDeleteRangeRequestRPC::StateFunc); + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NYdb::NEtcd::TEvEtcdKV::TEvDeleteRangeResponse, Handle); + ) + + void Handle(NYdb::NEtcd::TEvEtcdKV::TEvDeleteRangeResponse::TPtr& ev) { + Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + + void Reply(const ::etcdserverpb::DeleteRangeRequest& request) { + auto response = etcdserverpb::DeleteRangeResponse{}; + + // TODO [pavelbezpravel]: WIP. + Y_UNUSED(request); + + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); + + // TODO [pavelbezpravel]: introduce Finish() method. + PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +void DoDeleteRange(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TDeleteRangeRequestRPC(req)); +} + +} + +} // namespace NKikimr::NGRpcService \ No newline at end of file diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index d057fe6fc315..e28115f9dda8 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,6 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h + etcd/api/kv/delete_range.cpp etcd/api/kv/rpc_put.cpp etcd/api/kv/rpc_range.cpp diff --git a/ydb/services/etcd/api/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp index 1f3234677238..174ca31297ab 100644 --- a/ydb/services/etcd/api/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -52,7 +52,7 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); - // ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); + ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); // ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); // ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); From 70c99ced7518a1c5ff17f42647bdfb3a562f064e Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:30:48 +0300 Subject: [PATCH 14/21] feat: add Txn rpc implementation --- .../grpc_services/etcd/api/kv/rpc_txn.cpp | 76 +++++++++++++++++++ ydb/core/grpc_services/ya.make | 1 + ydb/services/etcd/api/kv/grpc_service.cpp | 2 +- 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp new file mode 100644 index 000000000000..40289c874532 --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp @@ -0,0 +1,76 @@ +#include "service.h" + +#include +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvTxnRequest = TGrpcRequestNoOperationCall<::etcdserverpb::TxnRequest, ::etcdserverpb::TxnResponse>; + +class TTxnRequestRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TTxnRequestRPC(TEvTxnRequest* request) + : Request_(request) + {} + + void Bootstrap() { + Become(&TTxnRequestRPC::StateFunc); + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NYdb::NEtcd::TEvEtcdKV::TEvTxnResponse, Handle); + ) + + void Handle(NYdb::NEtcd::TEvEtcdKV::TEvTxnResponse::TPtr& ev) { + Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + + void Reply(const ::etcdserverpb::TxnRequest& request) { + auto response = etcdserverpb::TxnResponse{}; + + // TODO [pavelbezpravel]: WIP. + Y_UNUSED(request); + + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); + + // TODO [pavelbezpravel]: introduce Finish() method. + PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +void DoTxn(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TTxnRequestRPC(req)); +} + +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index e28115f9dda8..d8bf32e766e2 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -8,6 +8,7 @@ SRCS( etcd/api/kv/delete_range.cpp etcd/api/kv/rpc_put.cpp etcd/api/kv/rpc_range.cpp + etcd/api/kv/rpc_txn.cpp grpc_endpoint_publish_actor.cpp grpc_helper.cpp diff --git a/ydb/services/etcd/api/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp index 174ca31297ab..16a6010bf656 100644 --- a/ydb/services/etcd/api/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -53,7 +53,7 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); - // ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); + ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); // ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); #undef ADD_REQUEST From f01ad3844f9c63fa23ae483b267d741928b5b81a Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 17:31:53 +0300 Subject: [PATCH 15/21] fix: correct filename --- ydb/core/grpc_services/ya.make | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index d8bf32e766e2..29749301c541 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,7 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h - etcd/api/kv/delete_range.cpp + etcd/api/kv/rpc_delete_range.cpp etcd/api/kv/rpc_put.cpp etcd/api/kv/rpc_range.cpp etcd/api/kv/rpc_txn.cpp From aeca4de185948fd0332118b909c4b6162df751a9 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 21:10:12 +0300 Subject: [PATCH 16/21] fix: add concept as possible solution for filling etcd's responses --- ydb/core/grpc_services/base/base.h | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 066b4f1d574a..5a30224149e7 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -30,12 +30,6 @@ #include -namespace etcdserverpb { - -class RangeResponse; - -} - namespace NKikimr { namespace NSchemeCache { @@ -461,8 +455,21 @@ class IRequestOpCtx : public IRequestCtx { class IRequestNoOpCtx : public IRequestCtx { }; +template +concept YdbProto = requires { + typename T::status; + typename T::issues; +}; + +template +concept EtcdProto = requires { + !YdbProto; +}; + struct TCommonResponseFillerImpl { - static void FillImpl(::etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { + // TODO [pavelbezpravel]: it's not the best solution, but it works. + template requires EtcdProto + static void FillImpl(T& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { Y_UNUSED(resp); Y_UNUSED(issues); Y_UNUSED(status); @@ -477,10 +484,6 @@ struct TCommonResponseFillerImpl { template struct TCommonResponseFiller : private TCommonResponseFillerImpl { - static void Fill(::etcdserverpb::RangeResponse& resp, const NYql::TIssues& issues, Ydb::CostInfo* costInfo, Ydb::StatusIds::StatusCode status) { - Y_UNUSED(costInfo); - FillImpl(resp, issues, status); - } static void Fill(TResp& resp, const NYql::TIssues& issues, Ydb::CostInfo* costInfo, Ydb::StatusIds::StatusCode status) { auto& operation = *resp.mutable_operation(); operation.set_ready(true); From 5eba5c9a3b1b7e95359407ea2e6da20c2f8dba61 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 21:11:11 +0300 Subject: [PATCH 17/21] maintenance: minor changes --- ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp index 199507c41efa..3063869b23d4 100644 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp @@ -49,9 +49,7 @@ class TRangeRequestRPC : public TActorBootstrapped { auto response = etcdserverpb::RangeResponse{}; // TODO [pavelbezpravel]: WIP. - auto* kv = response.add_kvs(); - kv->set_key(request.key()); - kv->set_value("TODO [pavelbezpravel]: Range stub."); + Y_UNUSED(request); Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); From be28c81a75a13d87ce22d134ec499cc164bde5d0 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 21:32:11 +0300 Subject: [PATCH 18/21] feat: add Compact rpc implementation --- ydb/core/etcd/kv/events.h | 16 ++++ ydb/core/etcd/kv/proto.h | 3 + .../grpc_services/etcd/api/kv/rpc_compact.cpp | 76 +++++++++++++++++++ ydb/core/grpc_services/ya.make | 1 + ydb/services/etcd/api/kv/grpc_service.cpp | 2 +- 5 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp diff --git a/ydb/core/etcd/kv/events.h b/ydb/core/etcd/kv/events.h index 2296fad33e73..31d01401c199 100644 --- a/ydb/core/etcd/kv/events.h +++ b/ydb/core/etcd/kv/events.h @@ -17,6 +17,7 @@ struct TEvEtcdKV { EvDeleteRangeResponse, EvTxnCompareResponse, EvTxnResponse, + EvCompactionResponse, EvEnd }; @@ -104,6 +105,21 @@ struct TEvEtcdKV { TString TxId; TTxnResponse Response; }; + + struct TEvCompactionResponse : public NActors::TEventLocal { + TEvCompactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TCompactionResponse&& response) + : Status(status) + , Issues(issues) + , TxId(std::move(txId)) + , Response(response) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + TString TxId; + TCompactionResponse Response; + }; }; } // namespace NYdb::NEtcd diff --git a/ydb/core/etcd/kv/proto.h b/ydb/core/etcd/kv/proto.h index c58c304c060c..6cca02cda259 100644 --- a/ydb/core/etcd/kv/proto.h +++ b/ydb/core/etcd/kv/proto.h @@ -129,4 +129,7 @@ struct TTxnResponse { TVector Responses; }; +// TODO [pavelbezpravel]: WIP. +struct TCompactionResponse {}; + } // namespace NYdb::NEtcd diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp new file mode 100644 index 000000000000..aa563fad6809 --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp @@ -0,0 +1,76 @@ +#include "service.h" + +#include +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvCompactionRequest = TGrpcRequestNoOperationCall<::etcdserverpb::CompactionRequest, ::etcdserverpb::CompactionResponse>; + +class TCompactionRequestRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TCompactionRequestRPC(TEvCompactionRequest* request) + : Request_(request) + {} + + void Bootstrap() { + Become(&TCompactionRequestRPC::StateFunc); + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NYdb::NEtcd::TEvEtcdKV::TEvCompactionResponse, Handle); + ) + + void Handle(NYdb::NEtcd::TEvEtcdKV::TEvCompactionResponse::TPtr& ev) { + Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. + const auto request = *Request_->GetProtoRequest(); + Reply(request); + } + + void Reply(const ::etcdserverpb::CompactionRequest& request) { + auto response = etcdserverpb::CompactionResponse{}; + + // TODO [pavelbezpravel]: WIP. + Y_UNUSED(request); + + Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); + + // TODO [pavelbezpravel]: introduce Finish() method. + PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +void DoCompact(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TCompactionRequestRPC(req)); +} + +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 29749301c541..fd6383c9f890 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,6 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h + etcd/api/kv/rpc_compact.cpp etcd/api/kv/rpc_delete_range.cpp etcd/api/kv/rpc_put.cpp etcd/api/kv/rpc_range.cpp diff --git a/ydb/services/etcd/api/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp index 16a6010bf656..bb1da2901725 100644 --- a/ydb/services/etcd/api/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -54,7 +54,7 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); ADD_REQUEST(Txn, TxnRequest, TxnResponse, DoTxn); - // ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); + ADD_REQUEST(Compact, CompactionRequest, CompactionResponse, DoCompact); #undef ADD_REQUEST } From d0e2295a06cf0869983318c7d99ac551e6669c04 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Fri, 5 Apr 2024 21:32:44 +0300 Subject: [PATCH 19/21] maintenance: format --- ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp | 2 +- ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp | 2 +- ydb/services/etcd/api/kv/grpc_service.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp index 67a4671e865f..77cd8f99f984 100644 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp @@ -73,4 +73,4 @@ void DoDeleteRange(std::unique_ptr p, const IFacilityProvider& } -} // namespace NKikimr::NGRpcService \ No newline at end of file +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp index 3063869b23d4..4603588fd4a1 100644 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp +++ b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp @@ -73,4 +73,4 @@ void DoRange(std::unique_ptr p, const IFacilityProvider& f) { } -} // namespace NKikimr::NGRpcService \ No newline at end of file +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/etcd/api/kv/grpc_service.h b/ydb/services/etcd/api/kv/grpc_service.h index 953b5c90d703..f565ea830d80 100644 --- a/ydb/services/etcd/api/kv/grpc_service.h +++ b/ydb/services/etcd/api/kv/grpc_service.h @@ -32,4 +32,4 @@ class TGRpcEtcdKVService : public TGrpcServiceBase const size_t HandlersPerCompletionQueue; }; -} // namespace NKikimr::NGRpcService \ No newline at end of file +} // namespace NKikimr::NGRpcService From 223f762b4a8f12ce90dc64c3d3d04031684515e2 Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Sun, 7 Apr 2024 19:59:38 +0300 Subject: [PATCH 20/21] maintenance: remove TODO --- ydb/services/etcd/api/kv/grpc_service.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/services/etcd/api/kv/grpc_service.cpp b/ydb/services/etcd/api/kv/grpc_service.cpp index bb1da2901725..9fa507fb92b1 100644 --- a/ydb/services/etcd/api/kv/grpc_service.cpp +++ b/ydb/services/etcd/api/kv/grpc_service.cpp @@ -48,8 +48,6 @@ void TGRpcEtcdKVService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { } \ } - // TODO [pavelbezpravel]: WIP. - ADD_REQUEST(Range, RangeRequest, RangeResponse, DoRange); ADD_REQUEST(Put, PutRequest, PutResponse, DoPut); ADD_REQUEST(DeleteRange, DeleteRangeRequest, DeleteRangeResponse, DoDeleteRange); From 9fe1a26ee21914a848a14959d6876b0e80a4824d Mon Sep 17 00:00:00 2001 From: pavelbezpravel Date: Sun, 7 Apr 2024 20:02:23 +0300 Subject: [PATCH 21/21] feat: refactor actor --- .../grpc_services/etcd/api/kv/rpc_compact.cpp | 76 --------- .../etcd/api/kv/rpc_delete_range.cpp | 76 --------- .../grpc_services/etcd/api/kv/rpc_put.cpp | 76 --------- .../grpc_services/etcd/api/kv/rpc_range.cpp | 76 --------- .../grpc_services/etcd/api/kv/rpc_txn.cpp | 76 --------- .../grpc_services/etcd/api/kv/service.cpp | 156 ++++++++++++++++++ ydb/core/grpc_services/ya.make | 6 +- 7 files changed, 157 insertions(+), 385 deletions(-) delete mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp delete mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp delete mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp delete mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp delete mode 100644 ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp create mode 100644 ydb/core/grpc_services/etcd/api/kv/service.cpp diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp deleted file mode 100644 index aa563fad6809..000000000000 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_compact.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "service.h" - -#include -#include -#include - -#include -#include -#include - -#include - -namespace NKikimr::NGRpcService { - -namespace { - -using namespace NActors; - -using TEvCompactionRequest = TGrpcRequestNoOperationCall<::etcdserverpb::CompactionRequest, ::etcdserverpb::CompactionResponse>; - -class TCompactionRequestRPC : public TActorBootstrapped { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TCompactionRequestRPC(TEvCompactionRequest* request) - : Request_(request) - {} - - void Bootstrap() { - Become(&TCompactionRequestRPC::StateFunc); - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NYdb::NEtcd::TEvEtcdKV::TEvCompactionResponse, Handle); - ) - - void Handle(NYdb::NEtcd::TEvEtcdKV::TEvCompactionResponse::TPtr& ev) { - Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - - void Reply(const ::etcdserverpb::CompactionRequest& request) { - auto response = etcdserverpb::CompactionResponse{}; - - // TODO [pavelbezpravel]: WIP. - Y_UNUSED(request); - - Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); - - // TODO [pavelbezpravel]: introduce Finish() method. - PassAway(); - } - -private: - std::unique_ptr Request_; -}; - -} // namespace - -namespace NEtcd { - -void DoCompact(std::unique_ptr p, const IFacilityProvider& f) { - auto* req = dynamic_cast(p.release()); - Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TCompactionRequestRPC(req)); -} - -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp deleted file mode 100644 index 77cd8f99f984..000000000000 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_delete_range.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "service.h" - -#include -#include -#include - -#include -#include -#include - -#include - -namespace NKikimr::NGRpcService { - -namespace { - -using namespace NActors; - -using TEvDeleteDeleteRangeRequest = TGrpcRequestNoOperationCall<::etcdserverpb::DeleteRangeRequest, ::etcdserverpb::DeleteRangeResponse>; - -class TDeleteRangeRequestRPC : public TActorBootstrapped { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TDeleteRangeRequestRPC(TEvDeleteDeleteRangeRequest* request) - : Request_(request) - {} - - void Bootstrap() { - Become(&TDeleteRangeRequestRPC::StateFunc); - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NYdb::NEtcd::TEvEtcdKV::TEvDeleteRangeResponse, Handle); - ) - - void Handle(NYdb::NEtcd::TEvEtcdKV::TEvDeleteRangeResponse::TPtr& ev) { - Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - - void Reply(const ::etcdserverpb::DeleteRangeRequest& request) { - auto response = etcdserverpb::DeleteRangeResponse{}; - - // TODO [pavelbezpravel]: WIP. - Y_UNUSED(request); - - Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); - - // TODO [pavelbezpravel]: introduce Finish() method. - PassAway(); - } - -private: - std::unique_ptr Request_; -}; - -} // namespace - -namespace NEtcd { - -void DoDeleteRange(std::unique_ptr p, const IFacilityProvider& f) { - auto* req = dynamic_cast(p.release()); - Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TDeleteRangeRequestRPC(req)); -} - -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp deleted file mode 100644 index de1cd84c369a..000000000000 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_put.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "service.h" - -#include -#include -#include - -#include -#include -#include - -#include - -namespace NKikimr::NGRpcService { - -namespace { - -using namespace NActors; - -using TEvPutRequest = TGrpcRequestNoOperationCall<::etcdserverpb::PutRequest, ::etcdserverpb::PutResponse>; - -class TPutRequestRPC : public TActorBootstrapped { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TPutRequestRPC(TEvPutRequest* request) - : Request_(request) - {} - - void Bootstrap() { - Become(&TPutRequestRPC::StateFunc); - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NYdb::NEtcd::TEvEtcdKV::TEvPutResponse, Handle); - ) - - void Handle(NYdb::NEtcd::TEvEtcdKV::TEvPutResponse::TPtr& ev) { - Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - - void Reply(const ::etcdserverpb::PutRequest& request) { - auto response = etcdserverpb::PutResponse{}; - - // TODO [pavelbezpravel]: WIP. - Y_UNUSED(request); - - Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); - - // TODO [pavelbezpravel]: introduce Finish() method. - PassAway(); - } - -private: - std::unique_ptr Request_; -}; - -} // namespace - -namespace NEtcd { - -void DoPut(std::unique_ptr p, const IFacilityProvider& f) { - auto* req = dynamic_cast(p.release()); - Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TPutRequestRPC(req)); -} - -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp deleted file mode 100644 index 4603588fd4a1..000000000000 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_range.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "service.h" - -#include -#include -#include - -#include -#include -#include - -#include - -namespace NKikimr::NGRpcService { - -namespace { - -using namespace NActors; - -using TEvRangeRequest = TGrpcRequestNoOperationCall<::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>; - -class TRangeRequestRPC : public TActorBootstrapped { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TRangeRequestRPC(TEvRangeRequest* request) - : Request_(request) - {} - - void Bootstrap() { - Become(&TRangeRequestRPC::StateFunc); - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NYdb::NEtcd::TEvEtcdKV::TEvRangeResponse, Handle); - ) - - void Handle(NYdb::NEtcd::TEvEtcdKV::TEvRangeResponse::TPtr& ev) { - Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - - void Reply(const ::etcdserverpb::RangeRequest& request) { - auto response = etcdserverpb::RangeResponse{}; - - // TODO [pavelbezpravel]: WIP. - Y_UNUSED(request); - - Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); - - // TODO [pavelbezpravel]: introduce Finish() method. - PassAway(); - } - -private: - std::unique_ptr Request_; -}; - -} // namespace - -namespace NEtcd { - -void DoRange(std::unique_ptr p, const IFacilityProvider& f) { - auto* req = dynamic_cast(p.release()); - Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TRangeRequestRPC(req)); -} - -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp b/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp deleted file mode 100644 index 40289c874532..000000000000 --- a/ydb/core/grpc_services/etcd/api/kv/rpc_txn.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "service.h" - -#include -#include -#include - -#include -#include -#include - -#include - -namespace NKikimr::NGRpcService { - -namespace { - -using namespace NActors; - -using TEvTxnRequest = TGrpcRequestNoOperationCall<::etcdserverpb::TxnRequest, ::etcdserverpb::TxnResponse>; - -class TTxnRequestRPC : public TActorBootstrapped { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - TTxnRequestRPC(TEvTxnRequest* request) - : Request_(request) - {} - - void Bootstrap() { - Become(&TTxnRequestRPC::StateFunc); - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - -private: - STRICT_STFUNC(StateFunc, - hFunc(NYdb::NEtcd::TEvEtcdKV::TEvTxnResponse, Handle); - ) - - void Handle(NYdb::NEtcd::TEvEtcdKV::TEvTxnResponse::TPtr& ev) { - Y_UNUSED(ev); // TODO [pavelbezpravel]: WIP. - const auto request = *Request_->GetProtoRequest(); - Reply(request); - } - - void Reply(const ::etcdserverpb::TxnRequest& request) { - auto response = etcdserverpb::TxnResponse{}; - - // TODO [pavelbezpravel]: WIP. - Y_UNUSED(request); - - Request_->SendSerializedResult(std::move(response.SerializeAsString()), Ydb::StatusIds::SUCCESS); - - // TODO [pavelbezpravel]: introduce Finish() method. - PassAway(); - } - -private: - std::unique_ptr Request_; -}; - -} // namespace - -namespace NEtcd { - -void DoTxn(std::unique_ptr p, const IFacilityProvider& f) { - auto* req = dynamic_cast(p.release()); - Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TTxnRequestRPC(req)); -} - -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/etcd/api/kv/service.cpp b/ydb/core/grpc_services/etcd/api/kv/service.cpp new file mode 100644 index 000000000000..451e8738ce41 --- /dev/null +++ b/ydb/core/grpc_services/etcd/api/kv/service.cpp @@ -0,0 +1,156 @@ +#include "service.h" + +#include +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +etcdserverpb::RangeResponse FillResponse(const NYdb::NEtcd::TRangeResponse& response) { + Y_UNUSED(response); + return {}; +} + +etcdserverpb::PutResponse FillResponse(const NYdb::NEtcd::TPutResponse& response) { + Y_UNUSED(response); + return {}; +} + +etcdserverpb::DeleteRangeResponse FillResponse(const NYdb::NEtcd::TDeleteRangeResponse& response) { + Y_UNUSED(response); + return {}; +} + +etcdserverpb::TxnResponse FillResponse(const NYdb::NEtcd::TTxnResponse& response) { + Y_UNUSED(response); + return {}; + +} + +etcdserverpb::CompactionResponse FillResponse(const NYdb::NEtcd::TCompactionResponse& response) { + Y_UNUSED(response); + return {}; +} + +template +class TEtcdKVRequestRPC : public TActorBootstrapped> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TEtcdKVRequestRPC(TEvRequestType* request) + : Request_(request) {} + + void Bootstrap() { + this->Become(&TEtcdKVRequestRPC::StateFunc); + // TODO [pavelbezpravel]: WIP. + + this->Send(this->SelfId(), new TEvResponseType({}, {}, {}, {})); + // auto req = *Request_->GetProtoRequest(); + // proto -> struct + // this->Register(CreateKvActor(..., struct)); + } + +private: + STRICT_STFUNC(StateFunc, hFunc(TEvResponseType, Handle)) + void Handle(TEvResponseType::TPtr& ev) { + // if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + // Request_->ReplyWithYdbStatus(Ydb::StatusIds::BAD_REQUEST); + // } + // else + Request_->SendSerializedResult(std::move(FillResponse(ev->Get()->Response).SerializeAsString()), Ydb::StatusIds::SUCCESS); + this->PassAway(); + } + +private: + std::unique_ptr Request_; +}; + +} // namespace + +namespace NEtcd { + +using TEvRangeRequest = TGrpcRequestNoOperationCall< + ::etcdserverpb::RangeRequest, + ::etcdserverpb::RangeResponse>; + +using TRangeRPC = TEtcdKVRequestRPC< + TEvRangeRequest, + NYdb::NEtcd::TEvEtcdKV::TEvRangeResponse>; + +void DoRange(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TRangeRPC(req)); +} + +using TEvPutRequest = TGrpcRequestNoOperationCall< + ::etcdserverpb::PutRequest, + ::etcdserverpb::PutResponse>; + +using TPutRPC = TEtcdKVRequestRPC< + TEvPutRequest, + NYdb::NEtcd::TEvEtcdKV::TEvPutResponse>; + +void DoPut(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TPutRPC(req)); +} + +using TEvDeleteRangeRequest = TGrpcRequestNoOperationCall< + ::etcdserverpb::DeleteRangeRequest, + ::etcdserverpb::DeleteRangeResponse>; + +using TDeleteRangeRPC = TEtcdKVRequestRPC< + TEvDeleteRangeRequest, + NYdb::NEtcd::TEvEtcdKV::TEvDeleteRangeResponse>; + +void DoDeleteRange(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TDeleteRangeRPC(req)); +} + +using TEvTxnRequest = TGrpcRequestNoOperationCall< + ::etcdserverpb::TxnRequest, + ::etcdserverpb::TxnResponse>; + +using TTxnRPC = TEtcdKVRequestRPC< + TEvTxnRequest, + NYdb::NEtcd::TEvEtcdKV::TEvTxnResponse>; + +void DoTxn(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TTxnRPC(req)); +} + +using TEvCompactionRequest = TGrpcRequestNoOperationCall< + ::etcdserverpb::CompactionRequest, + ::etcdserverpb::CompactionResponse>; + +using TCompactRPC = TEtcdKVRequestRPC< + TEvCompactionRequest, + NYdb::NEtcd::TEvEtcdKV::TEvCompactionResponse>; + +void DoCompact(std::unique_ptr p, const IFacilityProvider& f) { + auto* req = dynamic_cast(p.release()); + Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TCompactRPC(req)); +} + +} // namespace NEtcd + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index fd6383c9f890..78e75bb56366 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -5,11 +5,7 @@ SRCS( audit_dml_operations.cpp db_metadata_cache.h - etcd/api/kv/rpc_compact.cpp - etcd/api/kv/rpc_delete_range.cpp - etcd/api/kv/rpc_put.cpp - etcd/api/kv/rpc_range.cpp - etcd/api/kv/rpc_txn.cpp + etcd/api/kv/service.cpp grpc_endpoint_publish_actor.cpp grpc_helper.cpp