Skip to content

Commit

Permalink
Merge pull request #2 from pavelbezpravel/etcd-kv-service
Browse files Browse the repository at this point in the history
etcd kv service
  • Loading branch information
pavelbezpravel authored Apr 8, 2024
2 parents 2ca3d00 + 9fe1a26 commit 9fe47b5
Show file tree
Hide file tree
Showing 14 changed files with 583 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ struct TKikimrEvents : TEvents {
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_ETCD_REVISION,
ES_ETCD_KV,
};
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ union TBasicKikimrServicesMask {

bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableEtcdService:1;
};

struct {
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
#include <ydb/services/dynamic_config/grpc_service.h>
#include <ydb/services/datastreams/grpc_service.h>
#include <ydb/services/discovery/grpc_service.h>
#include <ydb/services/etcd/api/kv/grpc_service.h>
#include <ydb/services/fq/grpc_service.h>
#include <ydb/services/fq/private_grpc.h>
#include <ydb/services/kesus/grpc_service.h>
Expand Down Expand Up @@ -583,6 +584,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["query_service"] = &hasQueryService;
TServiceCfg hasKeyValue = services.empty();
names["keyvalue"] = &hasKeyValue;
TServiceCfg hasEtcdService = services.empty();
names["etcd_service"] = &hasEtcdService;

std::unordered_set<TString> enabled;
for (const auto& name : services) {
Expand Down Expand Up @@ -850,6 +853,12 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
server.AddService(service);
}
}

if (hasEtcdService) {
// TODO [pavelbezpravel]: add pther etcd services later.
server.AddService(new NGRpcService::TGRpcEtcdKVService(ActorSystem.Get(), Counters,
grpcRequestProxies, hasEtcdService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}
};

if (appConfig.HasGRpcConfig() && appConfig.GetGRpcConfig().GetStartGRpcProxy()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ PEERDIR(
ydb/services/dynamic_config
ydb/services/datastreams
ydb/services/discovery
ydb/services/etcd
ydb/services/fq
ydb/services/kesus
ydb/services/keyvalue
Expand Down
125 changes: 125 additions & 0 deletions ydb/core/etcd/kv/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#pragma once

#include "proto.h"

#include <ydb/core/base/events.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NYdb::NEtcd {

struct TEvEtcdKV {
// Event ids
enum EEv : ui32 {
EvCreateTableResponse = EventSpaceBegin(NKikimr::TKikimrEvents::ES_ETCD_KV),
EvRangeResponse,
EvPutResponse,
EvDeleteRangeResponse,
EvTxnCompareResponse,
EvTxnResponse,
EvCompactionResponse,

EvEnd
};

static_assert(
EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD_KV),
"expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_ETCD_KV)"
);

// Events
struct TEvCreateTableResponse : public NActors::TEventLocal<TEvCreateTableResponse, EvCreateTableResponse> {
};

struct TEvRangeResponse : public NActors::TEventLocal<TEvRangeResponse, EvRangeResponse> {
TEvRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TRangeResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TRangeResponse Response;
};

struct TEvPutResponse : public NActors::TEventLocal<TEvPutResponse, EvPutResponse> {
TEvPutResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TPutResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TPutResponse Response;
};

struct TEvDeleteRangeResponse : public NActors::TEventLocal<TEvDeleteRangeResponse, EvDeleteRangeResponse> {
TEvDeleteRangeResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TDeleteRangeResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TDeleteRangeResponse Response;
};

struct TEvTxnCompareResponse : public NActors::TEventLocal<TEvTxnCompareResponse, EvTxnCompareResponse> {
TEvTxnCompareResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TTxnCompareResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TTxnCompareResponse Response;
};

struct TEvTxnResponse : public NActors::TEventLocal<TEvTxnResponse, EvTxnResponse> {
TEvTxnResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TTxnResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TTxnResponse Response;
};

struct TEvCompactionResponse : public NActors::TEventLocal<TEvCompactionResponse, EvCompactionResponse> {
TEvCompactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues, TString txId, TCompactionResponse&& response)
: Status(status)
, Issues(issues)
, TxId(std::move(txId))
, Response(response)
{
}

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TString TxId;
TCompactionResponse Response;
};
};

} // namespace NYdb::NEtcd
135 changes: 135 additions & 0 deletions ydb/core/etcd/kv/proto.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#pragma once

