Ping service #12100

Merged 4 commits on Nov 30, 2024
Changes from 3 commits
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
@@ -123,6 +123,7 @@
#include <ydb/services/backup/grpc_service.h>
#include <ydb/services/ydb/ydb_logstore.h>
#include <ydb/services/ydb/ydb_operation.h>
#include <ydb/services/ydb/ydb_debug.h>
#include <ydb/services/ydb/ydb_query.h>
#include <ydb/services/ydb/ydb_scheme.h>
#include <ydb/services/ydb/ydb_scripting.h>
@@ -881,6 +882,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}

server.AddService(new NGRpcService::TGRpcYdbDebugService(ActorSystem.Get(), Counters,
grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));

if (hasLogStore) {
server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasLogStore.IsRlAllowed()));
251 changes: 251 additions & 0 deletions ydb/core/grpc_services/rpc_ping.cpp
@@ -0,0 +1,251 @@
#include "service_debug.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <ydb/public/api/protos/ydb_debug.pb.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;

namespace {

using namespace NActors;

////////////////////////////////////////////////////////////////////////////////

using TEvKqpProxyRequest = TGrpcRequestNoOperationCall<Debug::KqpProxyRequest, Debug::KqpProxyResponse>;

class TExecuteKqpPingRPC : public TActorBootstrapped<TExecuteKqpPingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteKqpPingRPC(TEvKqpProxyRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(NKqp::TEvKqp::TEvProxyPingResponse, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to KQP proxy");
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), new NKqp::TEvKqp::TEvProxyPingRequest())) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(NKqp::TEvKqp::TEvProxyPingResponse::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteKqpPingRPC in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvKqpProxyRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvSchemeCacheRequest = TGrpcRequestNoOperationCall<Debug::SchemeCacheRequest, Debug::SchemeCacheResponse>;

class TExecuteSchemeCachePingRPC : public TActorBootstrapped<TExecuteSchemeCachePingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteSchemeCachePingRPC(TEvSchemeCacheRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to SchemeCache");

auto* request = new TEvTxProxySchemeCache::TEvNavigateKeySet(new NSchemeCache::TSchemeCacheNavigate());
if (!ctx.Send(MakeSchemeCacheID(), request)) {
LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to SchemeCache");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from SchemeCache");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteSchemeCachePingRPC in state " << state <<
" received unexpected event " << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvSchemeCacheRequest> Request_;
};

////////////////////////////////////////////////////////////////////////////////

using TEvTxProxyRequest = TGrpcRequestNoOperationCall<Debug::TxProxyRequest, Debug::TxProxyResponse>;

class TExecuteTxProxyPingRPC : public TActorBootstrapped<TExecuteTxProxyPingRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::OTHER;
}

TExecuteTxProxyPingRPC(TEvTxProxyRequest* request)
: Request_(request)
{}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TThis::StateWork);

Proceed(ctx);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
try {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
default:
UnexpectedEvent(__func__, ev);
}
} catch (const yexception& ex) {
InternalError(ex.what());
}
}

void Proceed(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to TxProxy");
if (!ctx.Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId)) {
Member: It would probably be better to send EvGetProxyServicesRequest here. Otherwise, pinging too frequently could exhaust the supply of tx ids.

Member (Author): I think we actually do want to check the allocation path. TxIds are currently allocated in batches of 5000. Under our workloads they are exhausted fairly quickly, and we go to allocate a new range quite often (about once a second in my case; we go before exhaustion). Pinging 100-200 times per second would have no effect at all. But if there are so many transactions that TxIds are constantly exhausted, the ping will catch that.

Member: It just seems a little odd for a ping to have side effects. If that is deliberate rather than accidental, it is probably fine. @snaury fyi

Member: I was surprised at first too, but on reflection, everything in tx proxy does go through allocation one way or another, so checking tx proxy health via TxId allocation really is the right approach. On the other hand, it historically has separate, badly behaved queues, and AllocateTxId happens to be the lowest-priority one. Moreover, kqp itself only recently started using this request, for example for transactions involving topics. Filed #12147 about this.

LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to TxProxy");
ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
}
}

void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr&, const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from TxProxy");
ReplyWithResult(StatusIds::SUCCESS, ctx);
}

private:
void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) {
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}

void InternalError(const TString& message) {
ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}

void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TExecuteTxProxyPingRPC in state " << state << " received unexpected event "
<< ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}

private:
std::shared_ptr<TEvTxProxyRequest> Request_;
};

} // anonymous

////////////////////////////////////////////////////////////////////////////////

void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
// we are in the GRPC proxy already (or in the check actor in case of auth check),
// thus ready to reply right here
using TRequest = TGrpcRequestNoOperationCall<Debug::GrpcProxyRequest, Debug::GrpcProxyResponse>;
TRequest* request = dynamic_cast<TRequest *>(p.get());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoGrpcProxyPing");
request->ReplyWithYdbStatus(StatusIds::SUCCESS);
}

void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvKqpProxyRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoKqpPing");
f.RegisterActor(new TExecuteKqpPingRPC(request));
}

void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvSchemeCacheRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoSchemeCachePing");
f.RegisterActor(new TExecuteSchemeCachePingRPC(request));
}

void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
auto* request = dynamic_cast<TEvTxProxyRequest*>(p.release());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoTxProxyPing");
f.RegisterActor(new TExecuteTxProxyPingRPC(request));
}

} // namespace NKikimr::NGRpcService
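
For context on the AllocateTxId review thread above: a minimal sketch of the side-effect-free variant the reviewer suggests, replacing only the Proceed/Handle pair of TExecuteTxProxyPingRPC. It is not part of this PR, and only the request event name (EvGetProxyServicesRequest) appears in the thread; the response event is assumed to be TEvTxUserProxy::TEvGetProxyServicesResponse (check ydb/core/tx/tx_proxy/proxy.h). The trade-off is that this probes only the proxy's mailbox, not the allocation queue the author wants to exercise.

void Proceed(const TActorContext &ctx) {
    LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to TxProxy");
    // No TxId is consumed: we only ask the proxy for its service actors.
    if (!ctx.Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest())) {
        LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to TxProxy");
        ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx);
    }
}

// StateWork would dispatch HFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle)
// instead of TEvAllocateTxIdResult.
void Handle(TEvTxUserProxy::TEvGetProxyServicesResponse::TPtr&, const TActorContext& ctx) {
    LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from TxProxy");
    ReplyWithResult(StatusIds::SUCCESS, ctx);
}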
16 changes: 16 additions & 0 deletions ydb/core/grpc_services/service_debug.h
@@ -0,0 +1,16 @@
#pragma once

#include <memory>

namespace NKikimr::NGRpcService {

class IRequestOpCtx;
class IRequestNoOpCtx;
class IFacilityProvider;

void DoGrpcProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoKqpPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoSchemeCachePing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
void DoTxProxyPing(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);

} // namespace NKikimr::NGRpcService
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
@@ -63,6 +63,7 @@ SRCS(
rpc_make_directory.cpp
rpc_modify_permissions.cpp
rpc_monitoring.cpp
rpc_ping.cpp
rpc_prepare_data_query.cpp
rpc_rate_limiter_api.cpp
rpc_read_columns.cpp
5 changes: 5 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
@@ -70,6 +70,11 @@ enum class ERequestType: size_t {

BSCONFIG_REPLACESTORAGECONFIG,
BSCONFIG_FETCHSTORAGECONFIG,
PING_GRPC,
Member: Maybe add a blank line before these? The other entries come in groups and don't mix.

PING_PROXY,
PING_KQP,
PING_SCHEME_CACHE,
PING_TX_PROXY,

REQUEST_TYPES_CNT, // Add new types above this line
};
6 changes: 6 additions & 0 deletions ydb/core/kqp/common/events/events.h
@@ -204,6 +204,12 @@ struct TEvKqp {
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};

struct TEvProxyPingRequest : public TEventLocal<TEvProxyPingRequest, TKqpEvents::EvProxyPingRequest> {
};

struct TEvProxyPingResponse : public TEventLocal<TEvProxyPingResponse, TKqpEvents::EvProxyPingResponse> {
};
};

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -48,6 +48,8 @@ struct TKqpEvents {
EvDelayedRequestError,
EvBufferWrite,
EvBufferWriteResult,
EvProxyPingRequest,
EvProxyPingResponse,
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
5 changes: 5 additions & 0 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -644,6 +644,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Send(ev->Sender, responseEv.Release(), 0, ev->Cookie);
}

void Handle(TEvKqp::TEvProxyPingRequest::TPtr& ev) {
Send(ev->Sender, new TEvKqp::TEvProxyPingResponse());
}

void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!DatabasesCache.SetDatabaseIdOrDefer(ev, static_cast<i32>(EDelayedRequestType::QueryRequest), ActorContext())) {
return;
@@ -1346,6 +1350,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
hFunc(TEvKqp::TEvProxyPingRequest, Handle);
hFunc(TEvKqp::TEvQueryRequest, Handle);
hFunc(TEvKqp::TEvScriptRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
1 change: 1 addition & 0 deletions ydb/public/api/grpc/ya.make
@@ -10,6 +10,7 @@ SRCS(
ydb_auth_v1.proto
ydb_cms_v1.proto
ydb_coordination_v1.proto
ydb_debug_v1.proto
ydb_discovery_v1.proto
ydb_export_v1.proto
ydb_import_v1.proto
14 changes: 14 additions & 0 deletions ydb/public/api/grpc/ydb_debug_v1.proto
@@ -0,0 +1,14 @@
syntax = "proto3";

package Ydb.Debug.V1;
option java_package = "com.yandex.ydb.debug.v1";

import "ydb/public/api/protos/ydb_debug.proto";

service DebugService {
rpc PingPlainGrpc(Debug.PlainGrpcRequest) returns (Debug.PlainGrpcResponse);
rpc PingGrpcProxy(Debug.GrpcProxyRequest) returns (Debug.GrpcProxyResponse);
rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse);
rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse);
rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse);
}
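
For anyone trying the new service: a minimal hypothetical C++ client against DebugService, assuming the standard protoc-generated stub from ydb_debug_v1.grpc.pb.h; the endpoint, plaintext credentials, and exit-code error handling are placeholders, not recommendations.

#include <grpcpp/grpcpp.h>
#include <ydb/public/api/grpc/ydb_debug_v1.grpc.pb.h>

int main() {
    // PingPlainGrpc exercises only the gRPC layer; swap in PingKqpProxy,
    // PingSchemeCache, or PingTxProxy to probe deeper components.
    auto channel = grpc::CreateChannel("localhost:2135", grpc::InsecureChannelCredentials());
    auto stub = Ydb::Debug::V1::DebugService::NewStub(channel);

    grpc::ClientContext context;
    Ydb::Debug::PlainGrpcRequest request;
    Ydb::Debug::PlainGrpcResponse response;

    grpc::Status status = stub->PingPlainGrpc(&context, request, &response);
    return status.ok() ? 0 : 1;
}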
1 change: 1 addition & 0 deletions ydb/public/api/protos/ya.make
@@ -24,6 +24,7 @@ SRCS(
ydb_federation_discovery.proto
persqueue_error_codes_v1.proto
ydb_auth.proto
ydb_debug.proto
ydb_persqueue_v1.proto
ydb_persqueue_cluster_discovery.proto
ydb_clickhouse_internal.proto