#include <array>
#include <memory>
#include <variant>

#include <util/generic/maybe.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/system/types.h>

#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

#include <ydb/public/api/protos/ydb_status_codes.pb.h>

namespace NYdb::NEtcd {

struct TKeyValue {
TString key;
i64 create_revision;
i64 mod_revision;
i64 version;
TString value;
};

struct TRangeRequest {
enum class ESortOrder {
NONE,
ASCEND,
DESCEND,
};
enum class ESortTarget {
KEY,
CREATE,
MOD,
VERSION,
VALUE,
};

TString key;
TString range_end;
size_t limit;
i64 revision;
ESortOrder sort_order;
ESortTarget sort_target;
bool serializable;
bool keys_only;
bool count_only;
i64 min_mod_revision;
i64 max_mod_revision;
i64 min_create_revision;
i64 max_create_revision;
};

struct TRangeResponse {
TVector<TKeyValue> Kvs;
bool More;
size_t Count;
};

struct TPutRequest {
TVector<std::pair<TString, TString>> Kvs;
bool PrevKv;
bool IgnoreValue;
};

struct TPutResponse {
TVector<TKeyValue> PrevKvs;
};

struct TDeleteRangeRequest {
TString Key;
TString RangeEnd;
bool PrevKv;
};

struct TDeleteRangeResponse {
size_t Deleted;
TVector<TKeyValue> PrevKvs;
};

struct TTxnRequest;

using TRequestOp = std::variant<
std::shared_ptr<TRangeRequest>,
std::shared_ptr<TPutRequest>,
std::shared_ptr<TDeleteRangeRequest>,
std::shared_ptr<TTxnRequest>
>;

struct TTxnResponse;

using TResponseOp = std::variant<
std::shared_ptr<TRangeResponse>,
std::shared_ptr<TPutResponse>,
std::shared_ptr<TDeleteRangeResponse>,
std::shared_ptr<TTxnResponse>
>;

struct TTxnCompareRequest {
enum class ECompareResult {
EQUAL,
GREATER,
LESS,
NOT_EQUAL,
};
ECompareResult Result;
TMaybe<i64> Target_create_revision;
TMaybe<i64> Target_mod_revision;
TMaybe<i64> Target_version;
TMaybe<TString> Target_value;
TString Key;
TString Range_end;
};

struct TTxnCompareResponse {
bool Succeeded;
};

struct TTxnRequest {
TVector<TTxnCompareRequest> Compare;
std::array<TVector<TRequestOp>, 2> Requests;
};

struct TTxnResponse {
bool Succeeded;
TVector<TResponseOp> Responses;
};

// TODO [pavelbezpravel]: WIP.
struct TCompactionResponse {};

} // namespace NYdb::NEtcd
19 changes: 19 additions & 0 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,26 @@ class IRequestOpCtx : public IRequestCtx {
class IRequestNoOpCtx : public IRequestCtx {
};

template <typename T>
concept YdbProto = requires {
typename T::status;
typename T::issues;
};

template <typename T>
concept EtcdProto = requires {
!YdbProto<T>;
};

struct TCommonResponseFillerImpl {
// TODO [pavelbezpravel]: it's not the best solution, but it works.
template <typename T> requires EtcdProto<T>
static void FillImpl(T& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
Y_UNUSED(resp);
Y_UNUSED(issues);
Y_UNUSED(status);
}

template <typename T>
static void FillImpl(T& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
resp.set_status(status);
Expand Down
Loading

0 comments on commit 9fe47b5

Please sign in to comment.